Last active
June 18, 2019 20:43
-
-
Save adamantnz/36c826e32873649f4941c4cfa43f9762 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airflow | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.operators.bash_operator import BashOperator | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
import boto3 | |
# dag methods | |
def on_failure(context): | |
# specify what should happen when any operator fails | |
# i.e post to slack, send an email etc. | |
# pass this method to to your default_args dict so that | |
# all your workflow steps inherit this behaviour | |
pass | |
def xcom_push_example(**kwargs): | |
task_instance = kwargs["ti"] | |
# push a key/value pair to xcom using xcom_push() | |
task_instance.xcom_push(key="mykey", value="myvalue") | |
def xcom_pull_example(**kwargs): | |
task_instance = kwargs["ti"] | |
# retrieve the key/value pair from xcom using xcom_pull() | |
xcom_value = task_instance.xcom_pull("xcom_push_example", key="mykey") | |
return f"xcom value: {xcom_value}" | |
def upload_file_example(): | |
s3 = boto3.resource("s3") | |
try: | |
s3.Bucket("bucketname").upload_file( | |
"/local/file/aliens.jpg", | |
"/s3path/aliens.jpg" | |
) | |
except Exception as err: | |
raise (f"ERROR: {err}") | |
# dag configuration | |
default_args = { | |
"owner": "AG", | |
"start_date": datetime(2018, 12, 31), | |
"concurrency": 1, | |
"max_active_runs": 1, | |
"retries": 1, | |
"retry_delay": timedelta(minutes=5), | |
"on_failure_callback": on_failure, | |
"provide_context": True | |
} | |
with DAG( | |
dag_id=Path(__file__).stem, | |
default_args=default_args, | |
description="a super descriptive description", | |
schedule_interval="00 12 * * *", | |
catchup=False | |
) as dag: | |
get_hostname_operator = BashOperator( | |
task_id="get_hostname", | |
bash_command="hostname") | |
xcom_push_operator = PythonOperator( | |
task_id="xcom_push_operator", | |
python_callable=xcom_push_example) | |
xcom_pull_operator = PythonOperator( | |
task_id="xcom_pull_operator", | |
python_callable=xcom_pull_example) | |
upload_file_operator = PythonOperator( | |
task_id="upload_file_operator", | |
python_callable=upload_file_example) | |
# dag workflow | |
get_hostname_operator >> \ | |
xcom_push_operator >> \ | |
xcom_pull_operator >> \ | |
upload_file_operator | |
if __name__ == "__main__": | |
dag.cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment