From 098466038359606ef5e10a0fe318aca4a303ce5c Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 28 Jul 2022 11:46:34 +0100 Subject: [PATCH] fixup! Don't mistakenly take a lock on DagRun via ti.refresh_from_fb --- airflow/models/taskinstance.py | 4 ++++ tests/jobs/test_scheduler_job.py | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 2973aa4a8c9aa..b64d1a5f47887 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -288,6 +288,7 @@ def clear_task_instances( if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None dr.start_date = None + session.flush() class _LazyXComAccessIterator(collections.abc.Iterator): @@ -848,6 +849,9 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool """ self.log.debug("Refreshing TaskInstance %s from DB", self) + if self in session: + session.refresh(self, TaskInstance.__mapper__.column_attrs.keys()) + qry = ( # To avoid joining any relationships by default select the all # columns, not the object. This also means we get (effectively) a diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index df8c0f6c7160f..8d6a779c1d30f 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -458,6 +458,7 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker) ti1.state = State.SCHEDULED self.scheduler_job._critical_section_enqueue_task_instances(session) + session.flush() ti1.refresh_from_db(session=session) assert State.SCHEDULED == ti1.state session.rollback() @@ -1316,7 +1317,8 @@ def test_enqueue_task_instances_sets_ti_state_to_None_if_dagrun_in_finish_state( with patch.object(BaseExecutor, 'queue_command') as mock_queue_command: self.scheduler_job._enqueue_task_instances_with_queued_state([ti], session=session) - ti.refresh_from_db() + session.flush() + ti.refresh_from_db(session=session) assert ti.state == State.NONE mock_queue_command.assert_not_called()