From b94db52c3497e04747e63fce85dc08dd7f4657fc Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 27 Sep 2022 22:12:54 -0700 Subject: [PATCH] Remove TaskFail duplicates check (#26714) This check was added in error. The duplicates are supposed to be there. The task duration and Gantt views process TaskFail records with the apparent intention of including durations from non-final failed attempts in the overall duration of the run. (cherry picked from commit ee21c1bac4cb5bb1c19ea9e5e84ee9b5854ab039) --- airflow/utils/db.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 3853c07c3b4d5..3ee81296d2181 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -926,29 +926,31 @@ def reflect_tables(tables: Optional[List[Union[Base, str]]], session): return metadata -def check_task_fail_for_duplicates(session): - """Check that there are no duplicates in the task_fail table before creating FK""" - metadata = reflect_tables([TaskFail], session) - task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore - if task_fail is None: # table not there - return - if "run_id" in task_fail.columns: # upgrade already applied - return - yield from check_table_for_duplicates( - table_name=task_fail.name, - uniqueness=['dag_id', 'task_id', 'execution_date'], - session=session, - version='2.3', - ) - - def check_table_for_duplicates( *, session: Session, table_name: str, uniqueness: List[str], version: str ) -> Iterable[str]: """ Check table for duplicates, given a list of columns which define the uniqueness of the table. - Call from ``run_duplicates_checks``. + Usage example: + + .. 
code-block:: python + + def check_task_fail_for_duplicates(session): + from airflow.models.taskfail import TaskFail + + metadata = reflect_tables([TaskFail], session) + task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore + if task_fail is None: # table not there + return + if "run_id" in task_fail.columns: # upgrade already applied + return + yield from check_table_for_duplicates( + table_name=task_fail.name, + uniqueness=['dag_id', 'task_id', 'execution_date'], + session=session, + version='2.3', + ) :param table_name: table name to check :param uniqueness: uniqueness constraint to evaluate against @@ -1393,7 +1395,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]: :rtype: list[str] """ check_functions: Tuple[Callable[..., Iterable[str]], ...] = ( - check_task_fail_for_duplicates, check_conn_id_duplicates, check_conn_type_null, check_run_id_null,