Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: respect django database routing #451

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 23 additions & 3 deletions django_celery_beat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from kombu.utils.json import dumps, loads

from django.conf import settings
from django.db import transaction, close_old_connections
from django.db import (
DEFAULT_DB_ALIAS,
close_old_connections,
router,
transaction
)
from django.db.utils import DatabaseError, InterfaceError
from django.core.exceptions import ObjectDoesNotExist

Expand Down Expand Up @@ -258,7 +263,7 @@ def schedule_changed(self):
# other transactions until the current transaction is
# committed (Issue #41).
try:
transaction.commit()
transaction.commit(using=self.target_db)
except transaction.TransactionManagementError:
pass # not in transaction management.

Expand Down Expand Up @@ -287,7 +292,18 @@ def reserve(self, entry):
self._dirty.add(new_entry.name)
return new_entry

def sync(self):
@property
def target_db(self):
"""Determine if there is a django route"""
if not settings.DATABASE_ROUTERS:
return DEFAULT_DB_ALIAS
# If the project does not actually implement this method,
# DEFAULT_DB_ALIAS will be automatically returned.
# The exception will be located to the django routing section
db = router.db_for_write(self.Model)
return db

def _sync(self):
if logger.isEnabledFor(logging.DEBUG):
debug('Writing entries...')
_tried = set()
Expand All @@ -313,6 +329,10 @@ def sync(self):
# retry later, only for the failed ones
self._dirty |= _failed

def sync(self):
with transaction.atomic(using=self.target_db):
self._sync()

def update_from_dict(self, mapping):
s = {}
for name, entry_fields in mapping.items():
Expand Down