Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alt: Add a session backend to store session data in the database #21478

Merged
merged 14 commits into from Feb 15, 2022
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Expand Up @@ -1026,6 +1026,13 @@
type: string
example: ~
default: ""
- name: session_backend
description: |
The type of backend used to store web session data, can be 'database' or 'securecookie'
version_added: 2.2.4
type: string
example: securecookie
default: database
- name: web_server_master_timeout
description: |
Number of seconds the webserver waits before killing gunicorn master that doesn't respond
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Expand Up @@ -529,6 +529,10 @@ web_server_ssl_cert =
# provided SSL will be enabled. This does not change the web server port.
web_server_ssl_key =

# The type of backend used to store web session data, can be 'database' or 'securecookie'
# Example: session_backend = securecookie
session_backend = database

# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
web_server_master_timeout = 120

Expand Down
Expand Up @@ -19,7 +19,7 @@
"""Increase length of email and username

Revision ID: 5e3ec427fdd3
Revises: be2bfac3da23
Revises: c381b21cb7e4
Create Date: 2021-12-01 11:49:26.390210

"""
Expand All @@ -29,7 +29,7 @@

# revision identifiers, used by Alembic.
revision = '5e3ec427fdd3'
down_revision = 'be2bfac3da23'
down_revision = 'c381b21cb7e4'
branch_labels = None
depends_on = None

Expand Down
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add session table to db

Revision ID: c381b21cb7e4
Revises: be2bfac3da23
Create Date: 2022-01-25 13:56:35.069429

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'c381b21cb7e4'
down_revision = 'be2bfac3da23'
branch_labels = None
depends_on = None

TABLE_NAME = 'session'


def upgrade():
"""Apply add session table to db"""
op.create_table(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add an ORM model for this table so that it's not only through migration file that the table can be created.
I'm working on a change that would have new DBs created through the ORM rather than going through the migration files. #21462

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it like this I think

from flask_session.sessions import SqlAlchemySessionInterface
from flask_appbuilder import SQLA
...

    db = SQLA()
    db.session = settings.Session
    SessionModel = SqlAlchemySessionInterface(db=db).sql_session_model

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(If we should or not is the next question)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what it ends up looking like, when considering #21462's changes:

--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -606,12 +606,21 @@ def initdb(session: Session = NEW_SESSION):
     from airflow.models.base import Base
     from airflow.www.app import create_app
 
+    from airflow.www.session import AirflowDatabaseSessionInterface
+    from flask_sqlalchemy import SQLAlchemy
+
     Base.metadata.create_all(settings.engine)
     # Stamp migration head with alembic
     config = _get_alembic_config()
     command.stamp(config, "head")
     # sets up the db; Sync permissions etc
-    create_app(config={'UPDATE_FAB_PERMS': True})
+    app = create_app(config={'UPDATE_FAB_PERMS': True})
+
+    # Create `session` table
+    db = SQLAlchemy(app)
+    AirflowDatabaseSessionInterface(app=app, db=db, table='session', key_prefix='')
+    db.create_all()
+
     if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
         create_default_connections(session=session)
 

TABLE_NAME,
sa.Column('id', sa.Integer()),
sa.Column('session_id', sa.String(255)),
sa.Column('data', sa.LargeBinary()),
sa.Column('expiry', sa.DateTime()),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('session_id'),
)


def downgrade():
    """Revert this revision by dropping the table named by ``TABLE_NAME``."""
    op.drop_table(table_name=TABLE_NAME)
3 changes: 3 additions & 0 deletions airflow/utils/db.py
Expand Up @@ -1065,9 +1065,12 @@ def drop_airflow_models(connection):
users.drop(settings.engine, checkfirst=True)
dag_stats = Table('dag_stats', Base.metadata)
dag_stats.drop(settings.engine, checkfirst=True)
session = Table('session', Base.metadata)
session.drop(settings.engine, checkfirst=True)

