Skip to content

Instantly share code, notes, and snippets.

@rajvermacas
Last active March 26, 2025 06:11
Show Gist options
  • Save rajvermacas/9749798e6936caf39a2786f3170a0c3c to your computer and use it in GitHub Desktop.
Save rajvermacas/9749798e6936caf39a2786f3170a0c3c to your computer and use it in GitHub Desktop.
airflow
from abc import ABCMeta, abstractmethod
import inspect
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
# Define the metaclass for DAG creation
class DagMeta(ABCMeta):
    """Metaclass that builds and registers an Airflow DAG per concrete subclass.

    When a class using this metaclass is created, the metaclass asks the new
    class for its DAG ID (``get_dag_id``), builds a ``DAG`` containing a single
    sample task, and stores that DAG in the globals of the module that defines
    the class, keyed by the DAG ID.
    """

    def __new__(mcs, name, bases, attrs):
        """Create the class and, for concrete subclasses, attach a DAG.

        Args:
            mcs: This metaclass.
            name: Name of the class being created.
            bases: Base classes of the class being created.
            attrs: Namespace (attribute dict) of the class being created.

        Returns:
            The newly created class.

        Raises:
            RuntimeError: If the module defining the class cannot be resolved,
                so there is nowhere to publish the DAG.
        """
        cls = super().__new__(mcs, name, bases, attrs)

        # No DAG for the abstract base class (matched by name, as before), nor
        # for any class that still has unimplemented abstract methods: calling
        # the abstract ``get_dag_id`` there would yield ``None`` as the DAG ID.
        if name == 'AbstractDag' or inspect.isabstract(cls):
            return cls

        dag_id = cls.get_dag_id()

        default_args = {
            'owner': 'airflow',
            'start_date': datetime(2023, 1, 1),
            'retries': 1,
        }

        # NOTE: ``schedule_interval`` is deprecated in Airflow 2.4+ in favour
        # of ``schedule``; it is kept here for compatibility with older
        # releases.
        dag = DAG(
            dag_id=dag_id,
            default_args=default_args,
            schedule_interval=None,  # Set to None or adjust as needed
            catchup=False,
        )

        # Tasks created inside the ``with`` block are attached to ``dag``.
        with dag:
            @task
            def sample_task():
                print("This is a sample task executed by the DAG.")

            sample_task()

        # Publish the DAG in the defining module's globals so Airflow can
        # discover it.
        module = inspect.getmodule(cls)
        if module is None:
            # Previously this surfaced as an opaque AttributeError on None.
            raise RuntimeError(
                f"Cannot register DAG {dag_id!r}: module {cls.__module__!r} "
                f"of class {name!r} could not be resolved."
            )
        module.__dict__[dag_id] = dag
        return cls
# Define the abstract base class
class AbstractDag(metaclass=DagMeta):
    """Abstract base for DAG-defining classes driven by ``DagMeta``.

    Subclasses supply a DAG ID through ``get_dag_id``; the metaclass uses that
    ID when it builds the DAG for the subclass.
    """

    @classmethod
    @abstractmethod
    def get_dag_id(cls):
        """Return the DAG ID for this class; subclasses must override this."""
# Example subclass
class MyDag(AbstractDag):
    """Example concrete subclass whose DAG ID is ``my_dag``."""

    @classmethod
    def get_dag_id(cls) -> str:
        """Return the DAG ID for this example class."""
        return "my_dag"
from abc import ABCMeta, abstractmethod
import inspect
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
class DagMeta(ABCMeta):
    """Metaclass that builds an Airflow DAG for each concrete subclass.

    The DAG is stored in the globals of the nearest module-level frame on the
    call stack at class-creation time, keyed by the DAG ID returned from the
    class's ``get_dag_id``.
    """

    def __new__(mcs, name, bases, attrs):
        """Create the class and, for concrete subclasses, attach a DAG.

        Args:
            mcs: This metaclass.
            name: Name of the class being created.
            bases: Base classes of the class being created.
            attrs: Namespace (attribute dict) of the class being created.

        Returns:
            The newly created class.
        """
        cls = super().__new__(mcs, name, bases, attrs)

        # No DAG for the abstract base class (matched by name, as before), nor
        # for any class that still has unimplemented abstract methods: calling
        # the abstract ``get_dag_id`` there would yield ``None`` as the DAG ID.
        if name == 'AbstractDag' or inspect.isabstract(cls):
            return cls

        dag_id = cls.get_dag_id()
        default_args = {
            'owner': 'airflow',
            'start_date': datetime(2023, 1, 1),
            'retries': 1,
        }
        # NOTE: ``schedule_interval`` is deprecated in Airflow 2.4+ in favour
        # of ``schedule``; it is kept here for compatibility with older
        # releases.
        dag = DAG(
            dag_id=dag_id,
            default_args=default_args,
            schedule_interval=None,
            catchup=False,
        )

        # Tasks created inside the ``with`` block are attached to ``dag``.
        with dag:
            @task
            def sample_task():
                print("This is a sample task executed by the DAG.")

            sample_task()

        # Walk up the call stack to the first module-level frame, i.e. the
        # module in which the class statement (or the code that triggered it)
        # is executing. ``module`` starts as None so that the check below is
        # safe when no such frame exists or frame introspection is unavailable
        # (``inspect.currentframe()`` may return None).
        module = None
        frame = inspect.currentframe()
        try:
            while frame is not None:
                if frame.f_code.co_name == '<module>':
                    module = inspect.getmodule(frame)
                    break
                frame = frame.f_back
        finally:
            # Drop the frame reference to avoid a reference cycle.
            del frame

        # Best effort: register only when a target module was resolved.
        if module is not None:
            module.__dict__[dag_id] = dag
        return cls
class AbstractDag(metaclass=DagMeta):
    """Abstract base class whose concrete subclasses each describe one DAG.

    Class creation is handled by ``DagMeta``, which calls ``get_dag_id`` on
    every subclass to obtain the ID of the DAG it builds.
    """

    @classmethod
    @abstractmethod
    def get_dag_id(cls):
        """Provide the DAG ID; every concrete subclass must implement this."""
from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
@dag(
    dag_id="specialized_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)
def specialized_dag():
    """
    DAG to trigger a generic DAG using the TriggerDagRunOperator
    with the specified configuration.
    """
    # Configuration payload passed along to the triggered DAG run.
    run_conf = {
        "param1": "value1",
        "param2": 42,
        "param3": "foo"
    }

    # Run ID for the triggered run; the ``{{ ... }}`` placeholders are template
    # expressions combining the logical date and this task's ID.
    run_id_template = "triggered__{{ ds_nodash }}__{{ task_instance.task_id }}"

    # The operator is instantiated for its side effect of being added to this
    # DAG; the instance itself is not used afterwards.
    TriggerDagRunOperator(
        task_id="trigger_generic_dag",
        trigger_dag_id="generic_dag",  # The ID of the generic DAG to trigger
        conf=run_conf,
        trigger_run_id=run_id_template,
    )
# Instantiate the DAG by calling the decorated function and bind the result at
# module level.
# NOTE(review): this assignment rebinds the name `dag`, shadowing the `dag`
# decorator imported from airflow.decorators. The decorator has already been
# applied above, so this works, but any later use of `@dag` in this module
# would hit the DAG object instead — consider a distinct variable name.
dag = specialized_dag()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment