Created
January 23, 2019 18:18
-
-
Save ftfarias/af7f0d92218472e280eb8da0d946162a to your computer and use it in GitHub Desktop.
How to clean airflow dags
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys

import sqlalchemy
def print_clear_dag(dag_id):
    """Print the SQL needed to remove a DAG's rows from the Airflow metadata tables.

    Nothing is executed here: the statements are written to stdout, each
    terminated with ``;``, so they can be reviewed and run manually.

    For every table two statements are printed: an exact match on ``dag_id``
    and a ``LIKE`` match, so a ``dag_id`` containing SQL wildcards (``%``,
    ``_``) also matches related ids.

    Args:
        dag_id: Identifier (or LIKE pattern) of the DAG whose rows should go.
    """
    list_tables = ['xcom', 'task_instance', 'sla_miss', 'log', 'job', 'dag_run', 'dag',
                   'dag_stats', 'task_fail']
    # Double any embedded single quote (standard SQL string escaping) so the
    # printed statements stay well-formed for ids such as "o'brien".
    quoted_id = str(dag_id).replace("'", "''")
    for table in list_tables:
        queries = ["DELETE FROM {} WHERE dag_id='{}'".format(table, quoted_id),
                   "DELETE FROM {} WHERE dag_id LIKE '{}'".format(table, quoted_id)]
        for query in queries:
            print("{};".format(query))
def clear_dag(dag_id, uri='mysql+mysqldb:// /airflow'):
    """Delete every row belonging to a DAG from the Airflow metadata tables.

    For each table two DELETE statements are run: an exact match on
    ``dag_id`` and a ``LIKE`` match (so SQL wildcards in ``dag_id`` are
    honoured). All statements run in one transaction that is committed when
    the loop finishes and rolled back if any statement raises.

    Args:
        dag_id: Identifier (or LIKE pattern) of the DAG to purge.
        uri: SQLAlchemy database URL of the Airflow metadata database.
            NOTE(review): the default carries no host or credentials
            (presumably redacted) -- pass a real URL.

    Raises:
        sqlalchemy.exc.SQLAlchemyError: if connecting or any DELETE fails,
            e.g. because a listed table does not exist in this schema.
    """
    list_tables = ['xcom', 'task_instance', 'sla_miss', 'log', 'job', 'dag_run', 'dag',
                   'dag_stats', 'task_fail']
    engine = sqlalchemy.create_engine(uri)
    params = {'dag_id': dag_id}
    # engine.begin() commits on success; a bare connect() relies on implicit
    # autocommit, which newer SQLAlchemy releases no longer provide.
    with engine.begin() as conn:
        for table in list_tables:
            # Table names come from the fixed list above, so formatting them
            # in is safe; the user-supplied dag_id is always a bound parameter.
            statements = [
                ("DELETE FROM {} WHERE dag_id='{}'".format(table, dag_id),
                 "DELETE FROM {} WHERE dag_id=:dag_id".format(table)),
                ("DELETE FROM {} WHERE dag_id LIKE '{}'".format(table, dag_id),
                 "DELETE FROM {} WHERE dag_id LIKE :dag_id".format(table)),
            ]
            for readable, statement in statements:
                # Log the human-readable form; execute the parameterized one.
                print("\t: running {}".format(readable))
                conn.execute(sqlalchemy.text(statement), params)
# Script entry point: expects the DAG id as the single command-line argument.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback.
        sys.exit("usage: {} DAG_ID".format(sys.argv[0]))
    print("Deleting {}".format(sys.argv[1]))
    # Only prints the DELETE statements for review; nothing is executed.
    print_clear_dag(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment