From b9b9e99e14764aa079301a091b23c765799c9eb3 Mon Sep 17 00:00:00 2001 From: Jaroslaw Potiuk Date: Fri, 3 Dec 2021 23:48:39 +0100 Subject: [PATCH] Adds retry on taskinstance retrieval lock Fixes: #19832 --- airflow/models/taskinstance.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 7a69a4fdaa177..26387391088f3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -91,6 +91,7 @@ from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import context_to_airflow_vars from airflow.utils.platform import getuser +from airflow.utils.retries import run_with_db_retries from airflow.utils.session import create_session, provide_session from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime from airflow.utils.state import DagRunState, State @@ -723,7 +724,9 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None: ) if lock_for_update: - ti: Optional[TaskInstance] = qry.with_for_update().first() + for attempt in run_with_db_retries(logger=self.log): + with attempt: + ti: Optional[TaskInstance] = qry.with_for_update().first() else: ti = qry.first() if ti: