@seandavi
Last active April 15, 2025 04:26
Run a Google Cloud Batch job set up in Python
abc.py:

import argparse

import fsspec
import orjson
from omicidx.sra import parser as sp

parser = argparse.ArgumentParser(
    prog='Parse SRA metadata xml',
    description='Parse SRA metadata xml into jsonlines',
    epilog='Example: python abc.py https://ftp.ncbi.nlm.nih.gov/sra/reports/Mirroring/NCBI_SRA_Mirroring_20230729/meta_sample_set.xml.gz test.ndjson.gz')
parser.add_argument('url', help='url to SRA metadata xml.gz file')
parser.add_argument('outfile', help='output file name')
args = parser.parse_args()

url = args.url
outfile = args.outfile

# Stream the gzipped XML from the URL, parse each SRA record, and write
# one JSON object per line to a gzipped output file.
with fsspec.open(url, mode='rb', compression='gzip') as f:
    sra_objects = sp.sra_object_generator(f)
    with fsspec.open(outfile, 'wb', compression='gzip') as out:
        for obj in sra_objects:
            out.write(orjson.dumps(obj.data) + b'\n')
print(f"Done: {url} -> {outfile}")
Batch job submission script:

from google.cloud import batch_v1


def create_container_job(project_id: str, region: str, job_name: str, url: str, outfile: str) -> batch_v1.Job:
    """
    Create a Batch job that runs the SRA metadata parser inside a
    container on Compute Engine instances.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        region: name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
        job_name: the name of the job that will be created.
            It needs to be unique for each project and region pair.
        url: url of the SRA metadata xml.gz file, passed through to abc.py.
        outfile: output file name, passed through to abc.py.

    Returns:
        A job object representing the job created.
    """
    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "seandavi/omicidx1"
    runnable.container.entrypoint = "python"
    runnable.container.commands = [
        "abc.py",
        url,
        outfile,
    ]

    # Jobs can be divided into tasks. In this case, we have only one task.
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 1000  # in milli-CPUs; 1000 milli-CPUs = 1 full CPU
    resources.memory_mib = 256  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "20000s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()
    group.task_count = 1
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run.
    # In this case, we tell the system to use an "e2-standard-2" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-2"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}

    # We use Cloud Logging as it's an out-of-the-box available option.
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run.
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--project_id", help="project ID", required=True)
    parser.add_argument("--region", help="region", required=True)
    parser.add_argument("--job_name", help="job name (unique per project/region)", required=True)
    parser.add_argument("--url", help="url to SRA metadata xml.gz file", required=True)
    parser.add_argument("--outfile", help="output file name", required=True)
    args = parser.parse_args()
    print(create_container_job(args.project_id, args.region, args.job_name, args.url, args.outfile))
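
Once create_job returns, the job runs asynchronously. A minimal sketch of checking its state afterwards, with illustrative placeholder values (use whatever you passed to create_container_job):

from google.cloud import batch_v1

# Placeholder values for illustration.
project_id, region, job_name = "my-project", "us-central1", "sra-parse-job"

client = batch_v1.BatchServiceClient()
job = client.get_job(name=f"projects/{project_id}/locations/{region}/jobs/{job_name}")
print(job.status.state)  # e.g. QUEUED, SCHEDULED, RUNNING, SUCCEEDED, or FAILED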
Dockerfile:

FROM python:3.11
RUN pip install --upgrade pip
RUN pip install omicidx
RUN pip install fsspec gcsfs s3fs
COPY abc.py .
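
For Batch to pull seandavi/omicidx1, this image has to be built and pushed to a registry the job can reach; the unqualified name suggests Docker Hub (e.g. docker build -t seandavi/omicidx1 . followed by docker push seandavi/omicidx1).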