Skip to content

Commit

Permalink
fix: Pass back real result for single task chains
Browse files Browse the repository at this point in the history
When chains are delayed, they are first frozen as part of preparation
which causes the sub-tasks to also be frozen. Afterward, the final (0th
since we reverse the tasks/result order when freezing) result object
from the freezing process would be passed back to the caller. This
caused problems in signaling completion of groups contained in chains
because the group relies on a promise which is fulfilled by a barrier
linked to each of its applied subtasks. By constructing two
`GroupResult` objects (one during freezing, one when the chain sub-tasks
are applied), this resulted in there being two promises; only one of
which would actually be fulfilled by the group subtasks.

This change ensures that in the special case where a chain has a single
task, we pass back the result object constructed when the task was
actually applied. When that single child is a group which does not get
unrolled (ie. contains more than one child itself), this ensures that we
pass back a `GroupResult` object which will actually be fulfilled. The
caller can then await the result confidently!
  • Loading branch information
maybe-sybr authored and auvipy committed Oct 14, 2020
1 parent 735f167 commit 9367d36
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
16 changes: 12 additions & 4 deletions celery/canvas.py
Expand Up @@ -652,19 +652,27 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None,
args = (tuple(args) + tuple(self.args)
if args and not self.immutable else self.args)

tasks, results = self.prepare_steps(
tasks, results_from_prepare = self.prepare_steps(
args, kwargs, self.tasks, root_id, parent_id, link_error, app,
task_id, group_id, chord,
)

if results:
if results_from_prepare:
if link:
tasks[0].extend_list_option('link', link)
first_task = tasks.pop()
options = _prepare_chain_from_options(options, tasks, use_link)

first_task.apply_async(**options)
return results[0]
result_from_apply = first_task.apply_async(**options)
# If we only have a single task, it may be important that we pass
# the real result object rather than the one obtained via freezing.
# e.g. For `GroupResult`s, we need to pass back the result object
# which will actually have its promise fulfilled by the subtasks,
# something that will never occur for the frozen result.
if not tasks:
return result_from_apply
else:
return results_from_prepare[0]

def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None, group_index=None):
Expand Down
18 changes: 18 additions & 0 deletions t/integration/test_canvas.py
Expand Up @@ -413,6 +413,16 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager):
res = c()
assert res.get(timeout=TIMEOUT) == [8, 8]

def test_nested_chain_group_lone(self, manager):
"""
Test that a lone group in a chain completes.
"""
sig = chain(
group(identity.s(42), identity.s(42)), # [42, 42]
)
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [42, 42]


class test_result_set:

Expand Down Expand Up @@ -504,6 +514,14 @@ def test_large_group(self, manager):

assert res.get(timeout=TIMEOUT) == list(range(1000))

def test_group_lone(self, manager):
"""
Test that a simple group completes.
"""
sig = group(identity.s(42), identity.s(42)) # [42, 42]
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [42, 42]


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
root_id, parent_id, value = r.get(timeout=TIMEOUT)
Expand Down
44 changes: 43 additions & 1 deletion t/unit/tasks/test_canvas.py
@@ -1,5 +1,5 @@
import json
from unittest.mock import MagicMock, Mock
from unittest.mock import MagicMock, Mock, patch

import pytest

Expand Down Expand Up @@ -535,6 +535,48 @@ def test_append_to_empty_chain(self):

assert x.apply().get() == 3

@pytest.mark.usefixtures('depends_on_current_app')
def test_chain_single_child_result(self):
child_sig = self.add.si(1, 1)
chain_sig = chain(child_sig)
assert chain_sig.tasks[0] is child_sig

with patch.object(
# We want to get back the result of actually applying the task
child_sig, "apply_async",
) as mock_apply, patch.object(
# The child signature may be clone by `chain.prepare_steps()`
child_sig, "clone", return_value=child_sig,
):
res = chain_sig()
# `_prepare_chain_from_options()` sets this `chain` kwarg with the
# subsequent tasks which would be run - nothing in this case
mock_apply.assert_called_once_with(chain=[])
assert res is mock_apply.return_value

@pytest.mark.usefixtures('depends_on_current_app')
def test_chain_single_child_group_result(self):
child_sig = self.add.si(1, 1)
# The group will `clone()` the child during instantiation so mock it
with patch.object(child_sig, "clone", return_value=child_sig):
group_sig = group(child_sig)
# Now we can construct the chain signature which is actually under test
chain_sig = chain(group_sig)
assert chain_sig.tasks[0].tasks[0] is child_sig

with patch.object(
# We want to get back the result of actually applying the task
child_sig, "apply_async",
) as mock_apply, patch.object(
# The child signature may be clone by `chain.prepare_steps()`
child_sig, "clone", return_value=child_sig,
):
res = chain_sig()
# `_prepare_chain_from_options()` sets this `chain` kwarg with the
# subsequent tasks which would be run - nothing in this case
mock_apply.assert_called_once_with(chain=[])
assert res is mock_apply.return_value


class test_group(CanvasCase):

Expand Down

0 comments on commit 9367d36

Please sign in to comment.