Skip to content

Instantly share code, notes, and snippets.

@jkislin
Created April 29, 2026 21:23
Show Gist options
  • Select an option

  • Save jkislin/22b42698586ba5ec37b764bbfa8af7ef to your computer and use it in GitHub Desktop.

Select an option

Save jkislin/22b42698586ba5ec37b764bbfa8af7ef to your computer and use it in GitHub Desktop.
This is a snippet of various things you can try to stress-test dagster declarative automation conditions. It is meant to be imported into an existing dagster definitions python module; it won't work by itself.
import dagster as dg
import datetime as dt
from datetime import datetime
from zoneinfo import ZoneInfo
# IANA time zone name shared by the partition definition and its start date.
tz = "America/New_York"

# Daily partitions used by every asset in this snippet.
# NOTE(review): the start date is computed when this module is imported
# (one day before "now" in `tz`), so it moves every time the code is reloaded.
# Confirm that a floating start date is what the stress test wants.
daily_partitions_def = dg.DailyPartitionsDefinition(
    timezone=tz,
    start_date=dt.datetime.now(tz=ZoneInfo(tz)) - dt.timedelta(days=1),
    end_offset=1,
)
# Root asset of the "AutomationTest" group: it takes no upstream inputs and is
# given no automation condition here. Presumably it is materialized by hand to
# trigger the downstream test assets -- the name suggests so; confirm.
@dg.asset(
    group_name="AutomationTest",
    partitions_def=daily_partitions_def,
)
def MANUAL():
    # Nothing to compute; the asset exists only so that it can be materialized.
    return None
# Custom Code Location
# def is_wednesday(evaluation_time: datetime) -> bool:
# wednesday_logic=True
# return wednesday_logic
class IsWeekday(dg.AutomationCondition):
    """Automation condition that is true only on one configured day of the week.

    When the evaluation time falls on the configured weekday, the whole
    candidate subset is reported as true; otherwise the empty subset is.
    """

    def __init__(self, weekday: int):
        """
        Check if evaluation time is a specific weekday.

        Args:
            weekday: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday,
                4=Friday, 5=Saturday, 6=Sunday

        Raises:
            ValueError: If ``weekday`` is outside the range 0-6.
        """
        # Reject out-of-range values up front: a negative index would otherwise
        # silently wrap around in `name` (e.g. -1 -> "Sunday") while never
        # matching `datetime.weekday()`.
        if not 0 <= weekday <= 6:
            raise ValueError(
                f"weekday must be between 0 (Monday) and 6 (Sunday), got {weekday!r}"
            )
        self.weekday = weekday
        super().__init__()

    def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
        """Return the candidate subset on the configured weekday, else an empty one."""
        # Compare against the configured weekday (previously hard-coded to 2,
        # which made every instance behave as "is Wednesday").
        # NOTE(review): the weekday is taken in whatever time zone
        # `context.evaluation_time` carries -- confirm that this matches the
        # time zone the partitions use.
        if context.evaluation_time.weekday() == self.weekday:
            true_subset = context.candidate_subset
        else:
            true_subset = context.get_empty_subset()
        return dg.AutomationResult(true_subset=true_subset, context=context)

    @property
    def name(self) -> str:
        """Define the label that will appear in the UI"""
        days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                'Friday', 'Saturday', 'Sunday']
        return f"is_{days[self.weekday].lower()}"
def eager_on_given_weekday(weekday: int) -> dg.AutomationCondition:
    """Build an eager automation condition gated to a single day of the week.

    Args:
        weekday: Zero-indexed day of the week (0=Monday ... 6=Sunday).

    Returns:
        ``IsWeekday(weekday) & AutomationCondition.eager()``, labelled
        ``eager_on_<day name in lower case>``.

    Raises:
        KeyError: If ``weekday`` is not one of 0-6.
    """
    # Index -> day name; a dict lookup so that unknown indices raise KeyError.
    day_names = dict(
        enumerate(
            (
                "Monday",
                "Tuesday",
                "Wednesday",
                "Thursday",
                "Friday",
                "Saturday",
                "Sunday",
            )
        )
    )
    gated_condition = IsWeekday(weekday) & dg.AutomationCondition.eager()
    label = f"eager_on_{day_names[weekday].lower()}"
    return gated_condition.with_label(label)
