From fb86c6a618947803b2d1c4692d1594e394b8fa1b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 27 Sep 2022 09:36:53 -0700 Subject: [PATCH 1/2] Remove TaskFail duplicates check This check was added in error. The duplicates are supposed to be there. The task duration and gantt views process TaskFail records with the apparent intention of including durations from non-final failed attempts in the overall duration of the run. --- airflow/utils/db.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 6aa284026402f..777346f35ced7 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -940,32 +940,12 @@ def reflect_tables(tables: list[Base | str] | None, session): return metadata -def check_task_fail_for_duplicates(session): - """Check that there are no duplicates in the task_fail table before creating FK""" - from airflow.models.taskfail import TaskFail - - metadata = reflect_tables([TaskFail], session) - task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore - if task_fail is None: # table not there - return - if "run_id" in task_fail.columns: # upgrade already applied - return - yield from check_table_for_duplicates( - table_name=task_fail.name, - uniqueness=['dag_id', 'task_id', 'execution_date'], - session=session, - version='2.3', - ) - - def check_table_for_duplicates( *, session: Session, table_name: str, uniqueness: list[str], version: str ) -> Iterable[str]: """ Check table for duplicates, given a list of columns which define the uniqueness of the table. - Call from ``run_duplicates_checks``. - :param table_name: table name to check :param uniqueness: uniqueness constraint to evaluate against :param session: session of the sqlalchemy @@ -1387,7 +1367,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]: :rtype: list[str] """ check_functions: tuple[Callable[..., Iterable[str]], ...] 
= ( - check_task_fail_for_duplicates, check_conn_id_duplicates, check_conn_type_null, check_run_id_null, From 9aa24ba1bb26fac935fd171f66ed660e2b0ce4c6 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 27 Sep 2022 09:42:20 -0700 Subject: [PATCH 2/2] add usage example --- airflow/utils/db.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 777346f35ced7..9e48b3465974c 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -946,6 +946,26 @@ def check_table_for_duplicates( """ Check table for duplicates, given a list of columns which define the uniqueness of the table. + Usage example: + + .. code-block:: python + + def check_task_fail_for_duplicates(session): + from airflow.models.taskfail import TaskFail + + metadata = reflect_tables([TaskFail], session) + task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore + if task_fail is None: # table not there + return + if "run_id" in task_fail.columns: # upgrade already applied + return + yield from check_table_for_duplicates( + table_name=task_fail.name, + uniqueness=['dag_id', 'task_id', 'execution_date'], + session=session, + version='2.3', + ) + :param table_name: table name to check :param uniqueness: uniqueness constraint to evaluate against :param session: session of the sqlalchemy