Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deeply deserialize groups #6342

Merged
merged 4 commits into from Oct 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion celery/canvas.py
Expand Up @@ -1047,8 +1047,16 @@ class group(Signature):

@classmethod
def from_dict(cls, d, app=None):
# We need to mutate the `kwargs` element in place to avoid confusing
# `freeze()` implementations which end up here and expect to be able to
# access elements from that dictionary later and refer to objects
# canonicalized here
orig_tasks = d["kwargs"]["tasks"]
d["kwargs"]["tasks"] = rebuilt_tasks = type(orig_tasks)((
maybe_signature(task, app=app) for task in orig_tasks
))
return _upgrade(
d, group(d['kwargs']['tasks'], app=app, **d['options']),
d, group(rebuilt_tasks, app=app, **d['options']),
)

def __init__(self, *tasks, **options):
Expand Down
1 change: 1 addition & 0 deletions requirements/test.txt
@@ -1,6 +1,7 @@
case>=1.3.1
pytest~=6.0
pytest-celery
pytest-subtests
pytest-timeout~=1.4.2
boto3>=1.9.178
moto==1.3.7
Expand Down
66 changes: 65 additions & 1 deletion t/integration/tasks.py
@@ -1,6 +1,6 @@
from time import sleep

from celery import Task, chain, chord, group, shared_task
from celery import Signature, Task, chain, chord, group, shared_task
maybe-sybr marked this conversation as resolved.
Show resolved Hide resolved
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -244,3 +244,67 @@ def run(self):
if self.request.retries:
return self.request.retries
raise ValueError()


# The signatures returned by these tasks wouldn't actually run because the
# arguments wouldn't be fulfilled - we never actually delay them so it's fine
@shared_task
def return_nested_signature_chain_chain():
return chain(chain([add.s()]))


@shared_task
def return_nested_signature_chain_group():
return chain(group([add.s()]))


@shared_task
def return_nested_signature_chain_chord():
return chain(chord([add.s()], add.s()))


@shared_task
def return_nested_signature_group_chain():
return group(chain([add.s()]))


@shared_task
def return_nested_signature_group_group():
return group(group([add.s()]))


@shared_task
def return_nested_signature_group_chord():
return group(chord([add.s()], add.s()))


@shared_task
def return_nested_signature_chord_chain():
return chord(chain([add.s()]), add.s())


@shared_task
def return_nested_signature_chord_group():
return chord(group([add.s()]), add.s())


@shared_task
def return_nested_signature_chord_chord():
return chord(chord([add.s()], add.s()), add.s())


@shared_task
def rebuild_signature(sig_dict):
sig_obj = Signature.from_dict(sig_dict)

def _recurse(sig):
if not isinstance(sig, Signature):
raise TypeError("{!r} is not a signature object".format(sig))
# Most canvas types have a `tasks` attribute
if isinstance(sig, (chain, group, chord)):
for task in sig.tasks:
_recurse(task)
# `chord`s also have a `body` attribute
if isinstance(sig, chord):
_recurse(sig.body)
_recurse(sig_obj)
99 changes: 99 additions & 0 deletions t/integration/test_canvas.py
Expand Up @@ -9,6 +9,7 @@
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult, ResultSet

from . import tasks
from .conftest import get_active_redis_channels, get_redis_connection
from .tasks import (ExpectedException, add, add_chord_to_chord, add_replaced,
add_to_all, add_to_all_to_chord, build_chain_inside_task,
Expand Down Expand Up @@ -1095,3 +1096,101 @@ def test_nested_chord_group_chain_group_tail(self, manager):
)
res = sig.delay()
assert res.get(timeout=TIMEOUT) == [[42, 42]]


class test_signature_serialization:
"""
Confirm nested signatures can be rebuilt after passing through a backend.

These tests are expected to finish and return `None` or raise an exception
in the error case. The exception indicates that some element of a nested
signature object was not properly deserialized from its dictionary
representation, and would explode later on if it were used as a signature.
"""
def test_rebuild_nested_chain_chain(self, manager):
sig = chain(
tasks.return_nested_signature_chain_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chain_group(self, manager):
sig = chain(
tasks.return_nested_signature_chain_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chain_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chain_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_group_chain(self, manager):
sig = chain(
tasks.return_nested_signature_group_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_group_group(self, manager):
sig = chain(
tasks.return_nested_signature_group_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_group_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_group_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_chain(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_chain.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_group(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_group.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)

def test_rebuild_nested_chord_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

sig = chain(
tasks.return_nested_signature_chord_chord.s(),
tasks.rebuild_signature.s()
)
sig.delay().get(timeout=TIMEOUT)