Skip to content

Commit

Permalink
Completed testing on chord part return with multiple databases
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdria committed Apr 24, 2024
1 parent 17f9c2e commit 76d2d1b
Showing 1 changed file with 55 additions and 42 deletions.
97 changes: 55 additions & 42 deletions t/unit/backends/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from celery.utils.serialization import b64decode
from celery.worker.request import Request
from celery.worker.strategy import hybrid_to_proto2
from django.test import TransactionTestCase

from django_celery_results.backends.database import DatabaseBackend
from django_celery_results.models import ChordCounter, TaskResult
Expand Down Expand Up @@ -782,48 +783,6 @@ def test_on_chord_part_return_counter_not_found(self):
request.group = uuid()
self.b.on_chord_part_return(request=request, state=None, result=None)

def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
gid = uuid()
tid1 = uuid()
tid2 = uuid()
subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
group = GroupResult(id=gid, results=subtasks)
self.b.apply_chord(group, self.add.s())

chord_counter = ChordCounter.objects.using(
"secondary"
).get(group_id=gid)
assert chord_counter.count == 2

request = mock.MagicMock()
request.id = subtasks[0].id
request.group = gid
request.task = "my_task"
request.args = ["a", 1, "password"]
request.kwargs = {"c": 3, "d": "e", "password": "password"}
request.argsrepr = "argsrepr"
request.kwargsrepr = "kwargsrepr"
request.hostname = "celery@ip-0-0-0-0"
request.properties = {"periodic_task_name": "my_periodic_task"}
request.ignore_result = False
result = {"foo": "baz"}

self.b.mark_as_done(tid1, result, request=request)

chord_counter.refresh_from_db()
assert chord_counter.count == 1

self.b.mark_as_done(tid2, result, request=request)

with pytest.raises(ChordCounter.DoesNotExist):
ChordCounter.objects.using("secondary").get(group_id=gid)

request.chord.delay.assert_called_once()

def test_callback_failure(self):
"""Test if a failure in the chord callback is properly handled"""
gid = uuid()
Expand Down Expand Up @@ -961,3 +920,57 @@ def test_backend_result_extended_is_false(self):
tr = TaskResult.objects.get(task_id=tid2)
assert tr.task_args is None
assert tr.task_kwargs is None


class ChordPartReturnTestCase(TransactionTestCase):
databases = "__all__"

def setUp(self):
super().setUp()
self.app.conf.result_serializer = 'json'
self.app.conf.result_backend = (
'django_celery_results.backends:DatabaseBackend')
self.app.conf.result_extended = True
self.b = DatabaseBackend(app=self.app)

def test_on_chord_part_return_multiple_databases(self):
"""
Test if the ChordCounter is properly decremented and the callback is
triggered after all chord parts have returned with multiple databases
"""
gid = uuid()
tid1 = uuid()
tid2 = uuid()
subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
group = GroupResult(id=gid, results=subtasks)
self.b.apply_chord(group, self.add.s())

chord_counter = ChordCounter.objects.using(
"secondary"
).get(group_id=gid)
assert chord_counter.count == 2

request = mock.MagicMock()
request.id = subtasks[0].id
request.group = gid
request.task = "my_task"
request.args = ["a", 1, "password"]
request.kwargs = {"c": 3, "d": "e", "password": "password"}
request.argsrepr = "argsrepr"
request.kwargsrepr = "kwargsrepr"
request.hostname = "celery@ip-0-0-0-0"
request.properties = {"periodic_task_name": "my_periodic_task"}
request.ignore_result = False
result = {"foo": "baz"}

self.b.mark_as_done(tid1, result, request=request)

chord_counter.refresh_from_db()
assert chord_counter.count == 1

self.b.mark_as_done(tid2, result, request=request)

with pytest.raises(ChordCounter.DoesNotExist):
ChordCounter.objects.using("secondary").get(group_id=gid)

request.chord.delay.assert_called_once()

0 comments on commit 76d2d1b

Please sign in to comment.