From 9981a9fb6880b065001f5122b1715b81077dc8e8 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 7 Oct 2022 11:01:01 +0100 Subject: [PATCH] Run migrations with with a pool of a connection. Without this `create_session()` will open a new connection, and that causes mysql to hang waiting to get a "metadata lock on table". Using the "stock" pool with size=1 and max_overflow=0 doesn't work, that instead times out if you try to get a new connection from the pool. SingletonThreadPool instead returns the existing active connection which is what we want. --- airflow/migrations/env.py | 8 ++++++-- airflow/settings.py | 15 +++++++++------ airflow/utils/db.py | 17 ++++++++++++++++- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py index 6474f0799c797..9dcd29e9ca0b5 100644 --- a/airflow/migrations/env.py +++ b/airflow/migrations/env.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import contextlib from logging.config import fileConfig from alembic import context @@ -89,9 +90,12 @@ def run_migrations_online(): and associate a connection with the context. """ - connectable = settings.engine + with contextlib.ExitStack() as stack: + connection = config.attributes.get('connection', None) + + if not connection: + connection = stack.push(settings.engine.connect()) - with connectable.connect() as connection: context.configure( connection=connection, transaction_per_migration=True, diff --git a/airflow/settings.py b/airflow/settings.py index 10e963d5f760d..696b4b652ae25 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -259,14 +259,14 @@ def configure_vars(): DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False) -def configure_orm(disable_connection_pool=False): +def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy""" from airflow.utils.log.secrets_masker import mask_secret log.debug("Setting up DB connection pool (PID %s)", os.getpid()) global engine global Session - engine_args = prepare_engine_args(disable_connection_pool) + engine_args = prepare_engine_args(disable_connection_pool, pool_class) if conf.has_option('database', 'sql_alchemy_connect_args'): connect_args = conf.getimport('database', 'sql_alchemy_connect_args') @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False): } -def prepare_engine_args(disable_connection_pool=False): +def prepare_engine_args(disable_connection_pool=False, pool_class=None): """Prepare SQLAlchemy engine args""" default_args = {} for dialect, default in DEFAULT_ENGINE_ARGS.items(): @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False): 'database', 'sql_alchemy_engine_args', fallback=default_args ) # type: ignore - if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): + if pool_class: + # Don't use separate settings for size etc, only those from sql_alchemy_engine_args + engine_args['poolclass'] = pool_class + elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): engine_args['poolclass'] = NullPool log.debug("settings.prepare_engine_args(): Using NullPool") elif not SQL_ALCHEMY_CONN.startswith('sqlite'): @@ -413,10 +416,10 @@ def dispose_orm(): engine = None -def reconfigure_orm(disable_connection_pool=False): +def reconfigure_orm(disable_connection_pool=False, pool_class=None): """Properly close database connections and re-configure ORM""" dispose_orm() - configure_orm(disable_connection_pool=disable_connection_pool) + configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class) def configure_adapters(): diff --git a/airflow/utils/db.py b/airflow/utils/db.py index e6dfcf9f2e101..f796156f8f96d 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1539,8 +1539,23 @@ def upgradedb( initdb(session=session, load_connections=False) return with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + import sqlalchemy.pool + log.info("Creating tables") - command.upgrade(config, revision=to_revision or 'heads') + val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + try: + # Reconfigure the ORM ot use _EXACTLY_ one connection, otherwise some db engines hang forever + # trying to ALTER TABLEs + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1' + settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool) + command.upgrade(config, revision=to_revision or 'heads') + finally: + if val is None: + os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + else: + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val + settings.reconfigure_orm() + reserialize_dags(session=session) add_default_pool_if_not_exists(session=session) synchronize_log_template(session=session)