Created
April 29, 2026 21:23
-
-
Save jkislin/22b42698586ba5ec37b764bbfa8af7ef to your computer and use it in GitHub Desktop.
This is a snippet of various things you can try in order to stress-test Dagster declarative automation conditions. It is meant to be imported into an existing Dagster definitions Python module; it won't work by itself.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import dagster as dg | |
| import datetime as dt | |
| from datetime import datetime | |
| from zoneinfo import ZoneInfo | |
# IANA timezone name used both for computing the start date and for the
# partition boundaries themselves.
tz = "America/New_York"

# Daily partitions whose start date is computed at import time as "now minus
# one day" in `tz`, so the set of partitions depends on when this module is
# loaded.
# NOTE(review): end_offset=1 extends the partition set by one partition past
# the default end -- confirm against the DailyPartitionsDefinition docs that
# this is the intended window.
daily_partitions_def = dg.DailyPartitionsDefinition(
    timezone=tz,
    end_offset=1,
    start_date=dt.datetime.now(tz=ZoneInfo(tz)) - dt.timedelta(days=1),
)
@dg.asset(group_name="AutomationTest", partitions_def=daily_partitions_def)
def MANUAL():
    """Root asset of the AutomationTest group.

    It has no automation condition and produces no value; it exists so that
    downstream assets in this file have an upstream to depend on.
    """
    return None
| # Custom Code Location | |
| # def is_wednesday(evaluation_time: datetime) -> bool: | |
| # wednesday_logic=True | |
| # return wednesday_logic | |
class IsWeekday(dg.AutomationCondition):
    """Automation condition that is true only on one configured weekday.

    When the evaluation time falls on the configured weekday, the full
    candidate subset is reported as true; on any other day the result is an
    empty subset.
    """

    def __init__(self, weekday: int):
        """
        Check if evaluation time is a specific weekday.

        Args:
            weekday: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday,
                4=Friday, 5=Saturday, 6=Sunday

        Raises:
            ValueError: If ``weekday`` is outside the range 0-6.
        """
        # Fail at construction time: an out-of-range value could never match
        # datetime.weekday() and would otherwise only surface later as an
        # IndexError (or a silently wrong negative-index label) in `name`.
        if not 0 <= weekday <= 6:
            raise ValueError(
                f"weekday must be between 0 (Monday) and 6 (Sunday), got {weekday!r}"
            )
        self.weekday = weekday
        super().__init__()

    def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
        """Return the candidate subset on the configured weekday, else an empty subset."""
        # Compare against the configured weekday rather than a hard-coded
        # Wednesday, so the behavior agrees with the label returned by `name`.
        # NOTE(review): the weekday is read from context.evaluation_time as-is;
        # confirm which timezone that datetime is expressed in if the day
        # boundary matters.
        if context.evaluation_time.weekday() == self.weekday:
            true_subset = context.candidate_subset
        else:
            true_subset = context.get_empty_subset()
        return dg.AutomationResult(true_subset=true_subset, context=context)

    @property
    def name(self) -> str:
        """Define the label that will appear in the UI, e.g. ``is_wednesday``."""
        days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                'Friday', 'Saturday', 'Sunday']
        return f"is_{days[self.weekday].lower()}"
def eager_on_given_weekday(weekday: int) -> dg.AutomationCondition:
    """Build an eager automation condition restricted to a single weekday.

    Args:
        weekday: 0-indexed day of the week (0=Monday ... 6=Sunday).

    Returns:
        ``IsWeekday(weekday) & AutomationCondition.eager()``, labelled
        ``eager_on_<day name in lowercase>``.

    Raises:
        KeyError: If ``weekday`` is not one of 0-6.
    """
    day_names = dict(
        enumerate(
            (
                "Monday",
                "Tuesday",
                "Wednesday",
                "Thursday",
                "Friday",
                "Saturday",
                "Sunday",
            )
        )
    )
    combined = IsWeekday(weekday) & dg.AutomationCondition.eager()
    label = f"eager_on_{day_names[weekday].lower()}"
    return combined.with_label(label)
