diff --git a/.vscode/settings.json b/.vscode/settings.json
index 97924983a3..d3def91314 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,3 +1,3 @@
 {
     "editor.formatOnSave": false
-}
\ No newline at end of file
+}
diff --git a/celery/app/amqp.py b/celery/app/amqp.py
index 1a0454e9a9..10d99cb907 100644
--- a/celery/app/amqp.py
+++ b/celery/app/amqp.py
@@ -279,7 +279,7 @@ def TaskConsumer(self, channel, queues=None, accept=None, **kw):
 
     def as_task_v2(self, task_id, name, args=None, kwargs=None,
                    countdown=None, eta=None, group_id=None, group_index=None,
-                   expires=None, retries=0, chord=None,
+                   trailer_request=None, expires=None, retries=0, chord=None,
                    callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
                    create_sent_event=False, root_id=None, parent_id=None,
@@ -336,6 +336,7 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
                 'expires': expires,
                 'group': group_id,
                 'group_index': group_index,
+                'trailer_request': trailer_request,
                 'retries': retries,
                 'timelimit': [time_limit, soft_time_limit],
                 'root_id': root_id,
@@ -371,6 +372,7 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
 
     def as_task_v1(self, task_id, name, args=None, kwargs=None,
                    countdown=None, eta=None, group_id=None, group_index=None,
+                   trailer_request=None,
                    expires=None, retries=0, chord=None,
                    callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
@@ -415,6 +417,7 @@ def as_task_v1(self, task_id, name, args=None, kwargs=None,
                 'kwargs': kwargs,
                 'group': group_id,
                 'group_index': group_index,
+                'trailer_request': trailer_request,
                 'retries': retries,
                 'eta': eta,
                 'expires': expires,
diff --git a/celery/app/base.py b/celery/app/base.py
index 27e5b610ca..5de58b15b1 100644
--- a/celery/app/base.py
+++ b/celery/app/base.py
@@ -689,7 +689,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
                   router=None, result_cls=None, expires=None,
                   publisher=None, link=None, link_error=None,
                   add_to_parent=True, group_id=None, group_index=None,
-                  retries=0, chord=None,
+                  trailer_request=None, retries=0, chord=None,
                   reply_to=None, time_limit=None, soft_time_limit=None,
                   root_id=None, parent_id=None, route_name=None,
                   shadow=None, chain=None, task_type=None, **options):
@@ -730,7 +730,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
 
         message = amqp.create_task_message(
             task_id, name, args, kwargs, countdown, eta, group_id, group_index,
-            expires, retries, chord,
+            trailer_request, expires, retries, chord,
             maybe_list(link), maybe_list(link_error),
             reply_to or self.thread_oid, time_limit, soft_time_limit,
             self.conf.task_send_sent_event,
diff --git a/celery/app/task.py b/celery/app/task.py
index 2265ebb9e6..1156ebfaec 100644
--- a/celery/app/task.py
+++ b/celery/app/task.py
@@ -80,6 +80,7 @@ class Context:
     taskset = None   # compat alias to group
     group = None
     group_index = None
+    trailer_request = []
     chord = None
     chain = None
     utc = None
@@ -114,6 +115,7 @@ def as_execution_options(self):
             'parent_id': self.parent_id,
             'group_id': self.group,
             'group_index': self.group_index,
+            'trailer_request': self.trailer_request or [],
             'chord': self.chord,
             'chain': self.chain,
             'link': self.callbacks,
@@ -907,6 +909,7 @@ def replace(self, sig):
             chord=chord,
             group_id=self.request.group,
             group_index=self.request.group_index,
+            trailer_request=self.request.trailer_request,
             root_id=self.request.root_id,
         )
         sig.freeze(self.request.id)
@@ -934,6 +937,7 @@ def add_to_chord(self, sig, lazy=False):
         sig.set(
             group_id=self.request.group,
             group_index=self.request.group_index,
+            trailer_request=self.request.trailer_request,
             chord=self.request.chord,
             root_id=self.request.root_id,
         )
diff --git a/celery/app/trace.py b/celery/app/trace.py
index f9b8c83e6e..3b004d2d75 100644
--- a/celery/app/trace.py
+++ b/celery/app/trace.py
@@ -208,7 +208,6 @@ def handle_failure(self, task, req, store_errors=True, call_errbacks=True):
         einfo = ExceptionInfo()
         einfo.exception = get_pickleable_exception(einfo.exception)
         einfo.type = get_pickleable_etype(einfo.type)
-
         task.backend.mark_as_failure(
             req.id, exc, einfo.traceback,
             request=req, store_result=store_errors,
diff --git a/celery/backends/asynchronous.py b/celery/backends/asynchronous.py
index 32475d5eaa..9a530235d8 100644
--- a/celery/backends/asynchronous.py
+++ b/celery/backends/asynchronous.py
@@ -5,7 +5,7 @@
 from collections import deque
 from queue import Empty
 from time import sleep
-from weakref import WeakKeyDictionary
+from weakref import WeakKeyDictionary, WeakSet
 
 from kombu.utils.compat import detect_environment
 
@@ -173,7 +173,10 @@ def _maybe_resolve_from_buffer(self, result):
     def _add_pending_result(self, task_id, result, weak=False):
         concrete, weak_ = self._pending_results
         if task_id not in weak_ and result.id not in concrete:
-            (weak_ if weak else concrete)[task_id] = result
+            ref = (weak_ if weak else concrete)
+            results = ref.get(task_id, WeakSet() if weak else set())
+            results.add(result)
+            ref[task_id] = results
             self.result_consumer.consume_from(task_id)
 
     def add_pending_results(self, results, weak=False):
@@ -292,13 +295,14 @@ def on_state_change(self, meta, message):
         if meta['status'] in states.READY_STATES:
             task_id = meta['task_id']
             try:
-                result = self._get_pending_result(task_id)
+                results = self._get_pending_result(task_id)
             except KeyError:
                 # send to buffer in case we received this result
                 # before it was added to _pending_results.
                 self._pending_messages.put(task_id, meta)
             else:
-                result._maybe_set_cache(meta)
+                for result in results:
+                    result._maybe_set_cache(meta)
             buckets = self.buckets
             try:
                 # remove bucket for this result, since it's fulfilled
diff --git a/celery/backends/base.py b/celery/backends/base.py
index 1aac2a0fc9..37c26dc4be 100644
--- a/celery/backends/base.py
+++ b/celery/backends/base.py
@@ -63,6 +63,12 @@
 Result backends that supports chords: Redis, Database, Memcached, and more.
""" +trailer_request_obj = namedtuple( + "trailer_request", + ("id", "group", "errbacks", "chord", "trailer_request", "group_index"), + defaults=(None, ) * 6 +) + def unpickle_backend(cls, args, kwargs): """Return an unpickled backend.""" @@ -130,7 +136,7 @@ def __init__(self, app, self.base_sleep_between_retries_ms = conf.get('result_backend_base_sleep_between_retries_ms', 10) self.max_retries = conf.get('result_backend_max_retries', float("inf")) - self._pending_results = pending_results_t({}, WeakValueDictionary()) + self._pending_results = pending_results_t({}, {}) self._pending_messages = BufferMap(MESSAGE_BUFFER_MAX) self.url = url @@ -164,6 +170,14 @@ def mark_as_failure(self, task_id, exc, self.store_result(task_id, exc, state, traceback=traceback, request=request) if request: + if request.trailer_request: + self.mark_as_failure( + request.trailer_request["id"], exc, traceback=traceback, + store_result=store_result, call_errbacks=call_errbacks, + request=trailer_request_obj(**request.trailer_request), + state=state + ) + if request.chord: self.on_chord_part_return(request, state, exc) if call_errbacks and request.errbacks: @@ -218,11 +232,10 @@ def _call_task_errbacks(self, request, exc, traceback): def mark_as_revoked(self, task_id, reason='', request=None, store_result=True, state=states.REVOKED): exc = TaskRevokedError(reason) - if store_result: - self.store_result(task_id, exc, state, - traceback=None, request=request) - if request and request.chord: - self.on_chord_part_return(request, state, exc) + + return self.mark_as_failure( + task_id, exc, request=request, store_result=store_result, state=state + ) def mark_as_retry(self, task_id, exc, traceback=None, request=None, store_result=True, state=states.RETRY): diff --git a/celery/backends/redis.py b/celery/backends/redis.py index dd3677f569..7a96bdbd66 100644 --- a/celery/backends/redis.py +++ b/celery/backends/redis.py @@ -1,4 +1,5 @@ """Redis result store backend.""" +import uuid import time from contextlib import contextmanager from functools import partial @@ -12,7 +13,7 @@ from celery import states from celery._state import task_join_will_block from celery.canvas import maybe_signature -from celery.exceptions import ChordError, ImproperlyConfigured +from celery.exceptions import ChordError, ImproperlyConfigured, TaskRevokedError from celery.result import GroupResult, allow_join_result from celery.utils.functional import dictfilter from celery.utils.log import get_logger @@ -157,7 +158,8 @@ def drain_events(self, timeout=None): def consume_from(self, task_id): if self._pubsub is None: return self.start(task_id) - self._consume_from(task_id) + else: + self._consume_from(task_id) def _consume_from(self, task_id): key = self._get_key_for_task(task_id) @@ -269,6 +271,7 @@ def __init__(self, host=None, port=None, db=None, password=None, self.connection_errors, self.channel_errors = ( get_redis_error_classes() if get_redis_error_classes else ((), ())) + self.result_consumer = self.ResultConsumer( self, self.app, self.accept, self._pending_results, self._pending_messages, @@ -397,6 +400,10 @@ def _unpack_chord_result(self, tup, decode, _, tid, state, retval = decode(tup) if state in EXCEPTION_STATES: retval = self.exception_to_python(retval) + + if isinstance(retval, TaskRevokedError): + raise retval + if state in PROPAGATE_STATES: raise ChordError(f'Dependency {tid} raised {retval!r}') return retval @@ -484,6 +491,15 @@ def on_chord_part_return(self, request, state, result, resl = [unpack(tup, decode) for tup in resl] try: 
                 callback.delay(resl)
+            except TaskRevokedError as exc:
+                logger.exception(
+                    'Group %r task was revoked: %r', request.group, exc)
+                if callback.id is None:
+                    callback.id = str(uuid.uuid4())
+                return self.chord_error_from_stack(
+                    callback,
+                    exc
+                )
             except Exception as exc:  # pylint: disable=broad-except
                 logger.exception(
                     'Chord callback for %r raised: %r', request.group, exc)
diff --git a/celery/canvas.py b/celery/canvas.py
index a4de76428d..25fd111ab4 100644
--- a/celery/canvas.py
+++ b/celery/canvas.py
@@ -238,8 +238,15 @@ def _merge(self, args=None, kwargs=None, options=None, force=False):
                 })
             else:
                 new_options = self.options
+
+        new_options = new_options if new_options else {}
+        new_options["link_error"] = (
+            new_options.get("link_error", []) + new_options.pop("link_error", [])
+        )
+
         if self.immutable and not force:
             return (self.args, self.kwargs, new_options)
+
         return (tuple(args) + tuple(self.args) if args else self.args,
                 dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
                 new_options)
@@ -274,7 +281,7 @@ def clone(self, args=None, kwargs=None, **opts):
     partial = clone
 
     def freeze(self, _id=None, group_id=None, chord=None,
-               root_id=None, parent_id=None, group_index=None):
+               root_id=None, parent_id=None, group_index=None, trailer_request=None):
         """Finalize the signature by adding a concrete task id.
 
         The task won't be called and you shouldn't call the signature
@@ -303,6 +310,8 @@ def freeze(self, _id=None, group_id=None, chord=None,
             opts['chord'] = chord
         if group_index is not None:
             opts['group_index'] = group_index
+        if trailer_request is not None:
+            opts['trailer_request'] = trailer_request
         # pylint: disable=too-many-function-args
         # Borks on this, as it's a property.
         return self.AsyncResult(tid)
@@ -686,13 +695,13 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None,
         return results_from_prepare[0]
 
     def freeze(self, _id=None, group_id=None, chord=None,
-               root_id=None, parent_id=None, group_index=None):
+               root_id=None, parent_id=None, group_index=None, trailer_request=None):
         # pylint: disable=redefined-outer-name
         # XXX chord is also a class in outer scope.
         _, results = self._frozen = self.prepare_steps(
             self.args, self.kwargs, self.tasks, root_id, parent_id, None,
             self.app, _id, group_id, chord, clone=False,
-            group_index=group_index,
+            group_index=group_index, trailer_request=trailer_request
         )
         return results[0]
 
@@ -700,7 +709,7 @@ def prepare_steps(self, args, kwargs, tasks,
                       root_id=None, parent_id=None, link_error=None, app=None,
                       last_task_id=None, group_id=None, chord_body=None,
                       clone=True, from_dict=Signature.from_dict,
-                      group_index=None):
+                      group_index=None, trailer_request=None):
         app = app or self.app
         # use chain message field for protocol 2 and later.
         # this avoids pickle blowing the stack on the recursion
@@ -777,7 +786,7 @@ def prepare_steps(self, args, kwargs, tasks,
                 res = task.freeze(
                     last_task_id,
                     root_id=root_id, group_id=group_id, chord=chord_body,
-                    group_index=group_index,
+                    group_index=group_index, trailer_request=trailer_request,
                 )
             else:
                 res = task.freeze(root_id=root_id)
@@ -1088,8 +1097,7 @@ def apply_async(self, args=None, kwargs=None, add_to_parent=True,
         if link is not None:
             raise TypeError('Cannot add link to group: use a chord')
         if link_error is not None:
-            raise TypeError(
-                'Cannot add link to group: do that on individual tasks')
+            link_error = None
         app = self.app
         if app.conf.task_always_eager:
             return self.apply(args, kwargs, **options)
@@ -1205,7 +1213,7 @@ def _freeze_gid(self, options):
         return options, group_id, options.get('root_id')
 
     def freeze(self, _id=None, group_id=None, chord=None,
-               root_id=None, parent_id=None, group_index=None):
+               root_id=None, parent_id=None, group_index=None, trailer_request=None):
         # pylint: disable=redefined-outer-name
         # XXX chord is also a class in outer scope.
         opts = self.options
@@ -1219,6 +1227,8 @@ def freeze(self, _id=None, group_id=None, chord=None,
             opts['chord'] = chord
         if group_index is not None:
             opts['group_index'] = group_index
+        if trailer_request is not None:
+            opts['trailer_request'] = trailer_request
         root_id = opts.setdefault('root_id', root_id)
         parent_id = opts.setdefault('parent_id', parent_id)
         new_tasks = []
@@ -1328,7 +1338,7 @@ def __call__(self, body=None, **options):
         return self.apply_async((), {'body': body} if body else {}, **options)
 
     def freeze(self, _id=None, group_id=None, chord=None,
-               root_id=None, parent_id=None, group_index=None):
+               root_id=None, parent_id=None, group_index=None, trailer_request=None):
         # pylint: disable=redefined-outer-name
         # XXX chord is also a class in outer scope.
         if not isinstance(self.tasks, group):
@@ -1337,7 +1347,7 @@ def freeze(self, _id=None, group_id=None, chord=None,
             parent_id=parent_id, root_id=root_id, chord=self.body)
         body_result = self.body.freeze(
             _id, root_id=root_id, chord=chord, group_id=group_id,
-            group_index=group_index)
+            group_index=group_index, trailer_request=trailer_request)
         # we need to link the body result back to the group result,
         # but the body may actually be a chain,
         # so find the first result without a parent
diff --git a/celery/result.py b/celery/result.py
index 0c10d58e86..7334045564 100644
--- a/celery/result.py
+++ b/celery/result.py
@@ -554,6 +554,14 @@ def _on_ready(self):
         if self.backend.is_async:
             self.on_ready()
 
+    def collect(self, **kwargs):
+        for task in self.results:
+            task_results = list(task.collect(**kwargs))
+            if isinstance(task, ResultSet):
+                yield task_results
+            else:
+                yield task_results[-1]
+
     def remove(self, result):
         """Remove result from the set; it must be a member.
 
@@ -666,7 +674,8 @@ def __getitem__(self, index):
 
     def get(self, timeout=None, propagate=True, interval=0.5,
             callback=None, no_ack=True, on_message=None,
-            disable_sync_subtasks=True, on_interval=None):
+            disable_sync_subtasks=True, on_interval=None, **kwargs):
+        # PATCH: added kwargs for more generalized interface
         """See :meth:`join`.
 
         This is here for API compatibility with :class:`AsyncResult`,
@@ -949,6 +958,14 @@ def as_tuple(self):
     def children(self):
         return self.results
 
+    @property
+    def state(self):
+        for child in self.children:
+            if (child.state in states.EXCEPTION_STATES
+                    or child.state in states.UNREADY_STATES):
+                break
+        return child.state
+
     @classmethod
     def restore(cls, id, backend=None, app=None):
         """Restore previously saved group result."""
@@ -1065,3 +1082,8 @@ def result_from_tuple(r, app=None):
 
             return Result(id, parent=parent)
     return r
+
+
+def get_exception_in_callback(task_id: str) -> Exception:
+    with allow_join_result():
+        return AsyncResult(task_id).get(propagate=False)
\ No newline at end of file
diff --git a/celery/states.py b/celery/states.py
index e807ed4822..375ca72b9f 100644
--- a/celery/states.py
+++ b/celery/states.py
@@ -140,12 +140,13 @@ def __le__(self, other):
 #: Task is waiting for retry.
 RETRY = 'RETRY'
 IGNORED = 'IGNORED'
+TRAILED = 'TRAILED'
 
 READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})
-UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY})
+UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY, TRAILED})
 EXCEPTION_STATES = frozenset({RETRY, FAILURE, REVOKED})
 PROPAGATE_STATES = frozenset({FAILURE, REVOKED})
 
 ALL_STATES = frozenset({
-    PENDING, RECEIVED, STARTED, SUCCESS, FAILURE, RETRY, REVOKED,
+    PENDING, RECEIVED, STARTED, SUCCESS, FAILURE, RETRY, REVOKED, TRAILED
 })
diff --git a/celery/worker/request.py b/celery/worker/request.py
index 81c3387d98..40c9cc4a28 100644
--- a/celery/worker/request.py
+++ b/celery/worker/request.py
@@ -3,6 +3,7 @@
 This module defines the :class:`Request` class, that specifies
 how tasks are executed.
 """
+import os
 import logging
 import sys
 from datetime import datetime
@@ -35,6 +36,8 @@
 
 IS_PYPY = hasattr(sys, 'pypy_version_info')
 
+REJECT_TO_HIGH_MEMORY = os.getenv("REJECT_TO_HIGH_MEMORY")
+
 logger = get_logger(__name__)
 debug, info, warn, error = (logger.debug, logger.info,
                             logger.warning, logger.error)
@@ -75,7 +78,7 @@ class Request:
 
     if not IS_PYPY:  # pragma: no cover
         __slots__ = (
-            '_app', '_type', 'name', 'id', '_root_id', '_parent_id',
+            '_app', '_type', 'name', 'id', '_root_id', '_parent_id', '_trailer_request',
             '_on_ack', '_body', '_hostname', '_eventer', '_connection_errors',
             '_task', '_eta', '_expires', '_request_dict', '_on_reject', '_utc',
             '_content_type', '_content_encoding', '_argsrepr', '_kwargsrepr',
@@ -485,7 +488,8 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
         """Handler called if the task raised an exception."""
         task_ready(self)
         if isinstance(exc_info.exception, MemoryError):
-            raise MemoryError(f'Process got: {exc_info.exception}')
+            if not REJECT_TO_HIGH_MEMORY or not self.task.acks_late:
+                raise MemoryError(f'Process got: {exc_info.exception}')
         elif isinstance(exc_info.exception, Reject):
             return self.reject(requeue=exc_info.exception.requeue)
         elif isinstance(exc_info.exception, Ignore):
@@ -497,17 +501,25 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
             return self.on_retry(exc_info)
 
         # (acks_late) acknowledge after result stored.
-        requeue = False
         if self.task.acks_late:
             reject = (
                 self.task.reject_on_worker_lost and
-                isinstance(exc, WorkerLostError)
+                isinstance(exc, (WorkerLostError, MemoryError, Terminated))
             )
             ack = self.task.acks_on_failure_or_timeout
             if reject:
-                requeue = True
-                self.reject(requeue=requeue)
-                send_failed_event = False
+                if REJECT_TO_HIGH_MEMORY:
+                    # If we have a higher memory queue, reject without retry
+                    self.reject(requeue=False)
+                    # Don't send a failure event
+                    send_failed_event = False
+                    return
+                else:
+                    send_failed_event = True
+                    return_ok = False
+                    # Acknowledge the message so it doesn't get retried
+                    # and can be marked as complete
+                    self.acknowledge()
             elif ack:
                 self.acknowledge()
             else:
@@ -521,8 +533,8 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
                 self._announce_revoked(
                     'terminated', True, str(exc), False)
                 send_failed_event = False  # already sent revoked event
-        elif not requeue and (isinstance(exc, WorkerLostError) or not return_ok):
-            # only mark as failure if task has not been requeued
+        elif not return_ok:
+            # We do not ever want to retry failed tasks unless worker lost or terminated
             self.task.backend.mark_as_failure(
                 self.id, exc, request=self._context,
                 store_result=self.store_errors,
@@ -626,6 +638,11 @@ def group_index(self):
         # used by backend.on_chord_part_return to order return values in group
         return self._request_dict.get('group_index')
 
+    @cached_property
+    def trailer_request(self):
+        # used by backend.on_chord_part_return to order return values in group
+        return self._request_dict.get('trailer_request') or []
+
 
 def create_request_cls(base, task, pool, hostname, eventer,
                        ref=ref, revoked_tasks=revoked_tasks,
diff --git a/requirements/dev.txt b/requirements/dev.txt
deleted file mode 100644
index 9712c15a2e..0000000000
--- a/requirements/dev.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-pytz>dev
-git+https://github.com/celery/kombu.git
-git+https://github.com/celery/py-amqp.git
-git+https://github.com/celery/billiard.git
-vine==1.3.0
\ No newline at end of file
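
Minimal usage sketch (not part of the patch): the patch threads a new trailer_request option from send_task into the task message headers, exposes it on the worker's Request, and has Backend.mark_as_failure propagate a failure to the trailer's task id. The snippet below shows how the option might be exercised; the broker URLs, the trailer_example.fail task, and the single-key trailer dict are assumptions for illustration, with the dict keys mirroring the trailer_request_obj namedtuple added in celery/backends/base.py.

# Sketch only: assumes a running worker and a reachable Redis broker/result
# backend; task name and trailer dict contents are illustrative.
import uuid

from celery import Celery

app = Celery('trailer_example',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')


@app.task(name='trailer_example.fail')
def fail():
    # Always fails so the trailer propagation path is exercised.
    raise RuntimeError('boom')


# Pre-allocate an id for the "trailer" result.  The dict keys mirror
# trailer_request_obj (id, group, errbacks, chord, trailer_request,
# group_index); only 'id' is used in this sketch.
trailer_id = str(uuid.uuid4())

result = app.send_task('trailer_example.fail',
                       trailer_request={'id': trailer_id})

# Once a worker has failed the task, the patched Backend.mark_as_failure
# also records the failure under the trailer id:
#   app.AsyncResult(trailer_id).state  ->  'FAILURE'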