Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Pass back real result for single task chains #6411

Merged
merged 1 commit into from Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions celery/canvas.py
Expand Up @@ -652,19 +652,27 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None,
args = (tuple(args) + tuple(self.args)
if args and not self.immutable else self.args)

tasks, results = self.prepare_steps(
tasks, results_from_prepare = self.prepare_steps(
args, kwargs, self.tasks, root_id, parent_id, link_error, app,
task_id, group_id, chord,
)

if results:
if results_from_prepare:
if link:
tasks[0].extend_list_option('link', link)
first_task = tasks.pop()
options = _prepare_chain_from_options(options, tasks, use_link)

first_task.apply_async(**options)
return results[0]
result_from_apply = first_task.apply_async(**options)
# If we only have a single task, it may be important that we pass
# the real result object rather than the one obtained via freezing.
# e.g. For `GroupResult`s, we need to pass back the result object
# which will actually have its promise fulfilled by the subtasks,
# something that will never occur for the frozen result.
if not tasks:
return result_from_apply
else:
return results_from_prepare[0]

def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None, group_index=None):
Expand Down
18 changes: 18 additions & 0 deletions t/integration/test_canvas.py
Expand Up @@ -413,6 +413,16 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager):
res = c()
assert res.get(timeout=TIMEOUT) == [8, 8]

def test_nested_chain_group_lone(self, manager):
"""
Test that a lone group in a chain completes.
"""
sig = chain(
group(identity.s(42), identity.s(42)), # [42, 42]
)
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [42, 42]


class test_result_set:

Expand Down Expand Up @@ -504,6 +514,14 @@ def test_large_group(self, manager):

assert res.get(timeout=TIMEOUT) == list(range(1000))

def test_group_lone(self, manager):
"""
Test that a simple group completes.
"""
sig = group(identity.s(42), identity.s(42)) # [42, 42]
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [42, 42]


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
root_id, parent_id, value = r.get(timeout=TIMEOUT)
Expand Down
44 changes: 43 additions & 1 deletion t/unit/tasks/test_canvas.py
@@ -1,5 +1,5 @@
import json
from unittest.mock import MagicMock, Mock
from unittest.mock import MagicMock, Mock, patch

import pytest

Expand Down Expand Up @@ -535,6 +535,48 @@ def test_append_to_empty_chain(self):

assert x.apply().get() == 3

@pytest.mark.usefixtures('depends_on_current_app')
def test_chain_single_child_result(self):
child_sig = self.add.si(1, 1)
chain_sig = chain(child_sig)
assert chain_sig.tasks[0] is child_sig

with patch.object(
# We want to get back the result of actually applying the task
child_sig, "apply_async",
) as mock_apply, patch.object(
# The child signature may be clone by `chain.prepare_steps()`
child_sig, "clone", return_value=child_sig,
):
res = chain_sig()
# `_prepare_chain_from_options()` sets this `chain` kwarg with the
# subsequent tasks which would be run - nothing in this case
mock_apply.assert_called_once_with(chain=[])
assert res is mock_apply.return_value

@pytest.mark.usefixtures('depends_on_current_app')
def test_chain_single_child_group_result(self):
child_sig = self.add.si(1, 1)
# The group will `clone()` the child during instantiation so mock it
with patch.object(child_sig, "clone", return_value=child_sig):
group_sig = group(child_sig)
# Now we can construct the chain signature which is actually under test
chain_sig = chain(group_sig)
assert chain_sig.tasks[0].tasks[0] is child_sig

with patch.object(
# We want to get back the result of actually applying the task
child_sig, "apply_async",
) as mock_apply, patch.object(
# The child signature may be clone by `chain.prepare_steps()`
child_sig, "clone", return_value=child_sig,
):
res = chain_sig()
# `_prepare_chain_from_options()` sets this `chain` kwarg with the
# subsequent tasks which would be run - nothing in this case
mock_apply.assert_called_once_with(chain=[])
assert res is mock_apply.return_value


class test_group(CanvasCase):

Expand Down