Skip to content

Commit

Permalink
Retry after race during schema creation in database backend
Browse files Browse the repository at this point in the history
Fixes #6296

This race condition does not commonly present, since the schema creation
only needs to happen once per database. It's more likely to appear in
e.g. a test suite that uses a new database each time.

For context of the sleep times I chose, the schema creation takes ~50 ms
on my laptop.

I did a simulated test run of 50 concurrent calls to MetaData.create_all
repeated 200 times and the number of retries was:

- 0 retries: 8717x
- 1 retry:   1279x
- 2 retries  4x
  • Loading branch information
RazerM committed Sep 3, 2020
1 parent 7ba1b46 commit 216eeaa
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion celery/backends/database/session.py
@@ -1,10 +1,15 @@
"""SQLAlchemy session."""
import time

from kombu.utils.compat import register_after_fork
from sqlalchemy import create_engine
from sqlalchemy.exc import DatabaseError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from celery.utils.time import get_exponential_backoff_interval

ResultModelBase = declarative_base()

__all__ = ('SessionManager',)
Expand Down Expand Up @@ -50,7 +55,26 @@ def create_session(self, dburi, short_lived_sessions=False, **kwargs):

def prepare_models(self, engine):
if not self.prepared:
ResultModelBase.metadata.create_all(engine)
# SQLAlchemy will check if the items exist before trying to
# create them, which is a race condition. If it raises an error
# in one iteration, the next may pass all the existence checks
# and the call will succeed.
max_retries = 10
retries = 0
while True:
try:
ResultModelBase.metadata.create_all(engine)
except DatabaseError:
if retries < max_retries:
sleep_amount_ms = get_exponential_backoff_interval(
10, retries, 1000, True
)
time.sleep(sleep_amount_ms / 1000)
retries += 1
else:
raise
else:
break
self.prepared = True

def session_factory(self, dburi, **kwargs):
Expand Down

0 comments on commit 216eeaa

Please sign in to comment.