Skip to content

Commit

Permalink
Add missing AUTOINC/SERIAL for FAB tables (#26885)
Browse files Browse the repository at this point in the history
* Add missing AUTOINC/SERIAL for FAB tables

In 1.10.13 we introduced a migration that creates the tables with the
server_default but that migration only did anything if the tables didn't
already exist. But the tables created by the FAB model have a default
(but not a server_default).

Oh, and the final bit of the puzzle, in 2.4 we finally "took control" of
the FAB security models in to airflow and those do not have the default
set.

* Update airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py

* Fix static checks

* Run migrations with with a pool of a connection.

Without this `create_session()` will open a new connection, and that causes mysql
to hang waiting to get a "metadata lock on table".

Using the "stock" pool with size=1 and max_overflow=0 doesn't work, that
instead times out if you try to get a new connection from the pool.
SingletonThreadPool instead returns the existing active connection which
is what we want.

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
ashb and uranusjr committed Oct 7, 2022
1 parent 2f326a6 commit 7efdeed
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 13 deletions.
8 changes: 6 additions & 2 deletions airflow/migrations/env.py
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import contextlib
from logging.config import fileConfig

from alembic import context
Expand Down Expand Up @@ -89,9 +90,12 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = settings.engine
with contextlib.ExitStack() as stack:
connection = config.attributes.get('connection', None)

if not connection:
connection = stack.push(settings.engine.connect())

with connectable.connect() as connection:
context.configure(
connection=connection,
transaction_per_migration=True,
Expand Down
78 changes: 78 additions & 0 deletions airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
@@ -0,0 +1,78 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add missing auto-increment to columns on FAB tables
Revision ID: b0d31815b5a6
Revises: ecb43d2a1842
Create Date: 2022-10-05 13:16:45.638490
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'b0d31815b5a6'
down_revision = 'ecb43d2a1842'
branch_labels = None
depends_on = None
airflow_version = '2.4.2'


def upgrade():
"""Apply migration.
If these columns are already of the right type (i.e. created by our
migration in 1.10.13 rather than FAB itself in an earlier version), this
migration will issue an alter statement to change them to what they already
are -- i.e. its a no-op.
These tables are small (100 to low 1k rows at most), so it's not too costly
to change them.
"""
conn = op.get_bind()
if conn.dialect.name in ['mssql', 'sqlite']:
# 1.10.12 didn't support SQL Server, so it couldn't have gotten this wrong --> nothing to correct
# SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY
return

for table in (
'ab_permission',
'ab_view_menu',
'ab_role',
'ab_permission_view',
'ab_permission_view_role',
'ab_user',
'ab_user_role',
'ab_register_user',
):
with op.batch_alter_table(table) as batch:
kwargs = {}
if conn.dialect.name == 'postgresql':
kwargs['type_'] = sa.Sequence(f'{table}_id_seq').next_value()
else:
kwargs['autoincrement'] = True
batch.alter_column("id", existing_type=sa.Integer(), existing_nullable=False, **kwargs)


def downgrade():
"""Unapply add_missing_autoinc_fab"""
# No downgrade needed, these _should_ have applied from 1.10.13 but didn't due to a previous bug!
Expand Up @@ -19,7 +19,7 @@
"""Add updated_at column to DagRun and TaskInstance
Revision ID: ee8d93fcc81e
Revises: ecb43d2a1842
Revises: b0d31815b5a6
Create Date: 2022-09-08 19:08:37.623121
"""
Expand All @@ -33,7 +33,7 @@

# revision identifiers, used by Alembic.
revision = 'ee8d93fcc81e'
down_revision = 'ecb43d2a1842'
down_revision = 'b0d31815b5a6'
branch_labels = None
depends_on = None
airflow_version = '2.5.0'
Expand Down
15 changes: 9 additions & 6 deletions airflow/settings.py
Expand Up @@ -259,14 +259,14 @@ def configure_vars():
DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)


def configure_orm(disable_connection_pool=False):
def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy"""
from airflow.utils.log.secrets_masker import mask_secret

log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
engine_args = prepare_engine_args(disable_connection_pool)
engine_args = prepare_engine_args(disable_connection_pool, pool_class)

if conf.has_option('database', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('database', 'sql_alchemy_connect_args')
Expand Down Expand Up @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False):
}


def prepare_engine_args(disable_connection_pool=False):
def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args"""
default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
Expand All @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False):
'database', 'sql_alchemy_engine_args', fallback=default_args
) # type: ignore

if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
if pool_class:
# Don't use separate settings for size etc, only those from sql_alchemy_engine_args
engine_args['poolclass'] = pool_class
elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
engine_args['poolclass'] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith('sqlite'):
Expand Down Expand Up @@ -413,10 +416,10 @@ def dispose_orm():
engine = None


def reconfigure_orm(disable_connection_pool=False):
def reconfigure_orm(disable_connection_pool=False, pool_class=None):
"""Properly close database connections and re-configure ORM"""
dispose_orm()
configure_orm(disable_connection_pool=disable_connection_pool)
configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
Expand Down
17 changes: 16 additions & 1 deletion airflow/utils/db.py
Expand Up @@ -1539,8 +1539,23 @@ def upgradedb(
initdb(session=session, load_connections=False)
return
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
import sqlalchemy.pool

log.info("Creating tables")
command.upgrade(config, revision=to_revision or 'heads')
val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
try:
# Reconfigure the ORM ot use _EXACTLY_ one connection, otherwise some db engines hang forever
# trying to ALTER TABLEs
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1'
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
command.upgrade(config, revision=to_revision or 'heads')
finally:
if val is None:
os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
else:
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val
settings.reconfigure_orm()

reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
f15522e5fdd4bc7ddd43b5f9e5ac73a7fd408bf976a23715603463718a1aecf3
5d0b9bcb02f09e99338b2c230cf2c7b1e8af7f7ea675eca6f31e49e851e11941
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``ee8d93fcc81e`` (head) | ``ecb43d2a1842`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance |
| ``ee8d93fcc81e`` (head) | ``b0d31815b5a6`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``b0d31815b5a6`` | ``ecb43d2a1842`` | ``2.4.2`` | Add missing auto-increment to columns on FAB tables |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ecb43d2a1842`` | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel |
| | | | and CallbackRequest tables. |
Expand Down

0 comments on commit 7efdeed

Please sign in to comment.