# Downstream test asset: takes MANUAL as its input and uses the weekday-gated
# eager condition with weekday index 2 (Wednesday).
@dg.asset(
    group_name="AutomationTest",
    partitions_def=daily_partitions_def,
    tags={"asset_type": "cron"},
    # Alternative condition to experiment with:
    #   dg.AutomationCondition.missing() & dg.AutomationCondition.eager()
    automation_condition=eager_on_given_weekday(2),
)
def eager_on_wednesday(MANUAL):
    # No payload; only the materialization event matters for this test.
    return None
# Automation-condition sensor scoped to the assets of the "AutomationTest" group.
# NOTE(review): `use_user_code_server=True` is presumably required because the
# group uses the custom `IsWeekday` condition -- confirm against the Dagster docs.
automation_test_sensor = dg.AutomationConditionSensorDefinition(
    target=dg.AssetSelection.groups("AutomationTest"),
    name="AutomationTestSensor",
    use_user_code_server=True,
)
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=dg.AutomationCondition.cron_tick_passed("* * * * *")
# )
# def MINUTELY():
# return
# On Cron (equivalent)
# def same_as_on_cron(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# dg.AutomationCondition.in_latest_time_window()
# & dg.AutomationCondition.cron_tick_passed(
# _cron_schedule, _cron_timezone
# ).since_last_handled()
# & dg.AutomationCondition.all_deps_updated_since_cron(_cron_schedule, _cron_timezone)
# ).with_label(f"on_cron({_cron_schedule}, {_cron_timezone})")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=same_as_on_cron(_cron_schedule="1/2 * * * *")
# )
# def on_cron(MANUAL):
# return
# # On Cron (equivalent, minus any since condition)
# def cron_no_since_at_all(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# dg.AutomationCondition.in_latest_time_window()
# & dg.AutomationCondition.cron_tick_passed(
# _cron_schedule, _cron_timezone
# )
# & dg.AutomationCondition.all_deps_updated_since_cron(_cron_schedule, _cron_timezone)
# ).with_label(f"on_cron_no_since({_cron_schedule}, {_cron_timezone})")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=cron_no_since_at_all(_cron_schedule="1/2 * * * *")
# )
# def on_cron_no_since(MANUAL):
# return
# Eager (as is) & cron_tick_passed()
# def eager_cron(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# dg.AutomationCondition.eager()
# & dg.AutomationCondition.cron_tick_passed(
# _cron_schedule, _cron_timezone
# )
# ).with_label(f"eager_cron({_cron_schedule}, {_cron_timezone})")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=eager_cron(_cron_schedule="1/2 * * * *")
# )
# def eager_cron(MANUAL):
# return
# _cron="1/5 * * * *"
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=dg.AutomationCondition.cron_tick_passed(
# cron_schedule=_cron,
# cron_timezone="America/New_York"
# ).with_label(_cron)
# )
# def CRON_TICKER():
# return
# # cron tick passed -> reload -> materialize upstream -> see what happens
# eager_no_since_all=(
# dg.AutomationCondition.in_latest_time_window()
# & (
# dg.AutomationCondition.newly_missing() |
# dg.AutomationCondition.any_deps_updated()
# )
# & ~dg.AutomationCondition.any_deps_missing()
# & ~dg.AutomationCondition.any_deps_in_progress()
# & ~dg.AutomationCondition.in_progress()
# ).with_label("eager_no_since_at_all")
# def eager_cron_no_since_all(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# eager_no_since_all
# & dg.AutomationCondition.cron_tick_passed(
# _cron_schedule, _cron_timezone
# )
# ).with_label(f"eager_cron_no_since({_cron_schedule}, {_cron_timezone})")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# tags={"asset_type":"cron"},
# automation_condition=eager_cron_no_since_all(_cron_schedule="1/2 * * * *")
# )
# def eager_cron_no_since(MANUAL):
# return
# # CUSTOM CONDITION TESTING
# def eager_with_internal_cron_condition(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# dg.AutomationCondition.in_latest_time_window()
# & (
# (dg.AutomationCondition.newly_missing() |
# dg.AutomationCondition.any_deps_updated()) &
# dg.AutomationCondition.cron_tick_passed(_cron_schedule,_cron_timezone)
# ).since(
# dg.AutomationCondition.newly_requested() |
# dg.AutomationCondition.newly_updated() |
# dg.AutomationCondition.initial_evaluation()
# )
# & ~dg.AutomationCondition.any_deps_missing()
# & ~dg.AutomationCondition.any_deps_in_progress()
# & ~dg.AutomationCondition.in_progress()
# ).with_label(f"eager_with_internal_cron{_cron_schedule, _cron_timezone}")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# tags={"asset_type":"cron"},
# automation_condition=eager_with_internal_cron_condition(_cron_schedule="1/2 * * * *")
# )
# def eager_with_internal_cron(MANUAL):
# return
# def eager_since_cron_condition(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# dg.AutomationCondition.in_latest_time_window()
# & (
# (dg.AutomationCondition.newly_missing() |
# dg.AutomationCondition.any_deps_updated())
# ).since(
# dg.AutomationCondition.newly_requested() |
# dg.AutomationCondition.newly_updated() |
# dg.AutomationCondition.initial_evaluation() |
# dg.AutomationCondition.cron_tick_passed(_cron_schedule,_cron_timezone)
# )
# & ~dg.AutomationCondition.any_deps_missing()
# & ~dg.AutomationCondition.any_deps_in_progress()
# & ~dg.AutomationCondition.in_progress()
# ).with_label(f"eager_since_cron{_cron_schedule, _cron_timezone}")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# tags={"asset_type":"cron"},
# automation_condition=eager_since_cron_condition(_cron_schedule="1/2 * * * *")
# )
# def eager_since_cron(MANUAL):
# return
# CUSTOM CONDITION TESTING
# same_as_eager=(
# dg.AutomationCondition.in_latest_time_window()
# & (
# dg.AutomationCondition.newly_missing() |
# dg.AutomationCondition.any_deps_updated()
# ).since(
# dg.AutomationCondition.newly_requested() |
# dg.AutomationCondition.newly_updated() |
# dg.AutomationCondition.initial_evaluation()
# )
# & ~dg.AutomationCondition.any_deps_missing()
# & ~dg.AutomationCondition.any_deps_in_progress()
# & ~dg.AutomationCondition.in_progress()
# ).with_label("same_as_eager")
# Eager (No Since Initial Evaluation) + cron_tick_passed()
# eager_no_initial_eval=(
# dg.AutomationCondition.in_latest_time_window()
# & (
# dg.AutomationCondition.newly_missing() |
# dg.AutomationCondition.any_deps_updated()
# ).since(
# dg.AutomationCondition.newly_requested() |
# dg.AutomationCondition.newly_updated()
# )
# & ~dg.AutomationCondition.any_deps_missing()
# & ~dg.AutomationCondition.any_deps_in_progress()
# & ~dg.AutomationCondition.in_progress()
# ).with_label("eager_no_since_initial_eval")
# def eager_cron_no_initial_eval(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition:
# return (
# eager_no_initial_eval
# & dg.AutomationCondition.cron_tick_passed(
# _cron_schedule, _cron_timezone
# )
# ).with_label(f"eager_cron_no_initial({_cron_schedule}, {_cron_timezone})")
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=eager_cron_no_initial_eval()
# )
# def eager_cron_no_since_initial(MINUTELY):
# return
# Eager (No Since At All) + cron_tick_passed()
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=eager_no_since_all,
# )
# def eager_no_since(MINUTELY):
# return
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=same_as_eager
# )
# def eager(MINUTELY):
# return
# @dg.asset(
# partitions_def=daily_partitions_def,
# group_name="AutomationTest",
# automation_condition=eager_no_initial_eval
# )
# def eager_no_since_initial(MINUTELY):
# return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment