Skip to content

Commit

Permalink
fix: Retain group_id when tasks get re-frozen
Browse files Browse the repository at this point in the history
When a group task which is part of a chain was to be delayed by
`trace_task()`, it would be reconstructed from the serialized request.
Normally, this sets the `group_id` of encapsulated tasks to the ID of
the group being instantiated. However, in the specific situation of a
group that is the last task in a chain which contributes to the
completion of a chord, it is essential that the group ID of the top-most
group is used instead. This top-most group ID is used by the redis
backend to track the completions of "final elements" of a chord in the
`on_chord_part_return()` implementation. By overwriting the group ID
which was already set in the `options` dictionaries of the child tasks
being deserialized, the chord accounting done by the redis backend would
be made inaccurate and chords would never complete.

This change alters how options are overridden for signatures to ensure
that if a `group_id` has already been set, it cannot be overridden.
Since group ID should be generally opaque to users, this should not be
disruptive.
  • Loading branch information
maybe-sybr authored and auvipy committed Oct 14, 2020
1 parent 9367d36 commit f1dbf3f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
23 changes: 17 additions & 6 deletions celery/canvas.py
Expand Up @@ -122,6 +122,9 @@ class Signature(dict):

TYPES = {}
_app = _type = None
# The following fields must not be changed during freezing/merging because
# to do so would disrupt completion of parent tasks
_IMMUTABLE_OPTIONS = {"group_id"}

@classmethod
def register_type(cls, name=None):
Expand Down Expand Up @@ -224,14 +227,22 @@ def apply_async(self, args=None, kwargs=None, route_name=None, **options):
def _merge(self, args=None, kwargs=None, options=None, force=False):
args = args if args else ()
kwargs = kwargs if kwargs else {}
options = options if options else {}
if options is not None:
# We build a new options dictionary where values in `options`
# override values in `self.options` except for keys which are
# noted as being immutable (unrelated to signature immutability)
# implying that allowing their value to change would stall tasks
new_options = dict(self.options, **{
k: v for k, v in options.items()
if k not in self._IMMUTABLE_OPTIONS or k not in self.options
})
else:
new_options = self.options
if self.immutable and not force:
return (self.args, self.kwargs,
dict(self.options,
**options) if options else self.options)
return (self.args, self.kwargs, new_options)
return (tuple(args) + tuple(self.args) if args else self.args,
dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
dict(self.options, **options) if options else self.options)
new_options)

def clone(self, args=None, kwargs=None, **opts):
"""Create a copy of this signature.
Expand Down Expand Up @@ -286,7 +297,7 @@ def freeze(self, _id=None, group_id=None, chord=None,
opts['parent_id'] = parent_id
if 'reply_to' not in opts:
opts['reply_to'] = self.app.oid
if group_id:
if group_id and "group_id" not in opts:
opts['group_id'] = group_id
if chord:
opts['chord'] = chord
Expand Down
25 changes: 24 additions & 1 deletion t/unit/tasks/test_canvas.py
@@ -1,5 +1,5 @@
import json
from unittest.mock import MagicMock, Mock, patch
from unittest.mock import MagicMock, Mock, patch, sentinel

import pytest

Expand Down Expand Up @@ -154,6 +154,29 @@ def test_merge_immutable(self):
assert kwargs == {'foo': 1}
assert options == {'task_id': 3}

def test_merge_options__none(self):
sig = self.add.si()
_, _, new_options = sig._merge()
assert new_options is sig.options
_, _, new_options = sig._merge(options=None)
assert new_options is sig.options

@pytest.mark.parametrize("immutable_sig", (True, False))
def test_merge_options__group_id(self, immutable_sig):
# This is to avoid testing the behaviour in `test_set_immutable()`
if immutable_sig:
sig = self.add.si()
else:
sig = self.add.s()
# If the signature has no group ID, it can be set
assert not sig.options
_, _, new_options = sig._merge(options={"group_id": sentinel.gid})
assert new_options == {"group_id": sentinel.gid}
# But if one is already set, the new one is silently ignored
sig.set(group_id=sentinel.old_gid)
_, _, new_options = sig._merge(options={"group_id": sentinel.new_gid})
assert new_options == {"group_id": sentinel.old_gid}

def test_set_immutable(self):
x = self.add.s(2, 2)
assert not x.immutable
Expand Down

0 comments on commit f1dbf3f

Please sign in to comment.