Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TaskFail duplicates check #26714

Merged
merged 3 commits into from Sep 28, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 19 additions & 20 deletions airflow/utils/db.py
Expand Up @@ -940,31 +940,31 @@ def reflect_tables(tables: list[Base | str] | None, session):
return metadata


def check_task_fail_for_duplicates(session):
"""Check that there are no duplicates in the task_fail table before creating FK"""
from airflow.models.taskfail import TaskFail

metadata = reflect_tables([TaskFail], session)
task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore
if task_fail is None: # table not there
return
if "run_id" in task_fail.columns: # upgrade already applied
return
yield from check_table_for_duplicates(
table_name=task_fail.name,
uniqueness=['dag_id', 'task_id', 'execution_date'],
session=session,
version='2.3',
)


def check_table_for_duplicates(
*, session: Session, table_name: str, uniqueness: list[str], version: str
) -> Iterable[str]:
"""
Check table for duplicates, given a list of columns which define the uniqueness of the table.

Call from ``run_duplicates_checks``.
Usage example:

.. code-block:: python

def check_task_fail_for_duplicates(session):
from airflow.models.taskfail import TaskFail

metadata = reflect_tables([TaskFail], session)
task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore
if task_fail is None: # table not there
return
if "run_id" in task_fail.columns: # upgrade already applied
return
yield from check_table_for_duplicates(
table_name=task_fail.name,
uniqueness=['dag_id', 'task_id', 'execution_date'],
session=session,
version='2.3',
)

:param table_name: table name to check
:param uniqueness: uniqueness constraint to evaluate against
Expand Down Expand Up @@ -1387,7 +1387,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
:rtype: list[str]
"""
check_functions: tuple[Callable[..., Iterable[str]], ...] = (
check_task_fail_for_duplicates,
check_conn_id_duplicates,
check_conn_type_null,
check_run_id_null,
Expand Down