# Alternative condition that was tried here:
#   dg.AutomationCondition.missing() & dg.AutomationCondition.eager()
@dg.asset(
    group_name="AutomationTest",
    partitions_def=daily_partitions_def,
    automation_condition=eager_on_given_weekday(2),
    tags={"asset_type": "cron"},
)
def eager_on_wednesday(MANUAL):
    """Asset downstream of ``MANUAL`` that uses the weekday-2 (Wednesday) eager condition.

    The body produces no value; the asset exists only to observe when the
    automation condition requests it.
    """
    return None
# Sensor that evaluates automation conditions for every asset in the
# "AutomationTest" group.
# NOTE(review): use_user_code_server=True is presumably required because the
# group uses the user-defined IsWeekday condition -- confirm against the
# AutomationConditionSensorDefinition docs.
automation_test_sensor = dg.AutomationConditionSensorDefinition(
    "AutomationTestSensor",
    use_user_code_server=True,
    target=dg.AssetSelection.groups("AutomationTest"),
)
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=dg.AutomationCondition.cron_tick_passed("* * * * *") | |
| # ) | |
| # def MINUTELY(): | |
| # return | |
| # On Cron (equivalent) | |
| # def same_as_on_cron(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & dg.AutomationCondition.cron_tick_passed( | |
| # _cron_schedule, _cron_timezone | |
| # ).since_last_handled() | |
| # & dg.AutomationCondition.all_deps_updated_since_cron(_cron_schedule, _cron_timezone) | |
| # ).with_label(f"on_cron({_cron_schedule}, {_cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=same_as_on_cron(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def on_cron(MANUAL): | |
| # return | |
| # # On Cron (equivalent, minus any since condition) | |
| # def cron_no_since_at_all(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & dg.AutomationCondition.cron_tick_passed( | |
| # _cron_schedule, _cron_timezone | |
| # ) | |
| # & dg.AutomationCondition.all_deps_updated_since_cron(_cron_schedule, _cron_timezone) | |
| # ).with_label(f"on_cron_no_since({_cron_schedule}, {_cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=cron_no_since_at_all(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def on_cron_no_since(MANUAL): | |
| # return | |
| # Eager (as is) & cron_tick_passed() | |
| # def eager_cron(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # dg.AutomationCondition.eager() | |
| # & dg.AutomationCondition.cron_tick_passed( | |
| # _cron_schedule, _cron_timezone | |
| # ) | |
| # ).with_label(f"eager_cron({_cron_schedule}, {_cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=eager_cron(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def eager_cron(MANUAL): | |
| # return | |
| # _cron="1/5 * * * *" | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=dg.AutomationCondition.cron_tick_passed( | |
| # cron_schedule=_cron, | |
| # cron_timezone="America/New_York" | |
| # ).with_label(_cron) | |
| # ) | |
| # def CRON_TICKER(): | |
| # return | |
| # # cron tick passed -> reload -> materialize upstream -> see what happens | |
| # eager_no_since_all=( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & ( | |
| # dg.AutomationCondition.newly_missing() | | |
| # dg.AutomationCondition.any_deps_updated() | |
| # ) | |
| # & ~dg.AutomationCondition.any_deps_missing() | |
| # & ~dg.AutomationCondition.any_deps_in_progress() | |
| # & ~dg.AutomationCondition.in_progress() | |
| # ).with_label("eager_no_since_at_all") | |
| # def eager_cron_no_since_all(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # eager_no_since_all | |
| # & dg.AutomationCondition.cron_tick_passed( | |
| # _cron_schedule, _cron_timezone | |
| # ) | |
| # ).with_label(f"eager_cron_no_since({_cron_schedule}, {_cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # tags={"asset_type":"cron"}, | |
| # automation_condition=eager_cron_no_since_all(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def eager_cron_no_since(MANUAL): | |
| # return | |
| # # CUSTOM CONDITION TESTING | |
| # def eager_with_internal_cron_condition(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & ( | |
| # (dg.AutomationCondition.newly_missing() | | |
| # dg.AutomationCondition.any_deps_updated()) & | |
| # dg.AutomationCondition.cron_tick_passed(_cron_schedule,_cron_timezone) | |
| # ).since( | |
| # dg.AutomationCondition.newly_requested() | | |
| # dg.AutomationCondition.newly_updated() | | |
| # dg.AutomationCondition.initial_evaluation() | |
| # ) | |
| # & ~dg.AutomationCondition.any_deps_missing() | |
| # & ~dg.AutomationCondition.any_deps_in_progress() | |
| # & ~dg.AutomationCondition.in_progress() | |
| # ).with_label(f"eager_with_internal_cron{_cron_schedule, _cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # tags={"asset_type":"cron"}, | |
| # automation_condition=eager_with_internal_cron_condition(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def eager_with_internal_cron(MANUAL): | |
| # return | |
| # def eager_since_cron_condition(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & ( | |
| # (dg.AutomationCondition.newly_missing() | | |
| # dg.AutomationCondition.any_deps_updated()) | |
| # ).since( | |
| # dg.AutomationCondition.newly_requested() | | |
| # dg.AutomationCondition.newly_updated() | | |
| # dg.AutomationCondition.initial_evaluation() | | |
| # dg.AutomationCondition.cron_tick_passed(_cron_schedule,_cron_timezone) | |
| # ) | |
| # & ~dg.AutomationCondition.any_deps_missing() | |
| # & ~dg.AutomationCondition.any_deps_in_progress() | |
| # & ~dg.AutomationCondition.in_progress() | |
| # ).with_label(f"eager_since_cron{_cron_schedule, _cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # tags={"asset_type":"cron"}, | |
| # automation_condition=eager_since_cron_condition(_cron_schedule="1/2 * * * *") | |
| # ) | |
| # def eager_since_cron(MANUAL): | |
| # return | |
| # CUSTOM CONDITION TESTING | |
| # same_as_eager=( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & ( | |
| # dg.AutomationCondition.newly_missing() | | |
| # dg.AutomationCondition.any_deps_updated() | |
| # ).since( | |
| # dg.AutomationCondition.newly_requested() | | |
| # dg.AutomationCondition.newly_updated() | | |
| # dg.AutomationCondition.initial_evaluation() | |
| # ) | |
| # & ~dg.AutomationCondition.any_deps_missing() | |
| # & ~dg.AutomationCondition.any_deps_in_progress() | |
| # & ~dg.AutomationCondition.in_progress() | |
| # ).with_label("same_as_eager") | |
| # Eager (No Since Initial Evaluation) + cron_tick_passed() | |
| # eager_no_initial_eval=( | |
| # dg.AutomationCondition.in_latest_time_window() | |
| # & ( | |
| # dg.AutomationCondition.newly_missing() | | |
| # dg.AutomationCondition.any_deps_updated() | |
| # ).since( | |
| # dg.AutomationCondition.newly_requested() | | |
| # dg.AutomationCondition.newly_updated() | |
| # ) | |
| # & ~dg.AutomationCondition.any_deps_missing() | |
| # & ~dg.AutomationCondition.any_deps_in_progress() | |
| # & ~dg.AutomationCondition.in_progress() | |
| # ).with_label("eager_no_since_initial_eval") | |
| # def eager_cron_no_initial_eval(_cron_schedule="* * * * *",_cron_timezone="America/New_York")->dg.AutomationCondition: | |
| # return ( | |
| # eager_no_initial_eval | |
| # & dg.AutomationCondition.cron_tick_passed( | |
| # _cron_schedule, _cron_timezone | |
| # ) | |
| # ).with_label(f"eager_cron_no_initial({_cron_schedule}, {_cron_timezone}") | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=eager_cron_no_initial_eval() | |
| # ) | |
| # def eager_cron_no_since_initial(MINUTELY): | |
| # return | |
| # Eager (No Since At All) + cron_tick_passed() | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=eager_no_since_all, | |
| # ) | |
| # def eager_no_since(MINUTELY): | |
| # return | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=same_as_eager | |
| # ) | |
| # def eager(MINUTELY): | |
| # return | |
| # @dg.asset( | |
| # partitions_def=daily_partitions_def, | |
| # group_name="AutomationTest", | |
| # automation_condition=eager_no_initial_eval | |
| # ) | |
| # def eager_no_since_initial(MINUTELY): | |
| # return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment