fix: Chord counting of group children
This change ensures that we only have one piece of code which calculates
chord sizes (i.e. `_chord._descend()`, recently made protected so that
other canvas classes can use it as required). By doing so, we fix some
edge cases in the chord-counting logic previously used for children of
groups, and also add unit tests to capture those cases and their
expected behaviours.
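
For illustration, a minimal sketch of the kind of recursive descent a
single chord-size helper performs: counting how many results a signature
will contribute to an enclosing chord. The classes below are hypothetical
stand-ins, not celery's real canvas types; in celery the equivalent logic
lives in `_chord._descend()`.

    # Hypothetical, simplified canvas types -- not celery's real classes.
    class Task:
        """A bare signature contributes exactly one result."""

    class Chain:
        def __init__(self, *tasks):
            self.tasks = list(tasks)

    class Group:
        def __init__(self, *tasks):
            self.tasks = list(tasks)

    class Chord:
        def __init__(self, header, body):
            self.header, self.body = header, body

    def descend(sig):
        """Count the results `sig` will contribute to an enclosing chord."""
        if isinstance(sig, Chord):
            # Only the chord's body reports into the enclosing chord.
            return descend(sig.body)
        if isinstance(sig, Group):
            # Every child of a group contributes its own result(s).
            return sum(descend(child) for child in sig.tasks)
        if isinstance(sig, Chain):
            # Only the last non-empty element of a chain produces the result
            # seen by the enclosing chord.
            for child in reversed(sig.tasks):
                count = descend(child)
                if count:
                    return count
            return 0
        # A plain task contributes a single result.
        return 1

    # One bare task, a chain (counted as its tail) and a two-task group: 4.
    assert descend(Group(Task(), Chain(Task(), Task()), Group(Task(), Task()))) == 4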

This change also introduces an integration test which checks the current
behaviour of chains used as chord bodies when nested in groups. Due to
some misbehaviour, likely in promise fulfillment, the `GroupResult`
object will time out unless all of its children are resolved before the
`GroupResult` itself is joined (specifically, native joins block forever
or until timeout). This misbehaviour is tracked by #6734. The test is
marked as `xfail`ing on that timeout, but it still asserts that the
current janky workaround of resolving the children first continues to
work as expected rather than regressing.
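
For reference, a minimal sketch of the workaround pattern the new
integration test exercises, assuming the `identity` echo task and
`TIMEOUT` constant from the integration test suite as well as a
configured app and running worker: resolve each child `AsyncResult`
first, and only then join the `GroupResult`.

    # Mirrors the new test_nested_group_chord_body_chain integration test.
    # `identity` and `TIMEOUT` are assumed to be the integration test
    # helpers; a configured Celery app and a running worker are assumed
    # and not shown here.
    from celery import chain, chord, group

    sig = group((chord(identity.si(42), chain((identity.s(), ))), ))
    res = sig.delay()

    # A native join on the GroupResult is currently expected to block until
    # it times out (ref #6734):
    #     res.get(timeout=TIMEOUT)    # raises TimeoutError for now
    # ...so resolve the children explicitly first, after which the
    # group-level join completes as expected.
    assert [child.get(timeout=TIMEOUT) for child in res.children] == [[42]]
    assert res.get(timeout=TIMEOUT) == [[42]]
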
maybe-sybr committed Apr 27, 2021
1 parent 6c7a2c9 commit 445c1b7
Showing 3 changed files with 308 additions and 10 deletions.
22 changes: 13 additions & 9 deletions celery/canvas.py
@@ -1170,21 +1170,25 @@ def _apply_tasks(self, tasks, producer=None, app=None, p=None,
# we are able to tell when we are at the end by checking if
# next_task is None. This enables us to set the chord size
# without burning through the entire generator. See #3021.
chord_size = 0
for task_index, (current_task, next_task) in enumerate(
lookahead(tasks)
):
# We expect that each task must be part of the same group which
# seems sensible enough. If that's somehow not the case we'll
# end up messing up chord counts and there are all sorts of
# awful race conditions to think about. We'll hope it's not!
sig, res, group_id = current_task
_chord = sig.options.get("chord") or chord
if _chord is not None and next_task is None:
chord_size = task_index + 1
if isinstance(sig, _chain):
if sig.tasks[-1].subtask_type == 'chord':
chord_size = sig.tasks[-1].__length_hint__()
else:
chord_size = task_index + len(sig.tasks[-1])
chord_obj = sig.options.get("chord") or chord
# We need to check the chord size of each contributing task so
# that when we get to the final one, we can correctly set the
# size in the backend and the chord can be sensibly completed.
chord_size += _chord._descend(sig)
if chord_obj is not None and next_task is None:
# Per above, sanity check that we only saw one group
app.backend.set_chord_size(group_id, chord_size)
sig.apply_async(producer=producer, add_to_parent=False,
chord=_chord, args=args, kwargs=kwargs,
chord=chord_obj, args=args, kwargs=kwargs,
**options)
# adding callback to result, such that it will gradually
# fulfill the barrier.
106 changes: 106 additions & 0 deletions t/integration/test_canvas.py
@@ -704,6 +704,112 @@ def test_nested_group_group(self, manager):
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [42, 42]

def test_nested_group_chord_counting_simple(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

gchild_sig = identity.si(42)
child_chord = chord((gchild_sig, ), identity.s())
group_sig = group((child_chord, ))
res = group_sig.delay()
# Wait for the result to land and confirm its value is as expected
assert res.get(timeout=TIMEOUT) == [[42]]

def test_nested_group_chord_counting_chain(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

gchild_count = 42
gchild_sig = chain((identity.si(1337), ) * gchild_count)
child_chord = chord((gchild_sig, ), identity.s())
group_sig = group((child_chord, ))
res = group_sig.delay()
# Wait for the result to land and confirm its value is as expected
assert res.get(timeout=TIMEOUT) == [[1337]]

def test_nested_group_chord_counting_group(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

gchild_count = 42
gchild_sig = group((identity.si(1337), ) * gchild_count)
child_chord = chord((gchild_sig, ), identity.s())
group_sig = group((child_chord, ))
res = group_sig.delay()
# Wait for the result to land and confirm its value is as expected
assert res.get(timeout=TIMEOUT) == [[1337] * gchild_count]

def test_nested_group_chord_counting_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

gchild_count = 42
gchild_sig = chord(
(identity.si(1337), ) * gchild_count, identity.si(31337),
)
child_chord = chord((gchild_sig, ), identity.s())
group_sig = group((child_chord, ))
res = group_sig.delay()
# Wait for the result to land and confirm its value is as expected
assert res.get(timeout=TIMEOUT) == [[31337]]

def test_nested_group_chord_counting_mixed(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

gchild_count = 42
child_chord = chord(
(
identity.si(42),
chain((identity.si(42), ) * gchild_count),
group((identity.si(42), ) * gchild_count),
chord((identity.si(42), ) * gchild_count, identity.si(1337)),
),
identity.s(),
)
group_sig = group((child_chord, ))
res = group_sig.delay()
# Wait for the result to land and confirm its value is as expected. The
# group result gets unrolled into the encapsulating chord, hence the
# weird unpacking below
assert res.get(timeout=TIMEOUT) == [
[42, 42, *((42, ) * gchild_count), 1337]
]

@pytest.mark.xfail(raises=TimeoutError, reason="#6734")
def test_nested_group_chord_body_chain(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

child_chord = chord(identity.si(42), chain((identity.s(), )))
group_sig = group((child_chord, ))
res = group_sig.delay()
# The result can be expected to time out since it seems like its
# underlying promise might not be getting fulfilled (ref #6734). Pick a
# short timeout since we don't want to block for ages and this is a
# fairly simple signature which should run pretty quickly.
expected_result = [[42]]
with pytest.raises(TimeoutError) as expected_excinfo:
res.get(timeout=TIMEOUT / 10)
# Get the child `AsyncResult` manually so that we don't have to wait
# again for the `GroupResult`
assert res.children[0].get(timeout=TIMEOUT) == expected_result[0]
assert res.get(timeout=TIMEOUT) == expected_result
# Re-raise the expected exception so this test will XFAIL
raise expected_excinfo.value


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
root_id, parent_id, value = r.get(timeout=TIMEOUT)
190 changes: 189 additions & 1 deletion t/unit/tasks/test_canvas.py
@@ -1,5 +1,5 @@
import json
from unittest.mock import MagicMock, Mock, call, patch, sentinel
from unittest.mock import MagicMock, Mock, call, patch, sentinel, ANY

import pytest
import pytest_subtests # noqa: F401
@@ -782,6 +782,194 @@ def test_kwargs_delay_partial(self):
res = self.helper_test_get_delay(x.delay(y=1))
assert res == [2, 2]

def test_apply_from_generator(self):
child_count = 42
child_sig = self.add.si(0, 0)
child_sigs_gen = (child_sig for _ in range(child_count))
group_sig = group(child_sigs_gen)
with patch("celery.canvas.Signature.apply_async") as mock_apply_async:
res_obj = group_sig.apply_async()
assert mock_apply_async.call_count == child_count
assert len(res_obj.children) == child_count

# This needs the current app for some reason not worth digging into
@pytest.mark.usefixtures('depends_on_current_app')
def test_apply_from_generator_empty(self):
empty_gen = (False for _ in range(0))
group_sig = group(empty_gen)
with patch("celery.canvas.Signature.apply_async") as mock_apply_async:
res_obj = group_sig.apply_async()
assert mock_apply_async.call_count == 0
assert len(res_obj.children) == 0

# In the following tests, getting the group ID is a pain so we just use
# `ANY` to wildcard it when we're checking on calls made to our mocks
def test_apply_contains_chord(self):
gchild_count = 42
gchild_sig = self.add.si(0, 0)
gchild_sigs = (gchild_sig, ) * gchild_count
child_chord = chord(gchild_sigs, gchild_sig)
group_sig = group((child_chord, ))
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We only see applies for the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == gchild_count
assert len(res_obj.children) == len(group_sig.tasks)
# We must have set the chord size for the group of tasks which makes up
# the header of the `child_chord`, just before we apply the last task.
mock_set_chord_size.assert_called_once_with(ANY, gchild_count)

def test_apply_contains_chords_containing_chain(self):
ggchild_count = 42
ggchild_sig = self.add.si(0, 0)
gchild_sig = chain((ggchild_sig, ) * ggchild_count)
child_count = 24
child_chord = chord((gchild_sig, ), ggchild_sig)
group_sig = group((child_chord, ) * child_count)
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We only see applies for the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == child_count
assert len(res_obj.children) == child_count
# We must have set the chord sizes based on the number of tail tasks of
# the encapsulated chains - in this case 1 for each child chord
mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count)

@pytest.mark.xfail(reason="Invalid canvas setup with bad exception")
def test_apply_contains_chords_containing_empty_chain(self):
gchild_sig = chain(tuple())
child_count = 24
child_chord = chord((gchild_sig, ), self.add.si(0, 0))
group_sig = group((child_chord, ) * child_count)
# This is an invalid setup because we can't complete a chord header if
# there are no actual tasks which will run in it. However, the current
# behaviour of an `IndexError` isn't particularly helpful to a user.
res_obj = group_sig.apply_async()

def test_apply_contains_chords_containing_chain_with_empty_tail(self):
ggchild_count = 42
ggchild_sig = self.add.si(0, 0)
tail_count = 24
gchild_sig = chain(
(ggchild_sig, ) * ggchild_count +
(group((ggchild_sig, ) * tail_count), group(tuple()), ),
)
child_chord = chord((gchild_sig, ), ggchild_sig)
group_sig = group((child_chord, ))
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We only see applies for the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == 1
assert len(res_obj.children) == 1
# We must have set the chord sizes based on the size of the last
# non-empty task in the encapsulated chains - in this case `tail_count`
# for the group preceding the empty one in each grandchild chain
mock_set_chord_size.assert_called_once_with(ANY, tail_count)

def test_apply_contains_chords_containing_group(self):
ggchild_count = 42
ggchild_sig = self.add.si(0, 0)
gchild_sig = group((ggchild_sig, ) * ggchild_count)
child_count = 24
child_chord = chord((gchild_sig, ), ggchild_sig)
group_sig = group((child_chord, ) * child_count)
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We see applies for all of the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == child_count * ggchild_count
assert len(res_obj.children) == child_count
# We must have set the chord sizes based on the number of tail tasks of
# the encapsulated groups - in this case `ggchild_count`
mock_set_chord_size.assert_has_calls(
(call(ANY, ggchild_count), ) * child_count,
)

@pytest.mark.xfail(reason="Invalid canvas setup but poor behaviour")
def test_apply_contains_chords_containing_empty_group(self):
gchild_sig = group(tuple())
child_count = 24
child_chord = chord((gchild_sig, ), self.add.si(0, 0))
group_sig = group((child_chord, ) * child_count)
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We only see applies for the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == child_count
assert len(res_obj.children) == child_count
# This is actually kind of meaningless because, similar to the empty
# chain test, this is an invalid setup. However, we should probably
# expect that the chords are dealt with in some other way rather than
# being left incomplete forever...
mock_set_chord_size.assert_has_calls((call(ANY, 0), ) * child_count)

def test_apply_contains_chords_containing_chord(self):
ggchild_count = 42
ggchild_sig = self.add.si(0, 0)
gchild_sig = chord((ggchild_sig, ) * ggchild_count, ggchild_sig)
child_count = 24
child_chord = chord((gchild_sig, ), ggchild_sig)
group_sig = group((child_chord, ) * child_count)
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We see applies for all of the header great-grandchildren because the
# tasks are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == child_count * ggchild_count
assert len(res_obj.children) == child_count
# We must have set the chord sizes based on the number of tail tasks of
# the deeply encapsulated chords' header tasks, as well as for each
# child chord. This means we have `child_count` interleaved calls to
# set chord sizes of 1 and `ggchild_count`.
mock_set_chord_size.assert_has_calls(
(call(ANY, 1), call(ANY, ggchild_count), ) * child_count,
)

def test_apply_contains_chords_containing_empty_chord(self):
gchild_sig = chord(tuple(), self.add.si(0, 0))
child_count = 24
child_chord = chord((gchild_sig, ), self.add.si(0, 0))
group_sig = group((child_chord, ) * child_count)
with patch.object(
self.app.backend, "set_chord_size",
) as mock_set_chord_size, patch(
"celery.canvas.Signature.apply_async",
) as mock_apply_async:
res_obj = group_sig.apply_async()
# We only see applies for the header grandchildren because the tasks
# are never actually run due to our mocking of `apply_async()`
assert mock_apply_async.call_count == child_count
assert len(res_obj.children) == child_count
# We must have set the chord sizes based on the bodies of the
# encapsulated chords - in this case 1 for each child chord
mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count)


class test_chord(CanvasCase):
