Created
September 2, 2019 07:43
-
-
Save francbartoli/289ab292773fd5886e26c489bb1941d1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# routes
from fastapi import APIRouter, Depends, HTTPException
from starlette.status import HTTP_201_CREATED
from starlette.responses import JSONResponse
from sqlalchemy.orm import Session

from app.core.config import WPS_PROCESS_LINK
from app import crud
from app.api.utils.db import get_db
from app.models.job import (
    jobCollection,
    JobCreate,
    statusInfo,
    statusEnum,
    # BUG FIX: jobControlOptions is referenced by create_wps_job_by_process
    # below but was missing from this import list (NameError at request time).
    jobControlOptions,
)
from app.core.celery_app import celery_app

# NOTE(review): create_wps_job_by_process also references DBUser and
# get_current_active_user, which are imported nowhere in this file — add the
# proper imports (presumably the user db-model and the security dependency
# from this project) and confirm their module paths.

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

router = APIRouter()
@router.post(
    "/processes/{id}/jobs/",
    operation_id="createJob",
    status_code=HTTP_201_CREATED,
    include_in_schema=True
)
def create_wps_job_by_process(
    *,
    db: Session = Depends(get_db),
    id: str,
    job_in: JobCreate,
    current_user: DBUser = Depends(get_current_active_user),
):
    """
    Create a new job for a WPS process.

    Looks up the process by ``id``, persists a job row owned by the current
    user and, when the process advertises async execution, dispatches the
    ``async_buffer`` Celery task. Responds 201 with a ``Location`` header
    pointing at the new job resource.

    Raises:
        HTTPException(404): no process with the given ``id`` exists.
        HTTPException(500): the job row could not be created.
    """
    process = crud.process.get_by_id(db_session=db, id=id)
    if not process:
        raise HTTPException(
            status_code=404,
            detail=f"The process with id {id} does not exist."
        )
    job = crud.job.create(
        db_session=db,
        job_in=job_in,
        process_id=process.pid,
        owner_id=current_user.id
    )
    # BUG FIX: the original read job.jid *before* its `if job:` check, so a
    # failed create crashed with AttributeError. Fail loudly and early instead.
    if not job:
        raise HTTPException(
            status_code=500,
            detail=f"The job for process {id} could not be created."
        )
    base_url = WPS_PROCESS_LINK["href"]
    job_id = job.jid
    location = f"{base_url}{id}/jobs/{job_id}"
    headers = {"Location": f"{location}"}
    # the conditional order drives the precedence of async/sync for now
    # It could be something like ?c=sync/async
    if jobControlOptions.ASYNC in process.jobControlOptions:
        logger.info(f"======> Launch async task for jobID {job_id}")
        celery_app.send_task(
            "async_buffer",
            args=[job.jid, location]
        )
        return JSONResponse(
            content=None,
            headers=headers,
            status_code=HTTP_201_CREATED
        )
    elif jobControlOptions.SYNC in process.jobControlOptions:
        # @TODO: Implement an abstract way to get the process
        # from the catalog
        pass
# celery_app | |
# celery_app
from celery import Celery

# Broker URL targets the "queue" service as the guest user on the default vhost.
celery_app = Celery("worker", broker="amqp://guest@queue//")

