Creates a step in Amazon EMR for a given cluster_id
and monitors its progress using a sensor. A more complex example that involves cluster creation/termination can be found here.
Created
August 13, 2018 15:15
-
-
Save sjednac/cf2f72590117b37d017fe17c62781df1 to your computer and use it in GitHub Desktop.
Submit a Spark job to an existing Amazon EMR cluster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
# Arguments handed to the DAG below as ``default_args``.
DEFAULT_ARGS = {
    'owner': 'airflow',
    # Each run is independent of the outcome of the previous one.
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    # NOTE(review): '[email protected]' is not a valid address; it looks like
    # an obfuscated placeholder. Set a real recipient before turning on
    # either of the e-mail flags below.
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
}
# EMR step definitions submitted to the cluster by the ``add_steps`` task.
# The single step runs ``spark-submit`` through ``command-runner.jar``.
SPARK_TEST_STEPS = [
    {
        'Name': 'TestSparkJob1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # Flat argv list: every option and its value are separate items.
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--driver-memory', '1G',
                '--executor-memory', '1G',
                '--num-executors', '1',
                '--class', 'Main',
                # Application jar; must be the last argument.
                's3://dev-bucket/test-spark-job-spark-assembly-0.1.0-SNAPSHOT.jar',
            ],
        },
    },
]
# Id of the existing EMR cluster (job flow) the step is submitted to.
# The value below is a placeholder; replace it with a real cluster id.
cluster_id = "j-xxxxxxxxxxxxx"
# DAG 'emr1': scheduled with the cron expression '0 3 * * *' (03:00 every
# day); ``dagrun_timeout`` is set to two hours.
dag = DAG(
    'emr1',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *',
)
# Task 'add_steps': submits SPARK_TEST_STEPS to the cluster identified by
# ``cluster_id`` using the 'aws_default' connection.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=cluster_id,
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag,
)
# Task 'watch_step': sensor that monitors the step created by 'add_steps'.
# ``step_id`` is a template that pulls the 'return_value' XCom of the
# 'add_steps' task and takes its first element, i.e. the only step submitted.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id=cluster_id,
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)
# Ordering: the sensor runs only after the step has been submitted.
step_adder.set_downstream(step_checker)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment