"""
## Example Airflow Workflow (DAG)

Markdown docstrings are rendered in the Airflow UI!!!

"""
from airflow import DAG
from airflow.models import BaseOperator
from datetime import datetime, timedelta

# define set of dummy tasks
class ExtractAdRevenueOperator(BaseOperator):
    """Dummy operator standing in for the ad revenue extraction step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(ExtractAdRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class ExtractAppStoreRevenueOperator(BaseOperator):
    """Dummy operator standing in for one app store's revenue extraction.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, app_store_name, *args, **kwargs):
        """Store the app store name and forward the rest to ``BaseOperator``.

        :param app_store_name: name of the app store this task is for;
            kept on the instance as ``self.app_store_name``.
        """
        self.app_store_name = app_store_name
        super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class ExtractConversionRatesOperator(BaseOperator):
    """Dummy operator standing in for the conversion rate extraction step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(ExtractConversionRatesOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class TransformSpreadsheetDataOperator(BaseOperator):
    """Dummy operator standing in for the spreadsheet data transform step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(TransformSpreadsheetDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class TransformJSONDataOperator(BaseOperator):
    """Dummy operator standing in for the JSON data transform step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(TransformJSONDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class CurrencyConversionsOperator(BaseOperator):
    """Dummy operator standing in for the currency conversion step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(CurrencyConversionsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class CombineDataRevenueDataOperator(BaseOperator):
    """Dummy operator standing in for the step that combines revenue data.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(CombineDataRevenueDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class CheckHistoricalDataOperator(BaseOperator):
    """Dummy operator standing in for the historical data check step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(CheckHistoricalDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

class RevenuePredictionOperator(BaseOperator):
    """Dummy operator standing in for the revenue prediction step.

    It does no real work; ``execute`` only prints a message.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(RevenuePredictionOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print the id of the task being executed.

        :param context: supplied by the caller; not used here.
        """
        # A parenthesised single-argument print is valid as both the
        # Python 2 statement and the Python 3 function. The placeholder is
        # now filled in; previously the literal 'executing {}' was printed.
        print('executing {}'.format(self.task_id))

# First date the workflow is scheduled for.
WORKFLOW_START_DATE = datetime(year=2017, month=1, day=1)

# Arguments handed to the DAG as its `default_args`.
default_args = dict(
    owner='example',
    depends_on_past=False,
    start_date=WORKFLOW_START_DATE,
    email=['example@example_company.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=5,
    retry_delay=timedelta(minutes=5),
)

# The workflow itself: scheduled at a one-day interval from the start date.
dag = DAG(
    dag_id='example_workflow_dag',
    default_args=default_args,
    start_date=WORKFLOW_START_DATE,
    schedule_interval=timedelta(days=1),
)

# Use this module's docstring as the DAG documentation.
dag.doc_md = __doc__

## Task definitions

# Waits for ad network revenue and extracts it.
extract_ad_revenue = ExtractAdRevenueOperator(
    dag=dag,
    task_id='extract_ad_revenue',
)

# One wait-and-extract task is generated per app store name.
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    app_store_tasks.append(
        ExtractAppStoreRevenueOperator(
            app_store_name=app_store,
            task_id='extract_{}_revenue'.format(app_store),
            dag=dag,
        )
    )


# Waits for conversion rates from the API and extracts them.
extract_conversion_rates = ExtractConversionRatesOperator(
    dag=dag,
    task_id='get_conversion_rates',
)

# Transforms the spreadsheet data.
transform_spreadsheet_data = TransformSpreadsheetDataOperator(
    dag=dag,
    task_id='transform_spreadsheet_data',
)

# Transforms the JSON data.
transform_json_data = TransformJSONDataOperator(
    dag=dag,
    task_id='transform_json_data',
)

# Applies the currency exchange rates.
perform_currency_conversions = CurrencyConversionsOperator(
    dag=dag,
    task_id='perform_currency_conversions',
)

# Combines all of the data sources.
combine_revenue_data = CombineDataRevenueDataOperator(
    dag=dag,
    task_id='combine_revenue_data',
)

# Checks that historical data exists.
check_historical_data = CheckHistoricalDataOperator(
    dag=dag,
    task_id='check_historical_data',
)

# Makes predictions from the historical data.
predict_revenue = RevenuePredictionOperator(
    dag=dag,
    task_id='predict_revenue',
)



## Task dependencies, written from each upstream task to its downstream task

# transform_spreadsheet_data depends on extract_ad_revenue.
extract_ad_revenue.set_downstream(transform_spreadsheet_data)

# Every generated app store task feeds the single JSON transform.
for task in app_store_tasks:
    task.set_downstream(transform_json_data)

# Currency conversion depends on the transformed JSON and the conversion rates.
transform_json_data.set_downstream(perform_currency_conversions)
extract_conversion_rates.set_downstream(perform_currency_conversions)

# The spreadsheet branch and the currency-converted branch are combined.
transform_spreadsheet_data.set_downstream(combine_revenue_data)
perform_currency_conversions.set_downstream(combine_revenue_data)

# The historical check follows the combine step; prediction follows the check.
combine_revenue_data.set_downstream(check_historical_data)
check_historical_data.set_downstream(predict_revenue)