From 216eeaa82b9ca4f8c0b3ef5e00453c7e96ffde18 Mon Sep 17 00:00:00 2001 From: Frazer McLean Date: Wed, 12 Aug 2020 14:31:02 +0200 Subject: [PATCH] Retry after race during schema creation in database backend Fixes #6296 This race condition does not commonly present itself, since the schema creation only needs to happen once per database. It's more likely to appear in e.g. a test suite that uses a new database each time. For context on the sleep times I chose, the schema creation takes ~50 ms on my laptop. I did a simulated test run of 50 concurrent calls to MetaData.create_all repeated 200 times and the number of retries was: - 0 retries: 8717x - 1 retry: 1279x - 2 retries: 4x --- celery/backends/database/session.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/celery/backends/database/session.py b/celery/backends/database/session.py index e03271f2c1..dea8de2142 100644 --- a/celery/backends/database/session.py +++ b/celery/backends/database/session.py @@ -1,10 +1,15 @@ """SQLAlchemy session.""" +import time + from kombu.utils.compat import register_after_fork from sqlalchemy import create_engine +from sqlalchemy.exc import DatabaseError from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import NullPool +from celery.utils.time import get_exponential_backoff_interval + ResultModelBase = declarative_base() __all__ = ('SessionManager',) @@ -50,7 +55,26 @@ def create_session(self, dburi, short_lived_sessions=False, **kwargs): def prepare_models(self, engine): if not self.prepared: - ResultModelBase.metadata.create_all(engine) + # SQLAlchemy will check if the items exist before trying to + # create them, which is a race condition. If it raises an error + # in one iteration, the next may pass all the existence checks + # and the call will succeed. 
+ max_retries = 10 + retries = 0 + while True: + try: + ResultModelBase.metadata.create_all(engine) + except DatabaseError: + if retries < max_retries: + sleep_amount_ms = get_exponential_backoff_interval( + 10, retries, 1000, True + ) + time.sleep(sleep_amount_ms / 1000) + retries += 1 + else: + raise + else: + break self.prepared = True def session_factory(self, dburi, **kwargs):