Skip to content

Commit

Permalink
Run migrations with a pool of a single connection.
Browse files Browse the repository at this point in the history
Without this, `create_session()` will open a new connection, which causes MySQL
to hang while waiting to acquire a "metadata lock on table".

Using the "stock" pool with size=1 and max_overflow=0 doesn't work: that
instead times out if you try to get a new connection from the pool.
SingletonThreadPool instead returns the existing active connection, which
is what we want.
  • Loading branch information
ashb committed Oct 7, 2022
1 parent 7c7dff9 commit 9981a9f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
8 changes: 6 additions & 2 deletions airflow/migrations/env.py
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import contextlib
from logging.config import fileConfig

from alembic import context
Expand Down Expand Up @@ -89,9 +90,12 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = settings.engine
with contextlib.ExitStack() as stack:
connection = config.attributes.get('connection', None)

if not connection:
connection = stack.push(settings.engine.connect())

with connectable.connect() as connection:
context.configure(
connection=connection,
transaction_per_migration=True,
Expand Down
15 changes: 9 additions & 6 deletions airflow/settings.py
Expand Up @@ -259,14 +259,14 @@ def configure_vars():
DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)


def configure_orm(disable_connection_pool=False):
def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy"""
from airflow.utils.log.secrets_masker import mask_secret

log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
engine_args = prepare_engine_args(disable_connection_pool)
engine_args = prepare_engine_args(disable_connection_pool, pool_class)

if conf.has_option('database', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('database', 'sql_alchemy_connect_args')
Expand Down Expand Up @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False):
}


def prepare_engine_args(disable_connection_pool=False):
def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args"""
default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
Expand All @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False):
'database', 'sql_alchemy_engine_args', fallback=default_args
) # type: ignore

if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
if pool_class:
# Don't use separate settings for size etc, only those from sql_alchemy_engine_args
engine_args['poolclass'] = pool_class
elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
engine_args['poolclass'] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith('sqlite'):
Expand Down Expand Up @@ -413,10 +416,10 @@ def dispose_orm():
engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM.

    Disposes of the current engine and Session, then rebuilds them via
    ``configure_orm``.

    :param disable_connection_pool: if True, configure the engine with a
        NullPool (no connection pooling).
    :param pool_class: explicit SQLAlchemy pool class to use for the new
        engine; when set it takes precedence over the pool settings derived
        from configuration.
    """
    # Dispose first so no stale pooled connections survive into the
    # newly configured engine.
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
Expand Down
17 changes: 16 additions & 1 deletion airflow/utils/db.py
Expand Up @@ -1539,8 +1539,23 @@ def upgradedb(
initdb(session=session, load_connections=False)
return
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
import sqlalchemy.pool

log.info("Creating tables")
command.upgrade(config, revision=to_revision or 'heads')
val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
try:
# Reconfigure the ORM to use _EXACTLY_ one connection, otherwise some db engines hang forever
# trying to ALTER TABLEs
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1'
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
command.upgrade(config, revision=to_revision or 'heads')
finally:
if val is None:
os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
else:
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val
settings.reconfigure_orm()

reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
Expand Down

0 comments on commit 9981a9f

Please sign in to comment.