diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 3acbaf6f221cb..cec3d58c364b9 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -941,31 +941,31 @@ def reflect_tables(tables: list[Base | str] | None, session):
     return metadata
 
 
-def check_task_fail_for_duplicates(session):
-    """Check that there are no duplicates in the task_fail table before creating FK"""
-    from airflow.models.taskfail import TaskFail
-
-    metadata = reflect_tables([TaskFail], session)
-    task_fail = metadata.tables.get(TaskFail.__tablename__)  # type: ignore
-    if task_fail is None:  # table not there
-        return
-    if "run_id" in task_fail.columns:  # upgrade already applied
-        return
-    yield from check_table_for_duplicates(
-        table_name=task_fail.name,
-        uniqueness=['dag_id', 'task_id', 'execution_date'],
-        session=session,
-        version='2.3',
-    )
-
-
 def check_table_for_duplicates(
     *, session: Session, table_name: str, uniqueness: list[str], version: str
 ) -> Iterable[str]:
     """
     Check table for duplicates, given a list of columns which define the uniqueness of the table.
 
-    Call from ``run_duplicates_checks``.
+    Usage example:
+
+    .. code-block:: python
+
+        def check_task_fail_for_duplicates(session):
+            from airflow.models.taskfail import TaskFail
+
+            metadata = reflect_tables([TaskFail], session)
+            task_fail = metadata.tables.get(TaskFail.__tablename__)  # type: ignore
+            if task_fail is None:  # table not there
+                return
+            if "run_id" in task_fail.columns:  # upgrade already applied
+                return
+            yield from check_table_for_duplicates(
+                table_name=task_fail.name,
+                uniqueness=['dag_id', 'task_id', 'execution_date'],
+                session=session,
+                version='2.3',
+            )
 
     :param table_name: table name to check
     :param uniqueness: uniqueness constraint to evaluate against
@@ -1388,7 +1388,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
    :rtype: list[str]
    """
    check_functions: tuple[Callable[..., Iterable[str]], ...] = (
-        check_task_fail_for_duplicates,
        check_conn_id_duplicates,
        check_conn_type_null,
        check_run_id_null,
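
Note: below is a minimal sketch of how a caller might build a new pre-migration check on top of the
``check_table_for_duplicates`` helper kept by this diff, following the same pattern the removed
``check_task_fail_for_duplicates`` function used. The function name ``check_example_table_for_duplicates``,
the table name ``"example_table"``, and the chosen uniqueness columns are hypothetical, not part of this change;
the ``reflect_tables`` and ``check_table_for_duplicates`` signatures are taken from the diff above. Such a
function would then be added to the ``check_functions`` tuple in ``_check_migration_errors`` to run before a
database upgrade.

    def check_example_table_for_duplicates(session):
        # Reflect only the table of interest; reflect_tables accepts plain table
        # names (list[Base | str]) per its signature above.
        metadata = reflect_tables(["example_table"], session)
        example_table = metadata.tables.get("example_table")
        if example_table is None:  # table not there, nothing to check
            return
        # Yield one error message per uniqueness violation found, matching the
        # Iterable[str] contract of check_table_for_duplicates.
        yield from check_table_for_duplicates(
            table_name=example_table.name,
            uniqueness=['dag_id', 'task_id', 'execution_date'],
            session=session,
            version='2.3',
        )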