Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Last active October 4, 2024 20:43
Show Gist options
  • Save cnolanminich/554dc45ae164b36e6a0cfa08906a9af1 to your computer and use it in GitHub Desktop.
Save cnolanminich/554dc45ae164b36e6a0cfa08906a9af1 to your computer and use it in GitHub Desktop.
Sensor that triggers a downstream job only once both upstream jobs have completed successfully within the past day (tracked via the sensor cursor).
from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason, job, sensor, define_asset_job, SensorEvaluationContext, DagsterInstance, RunsFilter, Definitions, asset, AssetExecutionContext
from datetime import datetime, timedelta
import json
@asset
def first_asset(context: AssetExecutionContext) -> None:
    """First upstream asset; its only effect is an informational log line."""
    context.log.info("First asset")
@asset
def second_asset(context: AssetExecutionContext) -> None:
    """Second upstream asset; its only effect is an informational log line."""
    context.log.info("Second asset")
@asset
def third_asset(context: AssetExecutionContext) -> None:
    """Downstream asset; its only effect is an informational log line.

    Fix: the original logged "Second asset" here — a copy-paste error from
    second_asset — which made the downstream run indistinguishable in logs.
    """
    context.log.info("Third asset")
# Two independent upstream asset jobs plus the downstream job that the
# sensor below will launch once both upstreams have succeeded.
upstream_job_1 = define_asset_job(name="upstream_job_1", selection="first_asset")
upstream_job_2 = define_asset_job(name="upstream_job_2", selection="second_asset")
downstream_job = define_asset_job(name="downstream_job", selection="third_asset")

# Job-name constants referenced by the sensor's run filters and target.
UPSTREAM_JOB_A = "upstream_job_1"
UPSTREAM_JOB_B = "upstream_job_2"
DOWNSTREAM_JOB = "downstream_job"
@sensor(job_name=DOWNSTREAM_JOB)
def compare_completion_times_sensor(context: SensorEvaluationContext):
    """Request one downstream run after BOTH upstream jobs newly succeed.

    Looks up the most recent successful run of each upstream job and fires
    only when both finished within the past day AND each finished later than
    the completion time remembered in the sensor cursor — so a given pair of
    upstream successes triggers the downstream job exactly once.

    Returns:
        RunRequest when both upstreams have fresh successes, otherwise a
        SkipReason explaining why no run was requested.
    """
    instance = context.instance
    one_day_ago = datetime.now() - timedelta(days=1)

    # Most recent successful run of each upstream job.
    run_records_a = instance.get_run_records(
        filters=RunsFilter(job_name=UPSTREAM_JOB_A, statuses=[DagsterRunStatus.SUCCESS]),
        limit=1,
        order_by="update_timestamp",
        ascending=False,
    )
    run_records_b = instance.get_run_records(
        filters=RunsFilter(job_name=UPSTREAM_JOB_B, statuses=[DagsterRunStatus.SUCCESS]),
        limit=1,
        order_by="update_timestamp",
        ascending=False,
    )
    if not run_records_a or not run_records_b:
        return SkipReason("One or both upstream jobs have not completed successfully.")

    # end_time is a Unix timestamp (float) or None. The original converted with
    # datetime.fromtimestamp() BEFORE its truthiness check, so a None end_time
    # raised TypeError instead of skipping — guard first, then convert.
    end_ts_a = run_records_a[0].end_time
    end_ts_b = run_records_b[0].end_time
    if end_ts_a is None or end_ts_b is None:
        return SkipReason("One or both upstream runs have no recorded end time yet.")
    completion_time_a = datetime.fromtimestamp(end_ts_a)
    completion_time_b = datetime.fromtimestamp(end_ts_b)

    # Completion times of the pair that last triggered a run, stored in the
    # cursor as Unix timestamps; convert back to datetime for comparison.
    previous_cursor = json.loads(context.cursor) if context.cursor else {}
    previous_completion_time_a = previous_cursor.get("completion_time_a")
    previous_completion_time_b = previous_cursor.get("completion_time_b")
    if isinstance(previous_completion_time_a, float):
        previous_completion_time_a = datetime.fromtimestamp(previous_completion_time_a)
    if isinstance(previous_completion_time_b, float):
        previous_completion_time_b = datetime.fromtimestamp(previous_completion_time_b)

    if (
        completion_time_a > one_day_ago
        and completion_time_b > one_day_ago
        and (not previous_completion_time_a or completion_time_a > previous_completion_time_a)
        and (not previous_completion_time_b or completion_time_b > previous_completion_time_b)
    ):
        # Remember this pair of completions so the same successes cannot
        # retrigger the downstream job on the next tick.
        context.update_cursor(
            json.dumps(
                {
                    "completion_time_a": completion_time_a.timestamp(),
                    "completion_time_b": completion_time_b.timestamp(),
                }
            )
        )
        return RunRequest(run_key=None, run_config={})
    return SkipReason("One or both upstream jobs did not complete within the past day.")
# Add the sensor to the Definitions object
# Register all assets, jobs, and the triggering sensor with Dagster.
defs = Definitions(
    assets=[first_asset, second_asset, third_asset],
    jobs=[upstream_job_1, upstream_job_2, downstream_job],
    sensors=[compare_completion_times_sensor],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment