Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unintentional data loss when deleting DAGs #20758

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 15 additions & 3 deletions airflow/api/common/delete_dag.py
Expand Up @@ -18,7 +18,7 @@
"""Delete DAGs APIs."""
import logging

from sqlalchemy import or_
from sqlalchemy import and_, or_

from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
Expand Down Expand Up @@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")

# deleting a DAG should also delete all of its subdags
dags_to_delete_query = session.query(DagModel.dag_id).filter(
or_(
DagModel.dag_id == dag_id,
and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
)
)
dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
Comment on lines +58 to +64
Copy link
Member

@kaxil kaxil Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        or_(
            DagModel.dag_id == dag_id,
            DagModel.root_dag_id == dag_id

might have also worked @SamWheating


# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly remove the serialized DAG here.
if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
Expand All @@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == 'Log':
continue
cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
count += session.query(model).filter(cond).delete(synchronize_session='fetch')
count += (
session.query(model)
.filter(model.dag_id.in_(dags_to_delete))
.delete(synchronize_session='fetch')
)
if dag.is_subdag:
parent_dag_id, task_id = dag_id.rsplit(".", 1)
for model in TaskFail, models.TaskInstance:
Expand Down
14 changes: 14 additions & 0 deletions tests/api/common/test_delete_dag.py
Expand Up @@ -162,3 +162,17 @@ def test_delete_subdag_successful_delete(self):
self.check_dag_models_exists()
delete_dag(dag_id=self.key, keep_records_in_log=False)
self.check_dag_models_removed(expect_logs=0)

def test_delete_dag_preserves_other_dags(self):

self.setup_dag_models()

with create_session() as session:
session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))

delete_dag(self.key)

with create_session() as session:
assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1