Last active
March 26, 2025 06:11
-
-
Save rajvermacas/9749798e6936caf39a2786f3170a0c3c to your computer and use it in GitHub Desktop.
airflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABCMeta, abstractmethod | |
import inspect | |
from airflow import DAG | |
from airflow.decorators import task | |
from datetime import datetime | |
# Define the metaclass for DAG creation | |
class DagMeta(ABCMeta): | |
def __new__(mcs, name, bases, attrs): | |
# Create the class using the standard process | |
cls = super().__new__(mcs, name, bases, attrs) | |
# Only create a DAG for subclasses, not the abstract base class | |
if name != 'AbstractDag': | |
# Get the DAG ID from the subclass's get_dag_id method | |
dag_id = cls.get_dag_id() | |
# Define default arguments for the DAG | |
default_args = { | |
'owner': 'airflow', | |
'start_date': datetime(2023, 1, 1), | |
'retries': 1, | |
} | |
# Create the DAG instance | |
dag = DAG( | |
dag_id=dag_id, | |
default_args=default_args, | |
schedule_interval=None, # Set to None or adjust as needed | |
catchup=False, | |
) | |
# Define a task within the DAG context | |
with dag: | |
@task | |
def sample_task(): | |
print("This is a sample task executed by the DAG.") | |
sample_task() | |
# Assign the DAG to the module's globals for Airflow to discover | |
module = inspect.getmodule(cls) | |
module.__dict__[dag_id] = dag | |
return cls | |
# Define the abstract base class | |
class AbstractDag(metaclass=DagMeta): | |
@classmethod | |
@abstractmethod | |
def get_dag_id(cls): | |
"""Abstract method that subclasses must override to provide the DAG ID.""" | |
pass | |
# Example subclass | |
class MyDag(AbstractDag): | |
@classmethod | |
def get_dag_id(cls): | |
return "my_dag" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABCMeta, abstractmethod | |
import inspect | |
from airflow import DAG | |
from airflow.decorators import task | |
from datetime import datetime | |
class DagMeta(ABCMeta): | |
def __new__(mcs, name, bases, attrs): | |
cls = super().__new__(mcs, name, bases, attrs) | |
if name != 'AbstractDag': | |
dag_id = cls.get_dag_id() | |
default_args = { | |
'owner': 'airflow', | |
'start_date': datetime(2023, 1, 1), | |
'retries': 1, | |
} | |
dag = DAG( | |
dag_id=dag_id, | |
default_args=default_args, | |
schedule_interval=None, | |
catchup=False, | |
) | |
with dag: | |
@task | |
def sample_task(): | |
print("This is a sample task executed by the DAG.") | |
sample_task() | |
# Get the module where the class is being used/imported | |
frame = inspect.currentframe() | |
while frame: | |
if frame.f_code.co_name == '<module>': | |
module = inspect.getmodule(frame) | |
break | |
frame = frame.f_back | |
if module: | |
module.__dict__[dag_id] = dag | |
return cls | |
class AbstractDag(metaclass=DagMeta): | |
@classmethod | |
@abstractmethod | |
def get_dag_id(cls): | |
"""Abstract method that subclasses must override to provide the DAG ID.""" | |
pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.decorators import dag | |
from airflow.operators.trigger_dagrun import TriggerDagRunOperator | |
from datetime import datetime | |
@dag( | |
dag_id="specialized_dag", | |
start_date=datetime(2023, 1, 1), | |
schedule_interval="@daily", | |
catchup=False, | |
) | |
def specialized_dag(): | |
""" | |
DAG to trigger a generic DAG using the TriggerDagRunOperator | |
with the specified configuration. | |
""" | |
trigger = TriggerDagRunOperator( | |
task_id="trigger_generic_dag", | |
trigger_dag_id="generic_dag", # The ID of the generic DAG to trigger | |
conf={ | |
"param1": "value1", | |
"param2": 42, | |
"param3": "foo" | |
}, | |
trigger_run_id="triggered__{{ ds_nodash }}__{{ task_instance.task_id }}" | |
) | |
# Instantiate the DAG by calling the decorated function | |
dag = specialized_dag() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment