Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry after race during schema creation in database backend #6298

Merged
merged 3 commits into from Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Expand Up @@ -277,3 +277,4 @@ Kyle Johnson, 2019/09/23
Dipankar Achinta, 2019/10/24
Sardorbek Imomaliev, 2020/01/24
Maksym Shalenyi, 2020/07/30
Frazer McLean, 2020/09/29
27 changes: 26 additions & 1 deletion celery/backends/database/session.py
@@ -1,14 +1,21 @@
"""SQLAlchemy session."""
import time

from kombu.utils.compat import register_after_fork
from sqlalchemy import create_engine
from sqlalchemy.exc import DatabaseError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from celery.utils.time import get_exponential_backoff_interval

ResultModelBase = declarative_base()

__all__ = ('SessionManager',)

PREPARE_MODELS_MAX_RETRIES = 10


def _after_fork_cleanup_session(session):
session._after_fork()
Expand Down Expand Up @@ -50,7 +57,25 @@ def create_session(self, dburi, short_lived_sessions=False, **kwargs):

def prepare_models(self, engine):
if not self.prepared:
ResultModelBase.metadata.create_all(engine)
# SQLAlchemy will check if the items exist before trying to
# create them, which is a race condition. If it raises an error
# in one iteration, the next may pass all the existence checks
# and the call will succeed.
retries = 0
while True:
try:
ResultModelBase.metadata.create_all(engine)
georgepsarakis marked this conversation as resolved.
Show resolved Hide resolved
except DatabaseError:
if retries < PREPARE_MODELS_MAX_RETRIES:
sleep_amount_ms = get_exponential_backoff_interval(
10, retries, 1000, True
)
time.sleep(sleep_amount_ms / 1000)
retries += 1
else:
raise
else:
break
self.prepared = True

def session_factory(self, dburi, **kwargs):
Expand Down
28 changes: 27 additions & 1 deletion t/unit/backends/test_database.py
Expand Up @@ -13,7 +13,8 @@
from celery.backends.database import (DatabaseBackend, retry, session, # noqa
session_cleanup)
from celery.backends.database.models import Task, TaskSet # noqa
from celery.backends.database.session import SessionManager # noqa
from celery.backends.database.session import ( # noqa
PREPARE_MODELS_MAX_RETRIES, ResultModelBase, SessionManager)
from t import skip # noqa


Expand Down Expand Up @@ -398,3 +399,28 @@ def test_coverage_madness(self):
SessionManager()
finally:
session.register_after_fork = prev

@patch('celery.backends.database.session.create_engine')
def test_prepare_models_terminates(self, create_engine):
"""SessionManager.prepare_models has retry logic because the creation
of database tables by multiple workers is racy. This test patches
the used method to always raise, so we can verify that it does
eventually terminate.
"""
from sqlalchemy.dialects.sqlite import dialect
from sqlalchemy.exc import DatabaseError

sqlite = dialect.dbapi()
manager = SessionManager()
engine = manager.get_engine('dburi')

def raise_err(bind):
raise DatabaseError("", "", [], sqlite.DatabaseError)

patch_create_all = patch.object(
ResultModelBase.metadata, 'create_all', side_effect=raise_err)

with pytest.raises(DatabaseError), patch_create_all as mock_create_all:
manager.prepare_models(engine)

assert mock_create_all.call_count == PREPARE_MODELS_MAX_RETRIES + 1