From 445c1b723221ad05d1f78d10992a12cb06e9e797 Mon Sep 17 00:00:00 2001 From: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com> Date: Mon, 19 Apr 2021 12:10:10 +1000 Subject: [PATCH] fix: Chord counting of group children This change ensures that we only have one piece of code which calculates chord sizes (ie. `_chord._descend()`, recently made protected so other canvas classes can use it as required). By doing so, we fix some edge cases in the chord counting logic which was being used for children of groups, and also add some unit tests to capture those cases and their expected behaviours. This change also introduces an integration test which checks the current behaviour of chains used as chord bodies when nested in groups. Due to some misbehaviour, likely with promise fulfillment, the `GroupResult` object will time out unless all of its children are resolved prior to `GroupResult` being joined (specifically, native joins block forever or until timeout). This misbehaviour is tracked by #6734 and the test in not marked as `xfail`ing to ensure that the current janky behaviour continues to work as expected rather than regressing. --- celery/canvas.py | 22 ++-- t/integration/test_canvas.py | 106 +++++++++++++++++++ t/unit/tasks/test_canvas.py | 190 ++++++++++++++++++++++++++++++++++- 3 files changed, 308 insertions(+), 10 deletions(-) diff --git a/celery/canvas.py b/celery/canvas.py index 199fb037f94..a80e979af96 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -1170,21 +1170,25 @@ def _apply_tasks(self, tasks, producer=None, app=None, p=None, # we are able to tell when we are at the end by checking if # next_task is None. This enables us to set the chord size # without burning through the entire generator. See #3021. + chord_size = 0 for task_index, (current_task, next_task) in enumerate( lookahead(tasks) ): + # We expect that each task must be part of the same group which + # seems sensible enough. If that's somehow not the case we'll + # end up messing up chord counts and there are all sorts of + # awful race conditions to think about. We'll hope it's not! sig, res, group_id = current_task - _chord = sig.options.get("chord") or chord - if _chord is not None and next_task is None: - chord_size = task_index + 1 - if isinstance(sig, _chain): - if sig.tasks[-1].subtask_type == 'chord': - chord_size = sig.tasks[-1].__length_hint__() - else: - chord_size = task_index + len(sig.tasks[-1]) + chord_obj = sig.options.get("chord") or chord + # We need to check the chord size of each contributing task so + # that when we get to the final one, we can correctly set the + # size in the backend and the chord can be sensible completed. + chord_size += _chord._descend(sig) + if chord_obj is not None and next_task is None: + # Per above, sanity check that we only saw one group app.backend.set_chord_size(group_id, chord_size) sig.apply_async(producer=producer, add_to_parent=False, - chord=_chord, args=args, kwargs=kwargs, + chord=chord_obj, args=args, kwargs=kwargs, **options) # adding callback to result, such that it will gradually # fulfill the barrier. diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index 4c5f31a495f..28560e33e64 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -704,6 +704,112 @@ def test_nested_group_group(self, manager): res = sig.delay() assert res.get(timeout=TIMEOUT) == [42, 42] + def test_nested_group_chord_counting_simple(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_sig = identity.si(42) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[42]] + + def test_nested_group_chord_counting_chain(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = chain((identity.si(1337), ) * gchild_count) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[1337]] + + def test_nested_group_chord_counting_group(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = group((identity.si(1337), ) * gchild_count) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[1337] * gchild_count] + + def test_nested_group_chord_counting_chord(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + gchild_sig = chord( + (identity.si(1337), ) * gchild_count, identity.si(31337), + ) + child_chord = chord((gchild_sig, ), identity.s()) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected + assert res.get(timeout=TIMEOUT) == [[31337]] + + def test_nested_group_chord_counting_mixed(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + gchild_count = 42 + child_chord = chord( + ( + identity.si(42), + chain((identity.si(42), ) * gchild_count), + group((identity.si(42), ) * gchild_count), + chord((identity.si(42), ) * gchild_count, identity.si(1337)), + ), + identity.s(), + ) + group_sig = group((child_chord, )) + res = group_sig.delay() + # Wait for the result to land and confirm its value is as expected. The + # group result gets unrolled into the encapsulating chord, hence the + # weird unpacking below + assert res.get(timeout=TIMEOUT) == [ + [42, 42, *((42, ) * gchild_count), 1337] + ] + + @pytest.mark.xfail(raises=TimeoutError, reason="#6734") + def test_nested_group_chord_body_chain(self, manager): + try: + manager.app.backend.ensure_chords_allowed() + except NotImplementedError as e: + raise pytest.skip(e.args[0]) + + child_chord = chord(identity.si(42), chain((identity.s(), ))) + group_sig = group((child_chord, )) + res = group_sig.delay() + # The result can be expected to timeout since it seems like its + # underlying promise might not be getting fulfilled (ref #6734). Pick a + # short timeout since we don't want to block for ages and this is a + # fairly simple signature which should run pretty quickly. + expected_result = [[42]] + with pytest.raises(TimeoutError) as expected_excinfo: + res.get(timeout=TIMEOUT / 10) + # Get the child `AsyncResult` manually so that we don't have to wait + # again for the `GroupResult` + assert res.children[0].get(timeout=TIMEOUT) == expected_result[0] + assert res.get(timeout=TIMEOUT) == expected_result + # Re-raise the expected exception so this test will XFAIL + raise expected_excinfo.value + def assert_ids(r, expected_value, expected_root_id, expected_parent_id): root_id, parent_id, value = r.get(timeout=TIMEOUT) diff --git a/t/unit/tasks/test_canvas.py b/t/unit/tasks/test_canvas.py index 6f638d04262..c6e9ca86035 100644 --- a/t/unit/tasks/test_canvas.py +++ b/t/unit/tasks/test_canvas.py @@ -1,5 +1,5 @@ import json -from unittest.mock import MagicMock, Mock, call, patch, sentinel +from unittest.mock import MagicMock, Mock, call, patch, sentinel, ANY import pytest import pytest_subtests # noqa: F401 @@ -782,6 +782,194 @@ def test_kwargs_delay_partial(self): res = self.helper_test_get_delay(x.delay(y=1)) assert res == [2, 2] + def test_apply_from_generator(self): + child_count = 42 + child_sig = self.add.si(0, 0) + child_sigs_gen = (child_sig for _ in range(child_count)) + group_sig = group(child_sigs_gen) + with patch("celery.canvas.Signature.apply_async") as mock_apply_async: + res_obj = group_sig.apply_async() + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + + # This needs the current app for some reason not worth digging into + @pytest.mark.usefixtures('depends_on_current_app') + def test_apply_from_generator_empty(self): + empty_gen = (False for _ in range(0)) + group_sig = group(empty_gen) + with patch("celery.canvas.Signature.apply_async") as mock_apply_async: + res_obj = group_sig.apply_async() + assert mock_apply_async.call_count == 0 + assert len(res_obj.children) == 0 + + # In the following tests, getting the group ID is a pain so we just use + # `ANY` to wildcard it when we're checking on calls made to our mocks + def test_apply_contains_chord(self): + gchild_count = 42 + gchild_sig = self.add.si(0, 0) + gchild_sigs = (gchild_sig, ) * gchild_count + child_chord = chord(gchild_sigs, gchild_sig) + group_sig = group((child_chord, )) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == gchild_count + assert len(res_obj.children) == len(group_sig.tasks) + # We must have set the chord size for the group of tasks which makes up + # the header of the `child_chord`, just before we apply the last task. + mock_set_chord_size.assert_called_once_with(ANY, gchild_count) + + def test_apply_contains_chords_containing_chain(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = chain((ggchild_sig, ) * ggchild_count) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated chains - in this case 1 for each child chord + mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count) + + @pytest.mark.xfail(reason="Invalid canvas setup with bad exception") + def test_apply_contains_chords_containing_empty_chain(self): + gchild_sig = chain(tuple()) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + # This is an invalid setup because we can't complete a chord header if + # there are no actual tasks which will run in it. However, the current + # behaviour of an `IndexError` isn't particularly helpful to a user. + res_obj = group_sig.apply_async() + + def test_apply_contains_chords_containing_chain_with_empty_tail(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + tail_count = 24 + gchild_sig = chain( + (ggchild_sig, ) * ggchild_count + + (group((ggchild_sig, ) * tail_count), group(tuple()), ), + ) + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, )) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == 1 + assert len(res_obj.children) == 1 + # We must have set the chord sizes based on the size of the last + # non-empty task in the encapsulated chains - in this case `tail_count` + # for the group preceding the empty one in each grandchild chain + mock_set_chord_size.assert_called_once_with(ANY, tail_count) + + def test_apply_contains_chords_containing_group(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = group((ggchild_sig, ) * ggchild_count) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We see applies for all of the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count * ggchild_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated groups - in this case `ggchild_count` + mock_set_chord_size.assert_has_calls( + (call(ANY, ggchild_count), ) * child_count, + ) + + @pytest.mark.xfail(reason="Invalid canvas setup but poor behaviour") + def test_apply_contains_chords_containing_empty_group(self): + gchild_sig = group(tuple()) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # This is actually kind of meaningless because, similar to the empty + # chain test, this is an invalid setup. However, we should probably + # expect that the chords are dealt with in some other way the probably + # being left incomplete forever... + mock_set_chord_size.assert_has_calls((call(ANY, 0), ) * child_count) + + def test_apply_contains_chords_containing_chord(self): + ggchild_count = 42 + ggchild_sig = self.add.si(0, 0) + gchild_sig = chord((ggchild_sig, ) * ggchild_count, ggchild_sig) + child_count = 24 + child_chord = chord((gchild_sig, ), ggchild_sig) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We see applies for all of the header great-grandchildren because the + # tasks are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count * ggchild_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the deeply encapsulated chords' header tasks, as well as for each + # child chord. This means we have `child_count` interleaved calls to + # set chord sizes of 1 and `ggchild_count`. + mock_set_chord_size.assert_has_calls( + (call(ANY, 1), call(ANY, ggchild_count), ) * child_count, + ) + + def test_apply_contains_chords_containing_empty_chord(self): + gchild_sig = chord(tuple(), self.add.si(0, 0)) + child_count = 24 + child_chord = chord((gchild_sig, ), self.add.si(0, 0)) + group_sig = group((child_chord, ) * child_count) + with patch.object( + self.app.backend, "set_chord_size", + ) as mock_set_chord_size, patch( + "celery.canvas.Signature.apply_async", + ) as mock_apply_async: + res_obj = group_sig.apply_async() + # We only see applies for the header grandchildren because the tasks + # are never actually run due to our mocking of `apply_async()` + assert mock_apply_async.call_count == child_count + assert len(res_obj.children) == child_count + # We must have set the chord sizes based on the number of tail tasks of + # the encapsulated chains - in this case 1 for each child chord + mock_set_chord_size.assert_has_calls((call(ANY, 1), ) * child_count) + class test_chord(CanvasCase):