Base.metadata.drop_all(connection)
# we remove the Tables here so that if resetdb is run metadata does not keep the old tables.
Base.metadata.remove(session)
Base.metadata.remove(dag_stats)
Base.metadata.remove(users)
Base.metadata.remove(user)
Expand Down
3 changes: 1 addition & 2 deletions airflow/www/app.py
Expand Up @@ -37,7 +37,7 @@
from airflow.www.extensions.init_manifest_files import configure_manifest_files
from airflow.www.extensions.init_robots import init_robots
from airflow.www.extensions.init_security import init_api_experimental_auth, init_xframe_protection
from airflow.www.extensions.init_session import init_airflow_session_interface, init_permanent_session
from airflow.www.extensions.init_session import init_airflow_session_interface
from airflow.www.extensions.init_views import (
init_api_connexion,
init_api_experimental,
Expand Down Expand Up @@ -137,7 +137,6 @@ def create_app(config=None, testing=False):

init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
init_permanent_session(flask_app)
init_airflow_session_interface(flask_app)
return flask_app

Expand Down
63 changes: 38 additions & 25 deletions airflow/www/extensions/init_session.py
Expand Up @@ -15,33 +15,46 @@
# specific language governing permissions and limitations
# under the License.

from flask import request, session as flask_session
from flask.sessions import SecureCookieSessionInterface
from flask import session as builtin_flask_session


class AirflowSessionInterface(SecureCookieSessionInterface):
    """
    Cookie-backed session interface for Airflow.

    Customise session handling in this class: anything changed here
    takes effect globally.
    """

    def save_session(self, *args, **kwargs):
        """Save the session, except for requests served by the REST API blueprint."""
        is_rest_api_call = request.blueprint == '/api/v1'
        if not is_rest_api_call:
            return super().save_session(*args, **kwargs)
        # REST API requests must not create a session.
        return None


def init_permanent_session(app):
    """Register a ``before_request`` hook on ``app`` that flags the session as permanent."""

    @app.before_request
    def make_session_permanent():
        flask_session.permanent = True
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.www.session import AirflowDatabaseSessionInterface, AirflowSecureCookieSessionInterface


def init_airflow_session_interface(app):
    """
    Set airflow session interface.

    The backend is read with ``conf.get('webserver', 'SESSION_BACKEND')``:

    * ``'securecookie'`` installs ``AirflowSecureCookieSessionInterface`` and, when the
      Flask config value ``SESSION_PERMANENT`` is truthy (it defaults to ``True`` here),
      registers a ``before_request`` hook that marks the session permanent.
    * ``'database'`` installs ``AirflowDatabaseSessionInterface`` with the ``session``
      table, an empty key prefix and signing enabled.

    :param app: Flask application whose ``session_interface`` is set
    :raises AirflowConfigException: if the configured backend is neither
        ``'database'`` nor ``'securecookie'``
    """
    # NOTE(review): every branch below either reassigns ``app.session_interface`` or
    # raises, so this assignment looks like a leftover from the previous
    # implementation - confirm and remove.
    app.session_interface = AirflowSessionInterface()
    config = app.config.copy()
    selected_backend = conf.get('webserver', 'SESSION_BACKEND')
    # A bit of a misnomer - normally cookies expire whenever the browser is closed
    # or when they hit their expiry datetime, whichever comes first. "Permanent"
    # cookies only expire when they hit their expiry datetime, and can outlive
    # the browser being closed.
    permanent_cookie = config.get('SESSION_PERMANENT', True)

    if selected_backend == 'securecookie':
        app.session_interface = AirflowSecureCookieSessionInterface()
        if permanent_cookie:

            # The cookie backend is given no ``permanent`` argument, so the flag is
            # set on the session itself before each request.
            def make_session_permanent():
                builtin_flask_session.permanent = True

            app.before_request(make_session_permanent)
    elif selected_backend == 'database':
        app.session_interface = AirflowDatabaseSessionInterface(
            app=app,
            db=None,
            permanent=permanent_cookie,
            # Typically these would be configurable with Flask-Session,
            # but we will set them explicitly instead as they don't make
            # sense to have configurable in Airflow's use case
            table='session',
            key_prefix='',
            use_signer=True,
        )
    else:
        # NOTE(review): the message names "web_server_session_backend", but the value
        # is read from conf.get('webserver', 'SESSION_BACKEND') above - confirm the
        # intended option name (a test matches this text verbatim).
        raise AirflowConfigException(
            "Unrecognized session backend specified in "
            f"web_server_session_backend: '{selected_backend}'. Please set "
            "this to either 'database' or 'securecookie'."
        )
40 changes: 40 additions & 0 deletions airflow/www/session.py
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from flask import request
from flask.sessions import SecureCookieSessionInterface
from flask_session.sessions import SqlAlchemySessionInterface


class SesssionExemptMixin:
    """Mixin that keeps sessions from being saved for selected blueprints and paths"""

    def save_session(self, *args, **kwargs):
        """Skip saving for REST API and health requests; otherwise defer to the next class."""
        skip_session = request.blueprint == '/api/v1' or request.path == '/health'
        if skip_session:
            return None
        return super().save_session(*args, **kwargs)


class AirflowDatabaseSessionInterface(SesssionExemptMixin, SqlAlchemySessionInterface):
    """
    Session interface that exempts some routes and stores session data in the database.

    ``SesssionExemptMixin`` is listed first, so its ``save_session`` runs before
    the one inherited from Flask-Session's ``SqlAlchemySessionInterface``.
    """


class AirflowSecureCookieSessionInterface(SesssionExemptMixin, SecureCookieSessionInterface):
    """
    Session interface that exempts some routes and stores session data in a signed cookie.

    ``SesssionExemptMixin`` is listed first, so its ``save_session`` runs before
    the one inherited from Flask's ``SecureCookieSessionInterface``.
    """
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Expand Up @@ -34,9 +34,11 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``786e3737b18f`` | ``5e3ec427fdd3`` | ``2.3.0`` | Add ``timetable_description`` column to DagModel for UI. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``5e3ec427fdd3`` | ``be2bfac3da23`` | ``2.3.0`` | Increase length of email and username in ``ab_user`` and ``ab_register_user`` table |
| ``5e3ec427fdd3`` | ``c381b21cb7e4`` | ``2.3.0`` | Increase length of email and username in ``ab_user`` and ``ab_register_user`` table |
| | | | to ``256`` characters |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``c381b21cb7e4`` | ``be2bfac3da23`` | ``2.2.4`` | Create a ``session`` table to store web session data |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``be2bfac3da23`` | ``7b2661a43ba3`` | ``2.2.3`` | Add has_import_errors column to DagModel |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``7b2661a43ba3`` | ``142555e44c17`` | ``2.2.0`` | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id. |
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Expand Up @@ -1255,6 +1255,7 @@ sdk
secretRef
secretRefs
securable
securecookie
securestring
securityManager
seealso
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Expand Up @@ -106,6 +106,9 @@ install_requires =
flask-appbuilder~=3.4, <4.0.0
flask-caching>=1.5.0, <2.0.0
flask-login>=0.3, <0.5
# Strict upper-bound on the latest release of flask-session,
# as any schema changes will require a migration.
flask-session>=0.3.1, <=0.4.0
flask-wtf>=0.14.3, <0.15
graphviz>=0.12
gunicorn>=20.1.0
Expand Down
7 changes: 6 additions & 1 deletion tests/api_connexion/conftest.py
Expand Up @@ -25,7 +25,12 @@
@pytest.fixture(scope="session")
def minimal_app_for_api():
@dont_initialize_flask_app_submodules(
skip_all_except=["init_appbuilder", "init_api_experimental_auth", "init_api_connexion"]
skip_all_except=[
"init_appbuilder",
"init_api_experimental_auth",
"init_api_connexion",
"init_airflow_session_interface",
]
)
def factory():
with conf_vars({("api", "auth_backend"): "tests.test_utils.remote_user_api_auth_backend"}):
Expand Down
4 changes: 4 additions & 0 deletions tests/api_connexion/test_security.py
Expand Up @@ -45,3 +45,7 @@ def setup_attrs(self, configured_app) -> None:
def test_session_not_created_on_api_request(self):
    """A REST API request must not leave a ``session`` cookie on the test client."""
    self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"})
    cookie_names = [cookie.name for cookie in self.client.cookie_jar]
    assert "session" not in cookie_names

def test_session_not_created_on_health_endpoint_request(self):
    """A request to the health endpoint must not leave a ``session`` cookie on the test client."""
    self.client.get("health")
    cookie_names = [cookie.name for cookie in self.client.cookie_jar]
    assert "session" not in cookie_names
2 changes: 1 addition & 1 deletion tests/test_utils/decorators.py
Expand Up @@ -42,7 +42,7 @@ def no_op(*args, **kwargs):
"sync_appbuilder_roles",
"init_jinja_globals",
"init_xframe_protection",
"init_permanent_session",
"init_airflow_session_interface",
"init_appbuilder",
]

Expand Down
3 changes: 3 additions & 0 deletions tests/utils/test_db.py
Expand Up @@ -75,6 +75,9 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
# Ignore flask-session table/index
lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
]
for ignore in ignores:
diff = [d for d in diff if not ignore(d)]
Expand Down
1 change: 1 addition & 0 deletions tests/www/views/conftest.py
Expand Up @@ -55,6 +55,7 @@ def app(examples_dag_bag):
"init_flash_views",
"init_jinja_globals",
"init_plugins",
"init_airflow_session_interface",
]
)
def factory():
Expand Down
65 changes: 65 additions & 0 deletions tests/www/views/test_session.py
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest

from airflow.exceptions import AirflowConfigException
from airflow.www import app
from tests.test_utils.config import conf_vars
from tests.test_utils.decorators import dont_initialize_flask_app_submodules


def test_session_cookie_created_on_login(user_client):
    """The ``user_client`` fixture's cookie jar must contain a ``session`` cookie."""
    cookie_names = [cookie.name for cookie in user_client.cookie_jar]
    assert 'session' in cookie_names


def test_session_inaccessible_after_logout(user_client):
    """
    A session cookie captured before logout must not grant access afterwards.

    The cookie is saved, the client logs out, the saved cookie is put back, and
    a request for ``/home/`` is expected to be redirected (302) instead of served.
    """
    session_cookie = next((cookie for cookie in user_client.cookie_jar if cookie.name == 'session'), None)
    assert session_cookie is not None

    resp = user_client.get('/logout/')
    assert resp.status_code == 302

    # Try to access /home with the session cookie from earlier
    user_client.set_cookie('session', session_cookie.value)
    # Keep this response: the result used to be discarded, so the assertion
    # below silently re-checked the logout response instead of this request.
    resp = user_client.get('/home/')
    assert resp.status_code == 302


def test_invalid_session_backend_option():
    """Creating the app with an unknown session backend must raise ``AirflowConfigException``."""
    enabled_submodules = [
        "init_api_connexion",
        "init_appbuilder",
        "init_appbuilder_links",
        "init_appbuilder_views",
        "init_flash_views",
        "init_jinja_globals",
        "init_plugins",
        "init_airflow_session_interface",
    ]

    @dont_initialize_flask_app_submodules(skip_all_except=enabled_submodules)
    def poorly_configured_app_factory():
        with conf_vars({("webserver", "session_backend"): "invalid_value_for_session_backend"}):
            return app.create_app(testing=True)

    # The pattern is anchored at both ends so the whole message has to match.
    message_pattern = (
        "^Unrecognized session backend specified in web_server_session_backend: "
        r"'invalid_value_for_session_backend'\. Please set this to .+\.$"
    )
    with pytest.raises(AirflowConfigException, match=message_pattern):
        poorly_configured_app_factory()