Skip to content

Commit

Permalink
Don't rely on current ORM structure for db clean command (#23574)
Browse files Browse the repository at this point in the history
By not relying on the ORM models, the `db clean` command can be used even when the metadata database has not yet been upgraded to the version of Airflow you have installed.

Additionally, all rows are archived before they are deleted (pass `--skip-archive` to disable this).
  • Loading branch information
dstandish committed Jun 17, 2022
1 parent 376faef commit 95bd6b7
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 128 deletions.
6 changes: 6 additions & 0 deletions airflow/cli/cli_parser.py
Expand Up @@ -434,6 +434,11 @@ def string_lower_type(val):
help="Perform a dry run",
action="store_true",
)
# Boolean `--skip-archive` flag: when given, purged records are not preserved
# in an archive table. Its value is forwarded as `skip_archive` by the
# `cleanup_tables` command handler.
ARG_DB_SKIP_ARCHIVE = Arg(
("--skip-archive",),
help="Don't preserve purged records in an archive table.",
action="store_true",
)


# pool
Expand Down Expand Up @@ -1452,6 +1457,7 @@ class GroupCommand(NamedTuple):
ARG_DB_CLEANUP_TIMESTAMP,
ARG_VERBOSE,
ARG_YES,
ARG_DB_SKIP_ARCHIVE,
),
),
)
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/db_command.py
Expand Up @@ -198,4 +198,5 @@ def cleanup_tables(args):
clean_before_timestamp=args.clean_before_timestamp,
verbose=args.verbose,
confirm=not args.yes,
skip_archive=args.skip_archive,
)
17 changes: 10 additions & 7 deletions airflow/utils/db.py
Expand Up @@ -879,7 +879,7 @@ def check_conn_id_duplicates(session: Session) -> Iterable[str]:
)


def reflect_tables(tables: List[Union[Base, str]], session):
def reflect_tables(tables: Optional[List[Union[Base, str]]], session):
"""
When running checks prior to upgrades, we use reflection to determine current state of the
database.
Expand All @@ -890,12 +890,15 @@ def reflect_tables(tables: List[Union[Base, str]], session):

metadata = sqlalchemy.schema.MetaData(session.bind)

for tbl in tables:
try:
table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
except exc.InvalidRequestError:
continue
if tables is None:
metadata.reflect(resolve_fks=False)
else:
for tbl in tables:
try:
table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
except exc.InvalidRequestError:
continue
return metadata


Expand Down

0 comments on commit 95bd6b7

Please sign in to comment.