diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index b1a58e5442d87..c9cc3eca2a7b3 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -116,6 +116,7 @@ class DagScheduleDatasetReference(Base): DagScheduleDatasetReference.dataset_id == foreign(DatasetDagRunQueue.dataset_id), DagScheduleDatasetReference.dag_id == foreign(DatasetDagRunQueue.target_dag_id), )""", + cascade="all, delete, delete-orphan", ) __tablename__ = "dag_schedule_dataset_reference" diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index c167e4d5b5c7d..bc670af122814 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1914,6 +1914,43 @@ def test_outlet_datasets_skipped(self, create_task_instance): # check that no dataset events were generated assert session.query(DatasetEvent).count() == 0 + def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker, session): + """ + Test that when a task that produces dataset has ran, that changing the consumer + dag dataset will not cause primary key blank-out + """ + from airflow import Dataset + + with dag_maker(schedule=None, serialized=True) as dag1: + + @task(outlets=Dataset("test/1")) + def test_task1(): + print(1) + + test_task1() + + dr1 = dag_maker.create_dagrun() + test_task1 = dag1.get_task("test_task1") + + with dag_maker(dag_id="testdag", schedule=[Dataset("test/1")], serialized=True): + + @task + def test_task2(): + print(1) + + test_task2() + + ti = dr1.get_task_instance(task_id="test_task1") + ti.run() + # Change the dataset. + with dag_maker(dag_id="testdag", schedule=[Dataset("test2/1")], serialized=True): + + @task + def test_task2(): + print(1) + + test_task2() + @staticmethod def _test_previous_dates_setup( schedule_interval: str | datetime.timedelta | None,