From 875dddd750f2943d886c8adfc2ea73bf234879db Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Fri, 7 Jan 2022 13:36:41 -0800 Subject: [PATCH 1/3] Avoid unintentional data loss when deleting DAGs --- airflow/api/common/delete_dag.py | 18 +++++++++++++++--- tests/api/common/test_delete_dag.py | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index c448127f2c484..d779d550ac47d 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -18,7 +18,7 @@ """Delete DAGs APIs.""" import logging -from sqlalchemy import or_ +from sqlalchemy import and_ from airflow import models from airflow.exceptions import AirflowException, DagNotFound @@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i if dag is None: raise DagNotFound(f"Dag id {dag_id} not found") + # deleting a DAG should also all of its subdags + dags_to_delete = [ + d[0] + for d in session.query(DagModel.dag_id) + .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag)) + .all() + ] + dags_to_delete.append(dag_id) + # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval. # There may be a lag, so explicitly removes serialized DAG here. if SerializedDagModel.has_dag(dag_id=dag_id, session=session): @@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i if hasattr(model, "dag_id"): if keep_records_in_log and model.__name__ == 'Log': continue - cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%")) - count += session.query(model).filter(cond).delete(synchronize_session='fetch') + count += ( + session.query(model) + .filter(model.dag_id.in_(dags_to_delete)) + .delete(synchronize_session='fetch') + ) if dag.is_subdag: parent_dag_id, task_id = dag_id.rsplit(".", 1) for model in TaskFail, models.TaskInstance: diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py index 0eb058a18337a..d9dc0b0a01c7f 100644 --- a/tests/api/common/test_delete_dag.py +++ b/tests/api/common/test_delete_dag.py @@ -162,3 +162,17 @@ def test_delete_subdag_successful_delete(self): self.check_dag_models_exists() delete_dag(dag_id=self.key, keep_records_in_log=False) self.check_dag_models_removed(expect_logs=0) + + def test_delete_dag_preserves_other_dags(self): + + self.setup_dag_models() + + with create_session() as session: + session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path)) + session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True)) + + delete_dag(self.key) + + with create_session() as session: + assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1 + assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1 From ba38977459a3abdc68d97362acf605adec8dfb1d Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Sun, 9 Jan 2022 14:11:52 -0800 Subject: [PATCH 2/3] Build list of dags to delete in a single query --- airflow/api/common/delete_dag.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index d779d550ac47d..70cc949528729 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -18,7 +18,7 @@ """Delete DAGs APIs.""" import logging -from sqlalchemy import and_ +from sqlalchemy import and_, or_ from airflow import models from airflow.exceptions import AirflowException, DagNotFound @@ -55,13 +55,13 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i raise DagNotFound(f"Dag id {dag_id} not found") # deleting a DAG should also all of its subdags - dags_to_delete = [ - d[0] - for d in session.query(DagModel.dag_id) - .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag)) - .all() - ] - dags_to_delete.append(dag_id) + dags_to_delete_query = session.query(DagModel.dag_id).filter( + or_( + DagModel.dag_id == dag_id, + and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag), + ) + ) + dags_to_delete = [dag_id for dag_id, in dags_to_delete_query] # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval. # There may be a lag, so explicitly removes serialized DAG here. From d444e22b48468935af808ddbf1f069afd6ab18b1 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 10 Jan 2022 06:37:26 +0100 Subject: [PATCH 3/3] Update airflow/api/common/delete_dag.py --- airflow/api/common/delete_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index 70cc949528729..5e0afa81cb5c9 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -54,7 +54,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i if dag is None: raise DagNotFound(f"Dag id {dag_id} not found") - # deleting a DAG should also all of its subdags + # deleting a DAG should also delete all of its subdags dags_to_delete_query = session.query(DagModel.dag_id).filter( or_( DagModel.dag_id == dag_id,