From f0242dab7142a6ffd2ead3cf6159d6a4155e6915 Mon Sep 17 00:00:00 2001 From: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com> Date: Tue, 13 Oct 2020 16:31:29 +1100 Subject: [PATCH] fix: Pass back real result for single task chains When chains are delayed, they are first frozen as part of preparation which causes the sub-tasks to also be frozen. Afterward, the final (0th since we reverse the tasks/result order when freezing) result object from the freezing process would be passed back to the caller. This caused problems in signaling completion of groups contained in chains because the group relies on a promise which is fulfilled by a barrier linked to each of its applied subtasks. By constructing two `GroupResult` objects (one during freezing, one when the chain sub-tasks are applied), this resulted in there being two promises; only one of which would actually be fulfilled by the group subtasks. This change ensures that in the special case where a chain has a single task, we pass back the result object constructed when the task was actually applied. When that single child is a group which does not get unrolled (ie. contains more than one child itself), this ensures that we pass back a `GroupResult` object which will actually be fulfilled. The caller can then await the result confidently! --- celery/canvas.py | 12 ++++++++-- t/integration/test_canvas.py | 18 +++++++++++++++ t/unit/tasks/test_canvas.py | 44 +++++++++++++++++++++++++++++++++++- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/celery/canvas.py b/celery/canvas.py index 7871f7b395..4a223c8f4d 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -663,8 +663,16 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None, first_task = tasks.pop() options = _prepare_chain_from_options(options, tasks, use_link) - first_task.apply_async(**options) - return results[0] + real_result = first_task.apply_async(**options) + # If we only have a single task, it may be important that we pass + # the real result object rather than the one obtained via freezing. + # e.g. For `GroupResult`s, we need to pass back the result object + # which will actually have its promise fulfilled by the subtasks, + # something that will never occur for the frozen result. + if not tasks: + return real_result + else: + return results[0] def freeze(self, _id=None, group_id=None, chord=None, root_id=None, parent_id=None, group_index=None): diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index f5d19184a3..6b1f316b03 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -413,6 +413,16 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [8, 8] + def test_nested_chain_group_lone(self, manager): + """ + Test that a lone group in a chain completes. + """ + sig = chain( + group(identity.s(42), identity.s(42)), # [42, 42] + ) + res = sig.delay() + assert res.get(timeout=TIMEOUT) == [42, 42] + class test_result_set: @@ -504,6 +514,14 @@ def test_large_group(self, manager): assert res.get(timeout=TIMEOUT) == list(range(1000)) + def test_group_lone(self, manager): + """ + Test that a simple group completes. + """ + sig = group(identity.s(42), identity.s(42)) # [42, 42] + res = sig.delay() + assert res.get(timeout=TIMEOUT) == [42, 42] + def assert_ids(r, expected_value, expected_root_id, expected_parent_id): root_id, parent_id, value = r.get(timeout=TIMEOUT) diff --git a/t/unit/tasks/test_canvas.py b/t/unit/tasks/test_canvas.py index 53f98615e8..c15dec83d6 100644 --- a/t/unit/tasks/test_canvas.py +++ b/t/unit/tasks/test_canvas.py @@ -1,5 +1,5 @@ import json -from unittest.mock import MagicMock, Mock +from unittest.mock import MagicMock, Mock, patch import pytest @@ -535,6 +535,48 @@ def test_append_to_empty_chain(self): assert x.apply().get() == 3 + @pytest.mark.usefixtures('depends_on_current_app') + def test_chain_single_child_result(self): + child_sig = self.add.si(1, 1) + chain_sig = chain(child_sig) + assert chain_sig.tasks[0] is child_sig + + with patch.object( + # We want to get back the result of actually applying the task + child_sig, "apply_async", + ) as mock_apply, patch.object( + # The child signature may be clone by `chain.prepare_steps()` + child_sig, "clone", return_value=child_sig, + ): + res = chain_sig() + # `_prepare_chain_from_options()` sets this `chain` kwarg with the + # subsequent tasks which would be run - nothing in this case + mock_apply.assert_called_once_with(chain=[]) + assert res is mock_apply.return_value + + @pytest.mark.usefixtures('depends_on_current_app') + def test_chain_single_child_group_result(self): + child_sig = self.add.si(1, 1) + # The group will `clone()` the child during instantiation so mock it + with patch.object(child_sig, "clone", return_value=child_sig): + group_sig = group(child_sig) + # Now we can construct the chain signature which is actually under test + chain_sig = chain(group_sig) + assert chain_sig.tasks[0].tasks[0] is child_sig + + with patch.object( + # We want to get back the result of actually applying the task + child_sig, "apply_async", + ) as mock_apply, patch.object( + # The child signature may be clone by `chain.prepare_steps()` + child_sig, "clone", return_value=child_sig, + ): + res = chain_sig() + # `_prepare_chain_from_options()` sets this `chain` kwarg with the + # subsequent tasks which would be run - nothing in this case + mock_apply.assert_called_once_with(chain=[]) + assert res is mock_apply.return_value + class test_group(CanvasCase):