From f1dbf3f05fb047c4d57c61b4ccaa0dcedd16e193 Mon Sep 17 00:00:00 2001 From: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com> Date: Fri, 18 Sep 2020 14:23:40 +1000 Subject: [PATCH] fix: Retain `group_id` when tasks get re-frozen When a group task which is part of a chain was to be delayed by `trace_task()`, it would be reconstructed from the serialized request. Normally, this sets the `group_id` of encapsulated tasks to the ID of the group being instantiated. However, in the specific situation of a group that is the last task in a chain which contributes to the completion of a chord, it is essential that the group ID of the top-most group is used instead. This top-most group ID is used by the redis backend to track the completions of "final elements" of a chord in the `on_chord_part_return()` implementation. By overwriting the group ID which was already set in the `options` dictionaries of the child tasks being deserialized, the chord accounting done by the redis backend would be made inaccurate and chords would never complete. This change alters how options are overridden for signatures to ensure that if a `group_id` has already been set, it cannot be overridden. Since group ID should be generally opaque to users, this should not be disruptive. --- celery/canvas.py | 23 +++++++++++++++++------ t/unit/tasks/test_canvas.py | 25 ++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/celery/canvas.py b/celery/canvas.py index 866c1c888b2..f767de1ce0a 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -122,6 +122,9 @@ class Signature(dict): TYPES = {} _app = _type = None + # The following fields must not be changed during freezing/merging because + # to do so would disrupt completion of parent tasks + _IMMUTABLE_OPTIONS = {"group_id"} @classmethod def register_type(cls, name=None): @@ -224,14 +227,22 @@ def apply_async(self, args=None, kwargs=None, route_name=None, **options): def _merge(self, args=None, kwargs=None, options=None, force=False): args = args if args else () kwargs = kwargs if kwargs else {} - options = options if options else {} + if options is not None: + # We build a new options dictionary where values in `options` + # override values in `self.options` except for keys which are + # noted as being immutable (unrelated to signature immutability) + # implying that allowing their value to change would stall tasks + new_options = dict(self.options, **{ + k: v for k, v in options.items() + if k not in self._IMMUTABLE_OPTIONS or k not in self.options + }) + else: + new_options = self.options if self.immutable and not force: - return (self.args, self.kwargs, - dict(self.options, - **options) if options else self.options) + return (self.args, self.kwargs, new_options) return (tuple(args) + tuple(self.args) if args else self.args, dict(self.kwargs, **kwargs) if kwargs else self.kwargs, - dict(self.options, **options) if options else self.options) + new_options) def clone(self, args=None, kwargs=None, **opts): """Create a copy of this signature. @@ -286,7 +297,7 @@ def freeze(self, _id=None, group_id=None, chord=None, opts['parent_id'] = parent_id if 'reply_to' not in opts: opts['reply_to'] = self.app.oid - if group_id: + if group_id and "group_id" not in opts: opts['group_id'] = group_id if chord: opts['chord'] = chord diff --git a/t/unit/tasks/test_canvas.py b/t/unit/tasks/test_canvas.py index c15dec83d60..b90321572f3 100644 --- a/t/unit/tasks/test_canvas.py +++ b/t/unit/tasks/test_canvas.py @@ -1,5 +1,5 @@ import json -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import MagicMock, Mock, patch, sentinel import pytest @@ -154,6 +154,29 @@ def test_merge_immutable(self): assert kwargs == {'foo': 1} assert options == {'task_id': 3} + def test_merge_options__none(self): + sig = self.add.si() + _, _, new_options = sig._merge() + assert new_options is sig.options + _, _, new_options = sig._merge(options=None) + assert new_options is sig.options + + @pytest.mark.parametrize("immutable_sig", (True, False)) + def test_merge_options__group_id(self, immutable_sig): + # This is to avoid testing the behaviour in `test_set_immutable()` + if immutable_sig: + sig = self.add.si() + else: + sig = self.add.s() + # If the signature has no group ID, it can be set + assert not sig.options + _, _, new_options = sig._merge(options={"group_id": sentinel.gid}) + assert new_options == {"group_id": sentinel.gid} + # But if one is already set, the new one is silently ignored + sig.set(group_id=sentinel.old_gid) + _, _, new_options = sig._merge(options={"group_id": sentinel.new_gid}) + assert new_options == {"group_id": sentinel.old_gid} + def test_set_immutable(self): x = self.add.s(2, 2) assert not x.immutable