""" ## Example Airflow Workflow (DAG) Markdown docstrings are rendered in the Airflow UI!!! """ from airflow import DAG from airflow.models import BaseOperator from datetime import datetime, timedelta # define set of dummy tasks class ExtractAdRevenueOperator(BaseOperator): def __init__(self, *args, **kwargs): super(ExtractAdRevenueOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class ExtractAppStoreRevenueOperator(BaseOperator): def __init__(self, app_store_name, *args, **kwargs): self.app_store_name = app_store_name super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class ExtractConversionRatesOperator(BaseOperator): def __init__(self, *args, **kwargs): super(ExtractConversionRatesOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class TransformSpreadsheetDataOperator(BaseOperator): def __init__(self, *args, **kwargs): super(TransformSpreadsheetDataOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class TransformJSONDataOperator(BaseOperator): def __init__(self, *args, **kwargs): super(TransformJSONDataOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class CurrencyConversionsOperator(BaseOperator): def __init__(self, *args, **kwargs): super(CurrencyConversionsOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class CombineDataRevenueDataOperator(BaseOperator): def __init__(self, *args, **kwargs): super(CombineDataRevenueDataOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() class CheckHistoricalDataOperator(BaseOperator): def __init__(self, *args, **kwargs): super(CheckHistoricalDataOperator, self).__init__(*args, **kwargs) def execute(self, context): print 'executing {}'.__str__() 
class RevenuePredictionOperator(BaseOperator):
    """Dummy operator standing in for the revenue prediction step."""

    def __init__(self, *args, **kwargs):
        super(RevenuePredictionOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which task is executing; ``context`` is unused."""
        # The placeholder must be filled explicitly; the parenthesised,
        # single-argument print form is valid on both Python 2 and 3.
        print('executing {}'.format(self))


WORKFLOW_START_DATE = datetime(2017, 1, 1)

# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': WORKFLOW_START_DATE,
    'email': ['example@example_company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'example_workflow_dag',
    start_date=WORKFLOW_START_DATE,
    schedule_interval=timedelta(days=1),  # one run per day
    default_args=default_args,
)

# set the DAG documentation (the module docstring, written in Markdown)
dag.doc_md = __doc__

## Define all tasks

# task to wait for ad network revenue and extract
extract_ad_revenue = ExtractAdRevenueOperator(
    task_id='extract_ad_revenue',
    dag=dag)

# dynamically create tasks to wait and extract app store data
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    task = ExtractAppStoreRevenueOperator(
        task_id='extract_{}_revenue'.format(app_store),
        dag=dag,
        app_store_name=app_store,
    )
    app_store_tasks.append(task)

# task to wait for and extract conversion rates from API
extract_conversion_rates = ExtractConversionRatesOperator(
    task_id='get_conversion_rates',
    dag=dag)

# task to transform Spreadsheet data
transform_spreadsheet_data = TransformSpreadsheetDataOperator(
    task_id='transform_spreadsheet_data',
    dag=dag)

# task to transform JSON data
transform_json_data = TransformJSONDataOperator(
    task_id='transform_json_data',
    dag=dag)

# task to apply currency exchange rates
perform_currency_conversions = CurrencyConversionsOperator(
    task_id='perform_currency_conversions',
    dag=dag)

# task to combine all data sources
combine_revenue_data = CombineDataRevenueDataOperator(
    task_id='combine_revenue_data',
    dag=dag)

# task to check that historical data exists
check_historical_data = CheckHistoricalDataOperator(
    task_id='check_historical_data',
    dag=dag)

# task to make predictions from historical data
predict_revenue = RevenuePredictionOperator(
    task_id='predict_revenue',
    dag=dag)

## Define all task dependencies
# ``a.set_upstream(b)`` makes ``a`` depend on ``b``.

# transform_spreadsheet_data depends on extract_ad_revenue, etc.
transform_spreadsheet_data.set_upstream(extract_ad_revenue)

# dynamically define app store dependencies: the JSON transform depends on
# every app store extraction task
for task in app_store_tasks:
    transform_json_data.set_upstream(task)

perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)

combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)

check_historical_data.set_upstream(combine_revenue_data)

predict_revenue.set_upstream(check_historical_data)