Skip to content

Instantly share code, notes, and snippets.

@MikeRzhevsky
Last active April 21, 2022 15:13
Show Gist options
  • Save MikeRzhevsky/2239a35c1ba17916532c570989df045f to your computer and use it in GitHub Desktop.
from dagster import job, op, Failure, sensor, RunRequest
from dagster_celery import celery_executor
from dagster.core.storage.pipeline_run import (
RunsFilter,
PipelineRunStatus,
)
import time
#def sensor_A():
# yield RunRequest(run_key=None, run_config={})
@sensor(job=test_job, minimum_interval_seconds=10)
def run_sensor(context):
    """Launch a run of the job only when no other run is already in progress.

    Queries the Dagster instance for runs of this job in the STARTED state.
    If more than one is active, the sensor backs off and skips this tick;
    otherwise it yields a RunRequest.
    """
    try:
        run_records = context.instance.get_run_records(
            RunsFilter(
                job_name=context.pipeline_name,
                statuses=[PipelineRunStatus.STARTED],
            )
        )
        context.log.info(f'Total running {len(run_records)}')
        if len(run_records) > 1:
            raise Failure(
                description="Another instance of the job is currently running",
            )
    except Failure:
        # BUGFIX: the original caught *all* exceptions (hiding real errors)
        # and then fell through to yield a RunRequest anyway, so the
        # concurrency guard never actually prevented a launch. Back off and
        # skip this tick instead.
        time.sleep(60)
        return
    yield RunRequest(
        run_key=None,
        run_config={
            # NOTE(review): this value is a *set* containing one string, which
            # is not a valid ops config (Dagster expects a mapping of op names
            # to their config). Kept byte-for-byte — confirm intended config.
            "ops": {"hello(get_name(is_running_test()))"}
        },
    )
@op
def get_context(context):
    """Log a start marker via the op's Dagster context logger."""
    # Plain string literal: the original used an f-string with no
    # placeholders, which formats to the identical text.
    context.log.info('Start!')
@job(executor_def=celery_executor)
def run_job():
    # Compose the job graph: feed run_test_job's output into get_context.
    # NOTE(review): run_test_job is not defined anywhere in this snippet —
    # confirm it exists in the full module (the sensor above targets
    # `test_job`, a different name; one of the two may be a typo).
    get_context(run_test_job())
if __name__ == "__main__":
    # When invoked as a script, execute the job synchronously in this
    # process and keep the result handle for inspection.
    result = run_job.execute_in_process()
@MikeRzhevsky
Copy link
Author

from dagster import job, op, Failure, sensor, RunRequest
from dagster_celery import celery_executor
from dagster.core.storage.pipeline_run import (
RunsFilter,
PipelineRunStatus,
)
import time

@sensor(job=test_job,minimum_interval_seconds=10)
def run_sensor(context):
    # Fetch the records of all runs of this job currently in STARTED state.
    # NOTE(review): this comment-thread variant only performs the query — it
    # neither raises, skips, nor yields a RunRequest, so as written the
    # sensor does nothing with the result; presumably a work-in-progress.
    run_records = context.instance.get_run_records(
        RunsFilter(
            job_name = context.pipeline_name,
            # NOTE(review): author's open question — whether the filter field
            # should be `pipeline_name` instead of `job_name`; confirm against
            # the installed Dagster version's RunsFilter signature.
            #pipeline_name ?
            statuses=[PipelineRunStatus.STARTED],
        )
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment