From 3d85b3c13500691407a0115b8451cbccd84785e6 Mon Sep 17 00:00:00 2001 From: Frazer McLean Date: Wed, 12 Aug 2020 14:31:02 +0200 Subject: [PATCH 1/3] Retry after race during schema creation in database backend Fixes #6296 This race condition does not commonly present, since the schema creation only needs to happen once per database. It's more likely to appear in e.g. a test suite that uses a new database each time. For context of the sleep times I chose, the schema creation takes ~50 ms on my laptop. I did a simulated test run of 50 concurrent calls to MetaData.create_all repeated 200 times and the number of retries was: - 0 retries: 8717x - 1 retry: 1279x - 2 retries 4x --- celery/backends/database/session.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/celery/backends/database/session.py b/celery/backends/database/session.py index 047a9271d9..175b7ac567 100644 --- a/celery/backends/database/session.py +++ b/celery/backends/database/session.py @@ -1,10 +1,15 @@ """SQLAlchemy session.""" +import time + from kombu.utils.compat import register_after_fork from sqlalchemy import create_engine +from sqlalchemy.exc import DatabaseError from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import NullPool +from celery.utils.time import get_exponential_backoff_interval + ResultModelBase = declarative_base() __all__ = ('SessionManager',) @@ -50,7 +55,26 @@ def create_session(self, dburi, short_lived_sessions=False, **kwargs): def prepare_models(self, engine): if not self.prepared: - ResultModelBase.metadata.create_all(engine) + # SQLAlchemy will check if the items exist before trying to + # create them, which is a race condition. If it raises an error + # in one iteration, the next may pass all the existence checks + # and the call will succeed. + max_retries = 10 + retries = 0 + while True: + try: + ResultModelBase.metadata.create_all(engine) + except DatabaseError: + if retries < max_retries: + sleep_amount_ms = get_exponential_backoff_interval( + 10, retries, 1000, True + ) + time.sleep(sleep_amount_ms / 1000) + retries += 1 + else: + raise + else: + break self.prepared = True def session_factory(self, dburi, **kwargs): From 787fb82b3c214e49f32e05d5cb5c61435863fea5 Mon Sep 17 00:00:00 2001 From: Frazer McLean Date: Tue, 29 Sep 2020 20:09:41 +0200 Subject: [PATCH 2/3] Add test for prepare_models retry error condition --- celery/backends/database/session.py | 5 +++-- t/unit/backends/test_database.py | 28 +++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/celery/backends/database/session.py b/celery/backends/database/session.py index 175b7ac567..ca3d683bea 100644 --- a/celery/backends/database/session.py +++ b/celery/backends/database/session.py @@ -14,6 +14,8 @@ __all__ = ('SessionManager',) +PREPARE_MODELS_MAX_RETRIES = 10 + def _after_fork_cleanup_session(session): session._after_fork() @@ -59,13 +61,12 @@ def prepare_models(self, engine): # create them, which is a race condition. If it raises an error # in one iteration, the next may pass all the existence checks # and the call will succeed. - max_retries = 10 retries = 0 while True: try: ResultModelBase.metadata.create_all(engine) except DatabaseError: - if retries < max_retries: + if retries < PREPARE_MODELS_MAX_RETRIES: sleep_amount_ms = get_exponential_backoff_interval( 10, retries, 1000, True ) diff --git a/t/unit/backends/test_database.py b/t/unit/backends/test_database.py index bff4236184..28e2fedbbb 100644 --- a/t/unit/backends/test_database.py +++ b/t/unit/backends/test_database.py @@ -13,7 +13,8 @@ from celery.backends.database import (DatabaseBackend, retry, session, # noqa session_cleanup) from celery.backends.database.models import Task, TaskSet # noqa -from celery.backends.database.session import SessionManager # noqa +from celery.backends.database.session import ( # noqa + PREPARE_MODELS_MAX_RETRIES, ResultModelBase, SessionManager) from t import skip # noqa @@ -398,3 +399,28 @@ def test_coverage_madness(self): SessionManager() finally: session.register_after_fork = prev + + @patch('celery.backends.database.session.create_engine') + def test_prepare_models_terminates(self, create_engine): + """SessionManager.prepare_models has retry logic because the creation + of database tables by multiple workers is racy. This test patches + the used method to always raise, so we can verify that it does + eventually terminate. + """ + from sqlalchemy.dialects.sqlite import dialect + from sqlalchemy.exc import DatabaseError + + sqlite = dialect.dbapi() + manager = SessionManager() + engine = manager.get_engine('dburi') + + def raise_err(bind): + raise DatabaseError("", "", [], sqlite.DatabaseError) + + patch_create_all = patch.object( + ResultModelBase.metadata, 'create_all', side_effect=raise_err) + + with pytest.raises(DatabaseError), patch_create_all as mock_create_all: + manager.prepare_models(engine) + + assert mock_create_all.call_count == PREPARE_MODELS_MAX_RETRIES + 1 From 95015877c596ba81fbdfea2a1ef882c286e72545 Mon Sep 17 00:00:00 2001 From: Frazer McLean Date: Tue, 29 Sep 2020 20:27:20 +0200 Subject: [PATCH 3/3] Add name to contributors --- CONTRIBUTORS.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 748cabf4d0..a29157e1e5 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -277,3 +277,4 @@ Kyle Johnson, 2019/09/23 Dipankar Achinta, 2019/10/24 Sardorbek Imomaliev, 2020/01/24 Maksym Shalenyi, 2020/07/30 +Frazer McLean, 2020/09/29