Last active
October 4, 2024 20:43
-
-
Save cnolanminich/554dc45ae164b36e6a0cfa08906a9af1 to your computer and use it in GitHub Desktop.
Sensor that triggers the downstream job only after both upstream jobs have completed successfully within the past day.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason, job, sensor, define_asset_job, SensorEvaluationContext, DagsterInstance, RunsFilter, Definitions, asset, AssetExecutionContext | |
from datetime import datetime, timedelta | |
import json | |
@asset
def first_asset(context: AssetExecutionContext) -> None:
    """First upstream asset; materializing it only emits a log line."""
    message = "First asset"
    context.log.info(message)
@asset
def second_asset(context: AssetExecutionContext) -> None:
    """Second upstream asset; materializing it only emits a log line."""
    message = "Second asset"
    context.log.info(message)
@asset
def third_asset(context: AssetExecutionContext) -> None:
    """Downstream asset; materializing it only emits a log line."""
    # Fixed copy-paste bug: the original logged "Second asset" here.
    context.log.info("Third asset")
# The two upstream jobs that must both succeed, plus the downstream job
# the sensor kicks off once they have.
upstream_job_1 = define_asset_job(name="upstream_job_1", selection="first_asset")
upstream_job_2 = define_asset_job(name="upstream_job_2", selection="second_asset")
downstream_job = define_asset_job(name="downstream_job", selection="third_asset")

# Job-name constants referenced by the sensor below.
UPSTREAM_JOB_A = "upstream_job_1"
UPSTREAM_JOB_B = "upstream_job_2"
DOWNSTREAM_JOB = "downstream_job"
@sensor(job_name=DOWNSTREAM_JOB)
def compare_completion_times_sensor(context: SensorEvaluationContext):
    """Request a run of ``downstream_job`` once BOTH upstream jobs have a fresh
    successful run that finished within the past day.

    The completion times of the most recently observed successful runs are
    persisted in the sensor cursor (as a JSON object of POSIX timestamps), so
    the downstream job fires only when *both* upstream jobs have completed
    again since the last trigger.

    Returns:
        RunRequest when both upstream jobs completed within the last day and
        are newer than the cursored times; SkipReason otherwise.
    """
    instance = context.instance
    # NOTE(review): naive local time; assumes run end_time timestamps are in
    # the same zone — confirm if deployments span timezones.
    now = datetime.now()
    one_day_ago = now - timedelta(days=1)

    # Most recent successful run of each upstream job.
    filter_a = RunsFilter(job_name=UPSTREAM_JOB_A, statuses=[DagsterRunStatus.SUCCESS])
    filter_b = RunsFilter(job_name=UPSTREAM_JOB_B, statuses=[DagsterRunStatus.SUCCESS])
    run_records_a = instance.get_run_records(filters=filter_a, limit=1, order_by="update_timestamp", ascending=False)
    run_records_b = instance.get_run_records(filters=filter_b, limit=1, order_by="update_timestamp", ascending=False)

    if not run_records_a or not run_records_b:
        return SkipReason("One or both upstream jobs have not completed successfully.")

    completion_time_a = run_records_a[0].end_time
    completion_time_b = run_records_b[0].end_time

    # BUG FIX: the original converted with datetime.fromtimestamp() *before*
    # checking truthiness, so a missing (None) end_time raised TypeError and
    # the later `completion_time_a and completion_time_b` guard was dead code.
    if completion_time_a is None or completion_time_b is None:
        return SkipReason("One or both upstream runs have no recorded end time.")

    completion_time_a = datetime.fromtimestamp(completion_time_a)
    completion_time_b = datetime.fromtimestamp(completion_time_b)

    # Previous completion times from the cursor (stored as float timestamps).
    previous_cursor = json.loads(context.cursor) if context.cursor else {}
    previous_completion_time_a = previous_cursor.get("completion_time_a")
    previous_completion_time_b = previous_cursor.get("completion_time_b")
    if isinstance(previous_completion_time_a, float):
        previous_completion_time_a = datetime.fromtimestamp(previous_completion_time_a)
    if isinstance(previous_completion_time_b, float):
        previous_completion_time_b = datetime.fromtimestamp(previous_completion_time_b)

    # Fire only when both runs are within the last day AND strictly newer
    # than the times recorded in the cursor (i.e. genuinely new runs).
    if (completion_time_a > one_day_ago and completion_time_b > one_day_ago and
            (not previous_completion_time_a or completion_time_a > previous_completion_time_a) and
            (not previous_completion_time_b or completion_time_b > previous_completion_time_b)):
        # Persist the new completion times so the sensor does not re-fire
        # for the same pair of runs.
        new_cursor = json.dumps({
            "completion_time_a": completion_time_a.timestamp(),
            "completion_time_b": completion_time_b.timestamp(),
        })
        context.update_cursor(new_cursor)
        return RunRequest(run_key=None, run_config={})
    return SkipReason("One or both upstream jobs did not complete within the past day.")
# Bundle the assets, jobs, and sensor into one Definitions object for Dagster.
defs = Definitions(
    assets=[first_asset, second_asset, third_asset],
    jobs=[upstream_job_1, upstream_job_2, downstream_job],
    sensors=[compare_completion_times_sensor],
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment