Skip to content

Commit

Permalink
Fix sqlalchemy primary key blank-out error on DDRQ (#27538)
Browse files Browse the repository at this point in the history
closes #27509
  • Loading branch information
ephraimbuddy committed Nov 7, 2022
1 parent 7297892 commit fc59b02
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/models/dataset.py
Expand Up @@ -116,6 +116,7 @@ class DagScheduleDatasetReference(Base):
DagScheduleDatasetReference.dataset_id == foreign(DatasetDagRunQueue.dataset_id),
DagScheduleDatasetReference.dag_id == foreign(DatasetDagRunQueue.target_dag_id),
)""",
cascade="all, delete, delete-orphan",
)

__tablename__ = "dag_schedule_dataset_reference"
Expand Down
37 changes: 37 additions & 0 deletions tests/models/test_taskinstance.py
Expand Up @@ -1914,6 +1914,43 @@ def test_outlet_datasets_skipped(self, create_task_instance):
# check that no dataset events were generated
assert session.query(DatasetEvent).count() == 0

def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker, session):
"""
Test that changing the dataset a consumer DAG is scheduled on, after the task that
produces the original dataset has already run, does not cause a primary key blank-out.

The test makes no explicit assertions: it passes as long as none of the steps raise.
"""
from airflow import Dataset

# Producer DAG: a single task that declares Dataset("test/1") as its outlet.
with dag_maker(schedule=None, serialized=True) as dag1:

@task(outlets=Dataset("test/1"))
def test_task1():
print(1)

test_task1()

dr1 = dag_maker.create_dagrun()
# Rebinds the local name to the task looked up on the DAG; the value is not used
# again in this test.
test_task1 = dag1.get_task("test_task1")

# Consumer DAG "testdag", scheduled on the dataset the producer task emits.
with dag_maker(dag_id="testdag", schedule=[Dataset("test/1")], serialized=True):

@task
def test_task2():
print(1)

test_task2()

# Run the producer task. NOTE(review): per the test name this is presumably what
# populates the DDRQ (dataset dag run queue) for "testdag" -- confirm.
ti = dr1.get_task_instance(task_id="test_task1")
ti.run()
# Change the dataset.
# Re-declaring "testdag" with a different dataset is the step under test: it must
# complete without raising.
with dag_maker(dag_id="testdag", schedule=[Dataset("test2/1")], serialized=True):

@task
def test_task2():
print(1)

test_task2()

@staticmethod
def _test_previous_dates_setup(
schedule_interval: str | datetime.timedelta | None,
Expand Down

0 comments on commit fc59b02

Please sign in to comment.