Skip to content

Commit

Permalink
Don't mistakenly take a lock on DagRun via ti.refresh_from_fb
Browse files Browse the repository at this point in the history
In 2.2.0 we made TI.dag_run be automatically join-loaded, which is fine
for most cases, but for `refresh_from_db` we don't need that (we don't
access anything under ti.dag_run) and it's possible that when
`lock_for_update=True` is passed we are locking more than we want to and
_might_ cause deadlocks.

Even if it doesn't, selecting more than we need is wasteful.
  • Loading branch information
ashb committed Jul 28, 2022
1 parent a0aa93f commit 7a7d2ac
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,28 +848,32 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
"""
self.log.debug("Refreshing TaskInstance %s from DB", self)

qry = session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == self.run_id,
TaskInstance.map_index == self.map_index,
qry = (
# To avoid joining any relationships by default select the all
# columns, not the object. This also means we get (effectively) a
# namedtuple back, not a TI object
session.query(*TaskInstance.__table__.columns).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == self.run_id,
TaskInstance.map_index == self.map_index,
)
)

if lock_for_update:
for attempt in run_with_db_retries(logger=self.log):
with attempt:
ti: Optional[TaskInstance] = qry.with_for_update().first()
ti: Optional[TaskInstance] = qry.with_for_update().one_or_none()
else:
ti = qry.first()
ti = qry.one_or_none()
if ti:
# Fields ordered per model definition
self.start_date = ti.start_date
self.end_date = ti.end_date
self.duration = ti.duration
self.state = ti.state
# Get the raw value of try_number column, don't read through the
# accessor here otherwise it will be incremented by one already.
self.try_number = ti._try_number
# Since we selected columns, not the object, this is the raw value
self.try_number = ti.try_number
self.max_tries = ti.max_tries
self.hostname = ti.hostname
self.unixname = ti.unixname
Expand Down

0 comments on commit 7a7d2ac

Please sign in to comment.