Skip to content

Commit

Permalink
fixup! Don't mistakenly take a lock on DagRun via ti.refresh_from_fb
Browse files Browse the repository at this point in the history
  • Loading branch information
ashb committed Jul 28, 2022
1 parent 7a7d2ac commit 0984660
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
4 changes: 4 additions & 0 deletions airflow/models/taskinstance.py
Expand Up @@ -288,6 +288,7 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
session.flush()


class _LazyXComAccessIterator(collections.abc.Iterator):
Expand Down Expand Up @@ -848,6 +849,9 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
"""
self.log.debug("Refreshing TaskInstance %s from DB", self)

if self in session:
session.refresh(self, TaskInstance.__mapper__.column_attrs.keys())

qry = (
# To avoid joining any relationships by default select the all
# columns, not the object. This also means we get (effectively) a
Expand Down
4 changes: 3 additions & 1 deletion tests/jobs/test_scheduler_job.py
Expand Up @@ -458,6 +458,7 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker)
ti1.state = State.SCHEDULED

self.scheduler_job._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db(session=session)
assert State.SCHEDULED == ti1.state
session.rollback()
Expand Down Expand Up @@ -1316,7 +1317,8 @@ def test_enqueue_task_instances_sets_ti_state_to_None_if_dagrun_in_finish_state(

with patch.object(BaseExecutor, 'queue_command') as mock_queue_command:
self.scheduler_job._enqueue_task_instances_with_queued_state([ti], session=session)
ti.refresh_from_db()
session.flush()
ti.refresh_from_db(session=session)
assert ti.state == State.NONE
mock_queue_command.assert_not_called()

Expand Down

0 comments on commit 0984660

Please sign in to comment.