Skip to content

Commit

Permalink
Avoid unintentional data loss when deleting DAGs (#20758)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5980d2b)
  • Loading branch information
SamWheating authored and jedcunningham committed Feb 17, 2022
1 parent 663bb54 commit 4dc8b90
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
18 changes: 15 additions & 3 deletions airflow/api/common/delete_dag.py
Expand Up @@ -18,7 +18,7 @@
"""Delete DAGs APIs."""
import logging

from sqlalchemy import or_
from sqlalchemy import and_, or_

from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
Expand Down Expand Up @@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")

# deleting a DAG should also delete all of its subdags
dags_to_delete_query = session.query(DagModel.dag_id).filter(
or_(
DagModel.dag_id == dag_id,
and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
)
)
dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]

# The scheduler removes DAGs without files from the serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly remove the serialized DAG here.
if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
Expand All @@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == 'Log':
continue
cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
count += session.query(model).filter(cond).delete(synchronize_session='fetch')
count += (
session.query(model)
.filter(model.dag_id.in_(dags_to_delete))
.delete(synchronize_session='fetch')
)
if dag.is_subdag:
parent_dag_id, task_id = dag_id.rsplit(".", 1)
for model in TaskFail, models.TaskInstance:
Expand Down
14 changes: 14 additions & 0 deletions tests/api/common/test_delete_dag.py
Expand Up @@ -162,3 +162,17 @@ def test_delete_subdag_successful_delete(self):
self.check_dag_models_exists()
delete_dag(dag_id=self.key, keep_records_in_log=False)
self.check_dag_models_removed(expect_logs=0)

def test_delete_dag_preserves_other_dags(self):
    """Check that deleting a DAG leaves a non-subdag sharing its id prefix in place.

    Two extra rows are added whose ids both start with ``<key>.``; only one of
    them is flagged ``is_subdag``. After ``delete_dag(self.key)`` the unflagged
    row must still exist, and it must be the only row whose id starts with the key.
    """
    self.setup_dag_models()

    unrelated_dag_id = self.key + ".other_dag"
    subdag_id = self.key + ".subdag"

    with create_session() as session:
        # Same "<key>." prefix as a subdag, but not marked as one.
        session.add(DM(dag_id=unrelated_dag_id, fileloc=self.dag_file_path))
        # A row explicitly marked as a subdag.
        session.add(DM(dag_id=subdag_id, fileloc=self.dag_file_path, is_subdag=True))

    delete_dag(self.key)

    with create_session() as session:
        remaining_unrelated = session.query(DM).filter(DM.dag_id == unrelated_dag_id).count()
        assert remaining_unrelated == 1

        # Exactly one row with the key as id prefix is expected to remain.
        remaining_with_prefix = session.query(DM).filter(DM.dag_id.like(self.key + "%")).count()
        assert remaining_with_prefix == 1

0 comments on commit 4dc8b90

Please sign in to comment.