# Both known tasks are routed onto the main queue.
_task_routes = {
    "app.worker.test_celery": "main-queue",
    "async_buffer": "main-queue",
}
celery_app.conf.task_routes = _task_routes
#app/tasks/base.py | |
import sqlalchemy | |
from app.core.celery_app import celery_app | |
from sqlalchemy.orm import sessionmaker | |
from sqlalchemy.orm import scoped_session | |
from app.db.session import engine | |
class WPSTask(celery_app.Task):
    """Celery base task that provides each run with a scoped SQLAlchemy session.

    ``__call__`` binds the module-level engine and builds a fresh
    ``scoped_session`` before the task body executes; ``after_return``
    tears both down once the task has finished, whatever its outcome.
    """

    def __call__(self, *args, **kwargs):
        # Bind the shared engine and expose a thread-local scoped session
        # (task bodies reach it as self.session).
        self.engine = engine
        session_factory = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )
        self.session = scoped_session(session_factory)
        return super().__call__(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Called after every task, on success and failure alike.
        if hasattr(self, 'session'):
            self.session.remove()
        if hasattr(self, 'engine'):
            # BUG FIX: was self.engine.engine.dispose() — Engine.engine is the
            # engine itself, so call dispose() directly.
            # NOTE(review): this disposes the *shared* module-level engine's
            # connection pool after every single task; confirm that is
            # intended rather than only removing the scoped session.
            self.engine.dispose()
# app/worker.py — Celery task definitions for the WPS service.
from typing import List
from pydantic import UUID4, UrlStr
from app.db_models.job import Job
from app.core.celery_app import celery_app
from app.tasks.base import WPSTask
from app.api.utils.processes.buffer.buffer import Buffer
from app.api.utils.processes.hello.hello import getResult
from app.models.job import statusEnum
from app.models.common import link as Link
from app.core.config import (
    StatusMessage,
    ApplicationType,
    WPSRel,
    Lang,
    Title
)

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@celery_app.task(acks_late=True)
def test_celery(word: str):
    """Trivial smoke-test task: echo *word* back in a fixed message."""
    message = f"test task return {word}"
    return message
@celery_app.task(
    bind=True,
    base=WPSTask,
    name="async_buffer",
    acks_late=True
)
def async_buffer(self, job_jid: UUID4, location: UrlStr):
    """Run the buffer process asynchronously for the job *job_jid*.

    Loads the job row through the scoped session installed by WPSTask,
    attaches a "self" link, runs the process and — when a result comes
    back — stores it on the row together with a "result" link, marking
    the job SUCCESSFUL.

    Args:
        job_jid: UUID primary key of the job to execute.
        location: canonical URL of the job resource, used to build links.
    """
    logger.info("======> Let's try to execute the query")
    logger.info(f"======> The unique value of UUID for the job is {job_jid}")
    # self.session is the scoped session provided by the WPSTask base class.
    job = self.session.query(Job).get(job_jid)
    logger.info(f"======> Status for the job is {job.status}")
    # Build and immediately persist the job's "self" link.
    self_link = Link(
        href=f"{location}",
        rel=WPSRel.SELF.value,
        type=ApplicationType.JSON.value,
        hreflang=Lang.EN.value,
        title=Title.SELF.value
    )
    logger.info(f"======> Self link is {self_link}")
    job_links = [self_link.dict()]
    job.links = job_links
    self.session.commit()
    p = Buffer(inputs=job.inputs, outputs=job.outputs)
    # p.run()
    # save result in the Job model
    # NOTE(review): the real process run is stubbed out — getResult() stands
    # in for p.result; confirm before relying on the stored result.
    result = getResult()  # p.result
    logger.info("======> Let's try to save the result")
    if result:
        # Success path: add the "result" link, mark the job complete and
        # store the serialized result on the row.
        result_link = Link(
            href=f"{location}/result",
            rel=WPSRel.RESULT.value,
            type=ApplicationType.JSON.value,
            hreflang=Lang.EN.value,
            title=Title.RESULT.value
        )
        job.progress = 100
        job.status = statusEnum.SUCCESSFUL.value
        job.message = StatusMessage.SUCCESSFUL.value
        job_links = job_links + [result_link.dict()]
        logger.info(f"======> job_links has value {job_links}")
        job.links = job_links
        job.result = result.dict()
        self.session.commit()
        # NOTE(review): indentation was lost in this paste; the lines below
        # are reconstructed *inside* the `if result:` branch because
        # result.json() would raise AttributeError on a falsy result —
        # confirm against the original file.
        json_result = result.json()
        logger.info(f"======> Job record refreshed")
        logger.info(f"======> The new value for the status of the job is {job.status}")
        logger.info(f"======> Result dictionary for the job {job_jid} is \n{json_result}")
    return f"async buffer finished"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment