Last active
July 27, 2023 11:53
-
-
Save nhammad/564a1774100ebefb15246d14669b2c15 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.contrib.sensors.python_sensor import PythonSensor | |
.... | |
with DAG('test_airflow_dag', description='testing_python_sensor', | |
schedule_interval='30 5 * * *', | |
start_date=datetime(year=2020, month=01, day=12), | |
catchup=False, | |
default_args=args) as dag: | |
..... | |
def run_qa_check(**kwargs): | |
query = """SELECT | |
CASE | |
WHEN DATE_DIFF('hour', your_date_column, CURRENT_TIMESTAMP) = 24 THEN 'TRUE' | |
ELSE 'FALSE' | |
END AS is_24_hours | |
FROM | |
my_table; | |
""" | |
logging.info("Establishing DB connection") | |
con = snow.get_ctx(wh='TEST_DWH') | |
cur = con.cursor() | |
logging.info("Running query") | |
try: | |
cur.execute(query) | |
results = cur.fetchall() | |
logging.info("Query ran successfully") | |
if results[0][0] == "True": | |
logging.info("QA CHECK PASSED") | |
return True | |
else: | |
logging.info("QA CHECK NOT PASSED") | |
return False | |
except Exception as e: | |
logging.info(f"Error while running query on the database {e}") | |
con.close() | |
return 1 | |
start = DummyOperator(task_id='start') | |
end = DummyOperator(task_id='end') | |
A = PythonOperator( | |
task_id='A', | |
python_callable=function_a | |
) | |
B = PythonOperator( | |
task_id='B', | |
python_callable=function_b | |
) | |
wait_for_condition = PythonSensor( | |
task_id='wait_for_qa_check_completion', | |
python_callable=run_qa_check, | |
mode='reschedule', | |
poke_interval=600 | |
) | |
start >> A >> wait_for_condition >> B >> end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment