Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix atomic transaction not routing to the the correct DB #324

Merged
merged 4 commits into from Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion django_celery_results/managers.py
Expand Up @@ -87,7 +87,7 @@ def get_all_expired(self, expires):

def delete_expired(self, expires):
"""Delete all expired results."""
with transaction.atomic():
with transaction.atomic(using=self.db):
raw_delete(queryset=self.get_all_expired(expires))


Expand Down
26 changes: 26 additions & 0 deletions t/proj/db_routers.py
@@ -0,0 +1,26 @@
class AlwaysSecondaryDbRouter:
nofalx marked this conversation as resolved.
Show resolved Hide resolved
connection_name = "secondary"

def db_for_read(self, model, **hints):
"""
Route read always for the specified connection
"""
return self.connection_name

def db_for_write(self, model, **hints):
"""
Route write always for the specified connection
"""
return self.connection_name

def allow_relation(self, obj1, obj2, **hints):
"""
Router have no opinion here
"""
return None

def allow_migrate(self, db, app_label, model_name=None, **hints):
"""
Router have no opinion here
"""
return None
20 changes: 19 additions & 1 deletion t/unit/test_models.py
Expand Up @@ -5,7 +5,7 @@
from celery import states, uuid
from django.db import transaction
from django.db.utils import InterfaceError
from django.test import TransactionTestCase
from django.test import TransactionTestCase, override_settings

from django_celery_results.backends import DatabaseBackend
from django_celery_results.models import GroupResult, TaskResult
Expand Down Expand Up @@ -209,3 +209,21 @@ class TransactionError(Exception):
raise TransactionError()
except TransactionError:
pass


@pytest.mark.usefixtures('depends_on_current_app')
class test_ModelsOnSecondaryDbOnly(TransactionTestCase):
"""
These tests will fail with the below error incase we
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain this part a bit more please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will try to run the method delete_expired from the Custom Manager ResultManager where the bug occurs.

The issue is that if the manager was directed to use a db other than the "default" db, the method delete_expired would run the operation on the "default" anyways. Since the function does not return anything and is a delete operation I can not use assert

My method of testing is the below: (let me know if there is a better way)

I specify the allowed db to test on as "secondary" only. If the test tries to use the "default" db it will throw error:
https://github.com/nofalx/django-celery-results/blob/fix-323/t/unit/test_models.py#L231:L232

I specify the TaskResult and the GroupResult manager which extend ResultManager to use "secondary" db using .db_manager("secondary").

https://github.com/nofalx/django-celery-results/blob/fix-323/t/unit/test_models.py#L235:L240

Before the fixes this test would fail as transaction.atomic() would try to use the "default" db. After the fix the method doesnt use "default" db anymore and the test pass.

try to read/write from a db other than the secondary

AssertionError: Database connections to 'default' are
not allowed in this test.
"""
databases = ['secondary']

@override_settings(
DATABASE_ROUTERS=['t.proj.db_routers.AlwaysSecondaryDbRouter'],
)
def test_operations_with_atomic_transactions(self):
TaskResult.objects.delete_expired(expires=10)