diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py index 6474f0799c797..9dcd29e9ca0b5 100644 --- a/airflow/migrations/env.py +++ b/airflow/migrations/env.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import contextlib from logging.config import fileConfig from alembic import context @@ -89,9 +90,12 @@ def run_migrations_online(): and associate a connection with the context. """ - connectable = settings.engine + with contextlib.ExitStack() as stack: + connection = config.attributes.get('connection', None) + + if not connection: + connection = stack.push(settings.engine.connect()) - with connectable.connect() as connection: context.configure( connection=connection, transaction_per_migration=True, diff --git a/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py new file mode 100644 index 0000000000000..f6becd9dfe64a --- /dev/null +++ b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""Add missing auto-increment to columns on FAB tables

Revision ID: b0d31815b5a6
Revises: ecb43d2a1842
Create Date: 2022-10-05 13:16:45.638490

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'b0d31815b5a6'
down_revision = 'ecb43d2a1842'
branch_labels = None
depends_on = None
airflow_version = '2.4.2'


def upgrade():
    """Apply migration.

    If these columns are already of the right type (i.e. created by our
    migration in 1.10.13 rather than FAB itself in an earlier version), this
    migration will issue an alter statement to change them to what they already
    are -- i.e. it's a no-op.

    These tables are small (100 to low 1k rows at most), so it's not too costly
    to change them.

    On PostgreSQL the ``id`` column is given a server-side default that pulls
    the next value from the ``<table>_id_seq`` sequence; this migration does
    not create that sequence itself. On the remaining dialects the column is
    altered with ``autoincrement=True``.
    """
    conn = op.get_bind()
    dialect = conn.dialect.name
    if dialect in ['mssql', 'sqlite']:
        # 1.10.12 didn't support SQL Server, so it couldn't have gotten this wrong --> nothing to correct
        # SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY
        return

    for table in (
        'ab_permission',
        'ab_view_menu',
        'ab_role',
        'ab_permission_view',
        'ab_permission_view_role',
        'ab_user',
        'ab_user_role',
        'ab_register_user',
    ):
        with op.batch_alter_table(table) as batch:
            kwargs = {}
            if dialect == 'postgresql':
                # The sequence's nextval() expression is a column *default*, not a
                # column type, so it must be passed as ``server_default``.
                kwargs['server_default'] = sa.Sequence(f'{table}_id_seq').next_value()
            else:
                kwargs['autoincrement'] = True
            batch.alter_column("id", existing_type=sa.Integer(), existing_nullable=False, **kwargs)


def downgrade():
    """Unapply add_missing_autoinc_fab"""
    # No downgrade needed, these _should_ have applied from 1.10.13 but didn't due to a previous bug!
diff --git a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py similarity index 97% rename from airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py rename to airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py index fc580529658d6..2e249b9233979 100644 --- a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py +++ b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py @@ -19,7 +19,7 @@ """Add updated_at column to DagRun and TaskInstance Revision ID: ee8d93fcc81e -Revises: ecb43d2a1842 +Revises: b0d31815b5a6 Create Date: 2022-09-08 19:08:37.623121 """ @@ -33,7 +33,7 @@ # revision identifiers, used by Alembic. revision = 'ee8d93fcc81e' -down_revision = 'ecb43d2a1842' +down_revision = 'b0d31815b5a6' branch_labels = None depends_on = None airflow_version = '2.5.0' diff --git a/airflow/settings.py b/airflow/settings.py index 10e963d5f760d..696b4b652ae25 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -259,14 +259,14 @@ def configure_vars(): DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False) -def configure_orm(disable_connection_pool=False): +def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy""" from airflow.utils.log.secrets_masker import mask_secret log.debug("Setting up DB connection pool (PID %s)", os.getpid()) global engine global Session - engine_args = prepare_engine_args(disable_connection_pool) + engine_args = prepare_engine_args(disable_connection_pool, pool_class) if conf.has_option('database', 'sql_alchemy_connect_args'): connect_args = conf.getimport('database', 'sql_alchemy_connect_args') @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False): } -def prepare_engine_args(disable_connection_pool=False): +def prepare_engine_args(disable_connection_pool=False, 
pool_class=None): """Prepare SQLAlchemy engine args""" default_args = {} for dialect, default in DEFAULT_ENGINE_ARGS.items(): @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False): 'database', 'sql_alchemy_engine_args', fallback=default_args ) # type: ignore - if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): + if pool_class: + # Don't use separate settings for size etc, only those from sql_alchemy_engine_args + engine_args['poolclass'] = pool_class + elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'): engine_args['poolclass'] = NullPool log.debug("settings.prepare_engine_args(): Using NullPool") elif not SQL_ALCHEMY_CONN.startswith('sqlite'): @@ -413,10 +416,10 @@ def dispose_orm(): engine = None -def reconfigure_orm(disable_connection_pool=False): +def reconfigure_orm(disable_connection_pool=False, pool_class=None): """Properly close database connections and re-configure ORM""" dispose_orm() - configure_orm(disable_connection_pool=disable_connection_pool) + configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class) def configure_adapters(): diff --git a/airflow/utils/db.py b/airflow/utils/db.py index e6dfcf9f2e101..f796156f8f96d 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1539,8 +1539,23 @@ def upgradedb( initdb(session=session, load_connections=False) return with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + import sqlalchemy.pool + log.info("Creating tables") - command.upgrade(config, revision=to_revision or 'heads') + val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + try: + # Reconfigure the ORM ot use _EXACTLY_ one connection, otherwise some db engines hang forever + # trying to ALTER TABLEs + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1' + settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool) + command.upgrade(config, revision=to_revision or 
'heads') + finally: + if val is None: + os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE') + else: + os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val + settings.reconfigure_orm() + reserialize_dags(session=session) add_default_pool_if_not_exists(session=session) synchronize_log_template(session=session) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index efbf555b9e440..26f6e90fbf41d 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -f15522e5fdd4bc7ddd43b5f9e5ac73a7fd408bf976a23715603463718a1aecf3 \ No newline at end of file +5d0b9bcb02f09e99338b2c230cf2c7b1e8af7f7ea675eca6f31e49e851e11941 \ No newline at end of file diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 3b92588ba0a90..f09052ca7bc3a 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``ee8d93fcc81e`` (head) | ``ecb43d2a1842`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance | +| ``ee8d93fcc81e`` (head) | ``b0d31815b5a6`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``b0d31815b5a6`` | ``ecb43d2a1842`` | ``2.4.2`` | Add missing auto-increment to columns on FAB tables | 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``ecb43d2a1842`` | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel | | | | | and CallbackRequest tables. |