From a18b5c6525e5e713ce7efe5ab7371a4675981500 Mon Sep 17 00:00:00 2001 From: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com> Date: Tue, 13 Oct 2020 16:23:44 +1100 Subject: [PATCH] fix: Make KV-store backends respect chord size This avoids an issue where the `on_chord_part_return()` implementation would check the length of the result of a chain ending in a nested group. This would manifest in behaviour where a worker would be blocked waiting for the result object it holds to complete since it would attempt to `.join()` the result object. In situations with plenty of workers, this wouldn't really cause any noticeable issue apart from some latency or unpredictable failures - but in concurrency-constrained situations like the integration tests, it causes deadlocks. We know from previous commits in this series that chord completion is more complex than just waiting for a direct child, so we correct the `size` value in `BaseKeyValueStoreBackend.on_chord_part_return()` to respect the `chord_size` value from the request, falling back to the length of the `deps` if that value is missing for some reason (this is necessary to keep a number of the tests happy but it's not clear to me if that will ever be the case in real-life situations). 
--- celery/backends/base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/celery/backends/base.py b/celery/backends/base.py index 28e5b2a4d6b..74fce23c3c4 100644 --- a/celery/backends/base.py +++ b/celery/backends/base.py @@ -919,7 +919,11 @@ def on_chord_part_return(self, request, state, result, **kwargs): ChordError(f'GroupResult {gid} no longer exists'), ) val = self.incr(key) - size = len(deps) + # Set the chord size to the value defined in the request, or fall back + # to the number of dependencies we can see from the restored result + size = request.chord.get("chord_size") + if size is None: + size = len(deps) if val > size: # pragma: no cover logger.warning('Chord counter incremented too many times for %r', gid)