Last active
June 6, 2019 17:07
-
-
Save adamantnz/91123520998485c013187540ab13030a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airflow | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.operators.bash_operator import BashOperator | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
print( | |
"""This print is in the global state, so it will print this string | |
every time the scheduler parses the file. Don't do this!""") | |
def my_function: | |
print( | |
"""This print is contained within a method, so it will only print this string | |
when the method is invoked. Much better :)""") | |
def on_failure(context): | |
# specify what should happen when any operator fails i.e post to slack, send an email etc | |
pass | |
def xcom_push_example(**kwargs): | |
task_instance = kwargs["ti"] | |
# push a key/value pair to xcom using xcom_push() | |
task_instance.xcom_push(key="mykey", value="myvalue") | |
def xcom_pull_example(**kwargs): | |
task_instance = kwargs["ti"] | |
# retrieve the key/value pair from xcom using xcom_pull() | |
xcom_value = task_instance.xcom_pull("xcom_push_example", key="mykey") | |
return "xcom value: {}".format(xcom_value) | |
args = { | |
"owner": "AG", | |
"start_date": datetime(2018, 12, 31), | |
"concurrency": 1, | |
"max_active_runs": 1, | |
"retries": 1, | |
"retry_delay": timedelta(minutes=5), | |
"on_failure_callback": on_failure, | |
"provide_context": True | |
} | |
dag = DAG ( | |
dag_id=Path(__file__).stem, | |
default_args=args, | |
description="a super descriptive description", | |
# run each day at 12:00 UTC, validate your cron syntax at crontab.guru | |
schedule_interval="00 12 * * *", | |
catchup=False | |
) | |
get_hostname_operator = BashOperator( | |
task_id="get_hostname", | |
bash_command="hostname", | |
dag=dag) | |
xcom_push_operator = PythonOperator( | |
task_id="xcom_push_operator", | |
python_callable=xcom_push_example, | |
dag=dag) | |
xcom_pull_operator = PythonOperator( | |
task_id="xcom_pull_operator", | |
python_callable=xcom_pull_example, | |
dag=dag) | |
get_hostname_operator >> xcom_push_operator >> xcom_pull_operator | |
if __name__ == "__main__": | |
dag.cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment