From 4dc8b909bedd04094be3079c3f7384ea044ec011 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Mon, 10 Jan 2022 11:55:51 -0800 Subject: [PATCH] Avoid unintentional data loss when deleting DAGs (#20758) (cherry picked from commit 5980d2b05eee484256c634d5efae9410265c65e9) --- airflow/api/common/delete_dag.py | 18 +++++++++++++++--- tests/api/common/test_delete_dag.py | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index c448127f2c484..5e0afa81cb5c9 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -18,7 +18,7 @@ """Delete DAGs APIs.""" import logging -from sqlalchemy import or_ +from sqlalchemy import and_, or_ from airflow import models from airflow.exceptions import AirflowException, DagNotFound @@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i if dag is None: raise DagNotFound(f"Dag id {dag_id} not found") + # deleting a DAG should also delete all of its subdags + dags_to_delete_query = session.query(DagModel.dag_id).filter( + or_( + DagModel.dag_id == dag_id, + and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag), + ) + ) + dags_to_delete = [dag_id for dag_id, in dags_to_delete_query] + # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval. # There may be a lag, so explicitly removes serialized DAG here. if SerializedDagModel.has_dag(dag_id=dag_id, session=session): @@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i if hasattr(model, "dag_id"): if keep_records_in_log and model.__name__ == 'Log': continue - cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%")) - count += session.query(model).filter(cond).delete(synchronize_session='fetch') + count += ( + session.query(model) + .filter(model.dag_id.in_(dags_to_delete)) + .delete(synchronize_session='fetch') + ) if dag.is_subdag: parent_dag_id, task_id = dag_id.rsplit(".", 1) for model in TaskFail, models.TaskInstance: diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py index 0eb058a18337a..d9dc0b0a01c7f 100644 --- a/tests/api/common/test_delete_dag.py +++ b/tests/api/common/test_delete_dag.py @@ -162,3 +162,17 @@ def test_delete_subdag_successful_delete(self): self.check_dag_models_exists() delete_dag(dag_id=self.key, keep_records_in_log=False) self.check_dag_models_removed(expect_logs=0) + + def test_delete_dag_preserves_other_dags(self): + + self.setup_dag_models() + + with create_session() as session: + session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path)) + session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True)) + + delete_dag(self.key) + + with create_session() as session: + assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1 + assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1