diff --git a/celery/canvas.py b/celery/canvas.py index 199fb037f94..a80e979af96 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -1170,21 +1170,25 @@ def _apply_tasks(self, tasks, producer=None, app=None, p=None, # we are able to tell when we are at the end by checking if # next_task is None. This enables us to set the chord size # without burning through the entire generator. See #3021. + chord_size = 0 for task_index, (current_task, next_task) in enumerate( lookahead(tasks) ): + # We expect that each task must be part of the same group which + # seems sensible enough. If that's somehow not the case we'll + # end up messing up chord counts and there are all sorts of + # awful race conditions to think about. We'll hope it's not! sig, res, group_id = current_task - _chord = sig.options.get("chord") or chord - if _chord is not None and next_task is None: - chord_size = task_index + 1 - if isinstance(sig, _chain): - if sig.tasks[-1].subtask_type == 'chord': - chord_size = sig.tasks[-1].__length_hint__() - else: - chord_size = task_index + len(sig.tasks[-1]) + chord_obj = sig.options.get("chord") or chord + # We need to check the chord size of each contributing task so + # that when we get to the final one, we can correctly set the + # size in the backend and the chord can be sensible completed. + chord_size += _chord._descend(sig) + if chord_obj is not None and next_task is None: + # Per above, sanity check that we only saw one group app.backend.set_chord_size(group_id, chord_size) sig.apply_async(producer=producer, add_to_parent=False, - chord=_chord, args=args, kwargs=kwargs, + chord=chord_obj, args=args, kwargs=kwargs, **options) # adding callback to result, such that it will gradually # fulfill the barrier. diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index 4c5f31a495f..28560e33e64 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -704,6 +704,112 @@ def test_nested_group_group(self, manager): res = sig.delay() assert res.get(timeout=TIMEOUT) == [42, 42] + def test_nested_group_chord_counting_simple(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_sig = identity.si(42) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[42]] + + def test_nested_group_chord_counting_chain(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = chain((identity.si(1337), ) * gchild_count) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[1337]] + + def test_nested_group_chord_counting_group(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = group((identity.si(1337), ) * gchild_count) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[1337] * gchild_count] + + def test_nested_group_chord_counting_chord(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = chord( + (identity.si(1337), ) * gchild_count, identity.si(31337), + ) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[31337]] + + def test_nested_group_chord_counting_mixed(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + child_chord = chord( + ( + identity.si(42), + chain((identity.si(42), ) * gchild_count), + group((identity.si(42), ) * gchild_count), + chord((identity.si(42), ) * gchild_count, identity.si(1337)), + ), + identity.s(), + ) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected. The + # group result gets unrolled into the encapsulating chord, hence the + # weird unpacking below + assert res.get(timeout=TIMEOUT) == [ + [42, 42, *((42, ) * gchild_count), 1337] + ] + + @pytest.mark.xfail(raises=TimeoutError, reason="#6734") + def test_nested_group_chord_body_chain(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + child_chord = chord(identity.si(42), chain((identity.s(), ))) + group_sig = group((child_chord, )) + res = group_sig.delay() + # The result can be expected to timeout since it seems like its + # underlying promise might not be getting fulfilled (ref #6734). Pick a + # short timeout since we don't want to block for ages and this is a + # fairly simple signature which should run pretty quickly. + expected_result = [[42]] + with pytest.raises(TimeoutError) as expected_excinfo: + res.get(timeout=TIMEOUT / 10) + # Get the child `AsyncResult` manually so that we don't have to wait + # again for the `GroupResult` + assert res.children[0].get(timeout=TIMEOUT) == expected_result[0] + assert res.get(timeout=TIMEOUT) == expected_result + # Re-raise the expected exception so this test will XFAIL + raise expected_excinfo.value + def assert_ids(r, expected_value, expected_root_id, expected_parent_id): root_id, parent_id, value = r.get(timeout=TIMEOUT) diff --git a/t/unit/tasks/test_canvas.py b/t/unit/tasks/test_canvas.py index 6f638d04262..c6e9ca86035 100644 --- a/t/unit/tasks/test_canvas.py +++ b/t/unit/tasks/test_canvas.py @@ -1,5 +1,5 @@ import json -from unittest.mock import MagicMock, Mock, call, patch, sentinel +from unittest.mock import MagicMock, Mock, call, patch, sentinel, ANY import pytest import pytest_subtests # noqa: F401 @@ -782,6 +782,194 @@ def test_kwargs_delay_partial(self): res = self.helper_test_get_delay(x.delay(y=1)) assert res == [2, 2] + def test_apply_from_generator(self): + child_count = 42 + child_sig = self.add.si(0, 0) + child_sigs_gen = (child_sig for _ in range(child_count)) + group_sig = group(child_sigs_gen) + with patch("celery.canvas.Signature.apply_async") as mock_apply_async: + res_obj = group_sig.apply_async() + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + + # This needs the current app for some reason not worth digging into + @pytest.mark.usefixtures('depends_on_current_app') + def test_apply_from_generator_empty(self): + empty_gen = (False for _ in range(0)) + group_sig = group(empty_gen) + with patch("celery.canvas.Signature.apply_async") as mock_apply_async: + res_obj = group_sig.apply_async() + assert mock_apply_async.call_count == 0 + assert len(res_obj.children) == 0 + + # In the following tests, getting the group ID is a pain so we just use + # `ANY` to wildcard it when we're checking on calls made to our mocks + def test_apply_contains_chord(self): + gchild_count = 42 + gchild_sig = self.add.si(0, 0) + gchild_sigs = (gchild_sig, ) * gchild_count + child_chord = chord(gchild_sigs, gchild_sig) + group_sig = group((child_chord, )) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == gchild_count + assert len(res_obj.children) == len(group_sig.tasks) + # We must have set the chord size for the group of tasks which makes up + # the header of the `child_chord`, just before we apply the last task. + mock_set_chord_size.assert_called_once_with(ANY, gchild_count) + + def test_apply_contains_chords_containing_chain(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = chain((ggchild_sig, ) * ggchild_count) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated chains - in this case 1 for each child chord + mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count) + + @pytest.mark.xfail(reason="Invalid canvas setup with bad exception") + def test_apply_contains_chords_containing_empty_chain(self): + gchild_sig = chain(tuple()) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + # This is an invalid setup because we can't complete a chord header if + # there are no actual tasks which will run in it. However, the current + # behaviour of an `IndexError` isn't particularly helpful to a user. + res_obj = group_sig.apply_async() + + def test_apply_contains_chords_containing_chain_with_empty_tail(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + tail_count = 24 + gchild_sig = chain( + (ggchild_sig, ) * ggchild_count + + (group((ggchild_sig, ) * tail_count), group(tuple()), ), + ) + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, )) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == 1 + assert len(res_obj.children) == 1 + # We must have set the chord sizes based on the size of the last + # non-empty task in the encapsulated chains - in this case `tail_count` + # for the group preceding the empty one in each grandchild chain + mock_set_chord_size.assert_called_once_with(ANY, tail_count) + + def test_apply_contains_chords_containing_group(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = group((ggchild_sig, ) * ggchild_count) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We see applies for all of the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count * ggchild_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated groups - in this case `ggchild_count` + mock_set_chord_size.assert_has_calls( + (call(ANY, ggchild_count), ) * child_count, + ) + + @pytest.mark.xfail(reason="Invalid canvas setup but poor behaviour") + def test_apply_contains_chords_containing_empty_group(self): + gchild_sig = group(tuple()) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # This is actually kind of meaningless because, similar to the empty + # chain test, this is an invalid setup. However, we should probably + # expect that the chords are dealt with in some other way the probably + # being left incomplete forever... + mock_set_chord_size.assert_has_calls((call(ANY, 0), ) * child_count) + + def test_apply_contains_chords_containing_chord(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = chord((ggchild_sig, ) * ggchild_count, ggchild_sig) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We see applies for all of the header great-grandchildren because the + # tasks are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count * ggchild_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the deeply encapsulated chords' header tasks, as well as for each + # child chord. This means we have `child_count` interleaved calls to + # set chord sizes of 1 and `ggchild_count`. + mock_set_chord_size.assert_has_calls( + (call(ANY, 1), call(ANY, ggchild_count), ) * child_count, + ) + + def test_apply_contains_chords_containing_empty_chord(self): + gchild_sig = chord(tuple(), self.add.si(0, 0)) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated chains - in this case 1 for each child chord + mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count) + class test_chord(CanvasCase):