From 5a919c06793b394c4ea0d65b14bb9ae167a920c8 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Wed, 23 Sep 2020 18:00:47 +0300 Subject: [PATCH] Remove amqp backend. (#6360) Fixes #6356. --- celery/backends/amqp.py | 322 ------------------ .../reference/celery.backends.amqp.rst | 11 - docs/internals/reference/index.rst | 1 - docs/whatsnew-5.0.rst | 5 + t/unit/app/test_backends.py | 2 - t/unit/backends/test_amqp.py | 305 ----------------- 6 files changed, 5 insertions(+), 641 deletions(-) delete mode 100644 celery/backends/amqp.py delete mode 100644 docs/internals/reference/celery.backends.amqp.rst delete mode 100644 t/unit/backends/test_amqp.py diff --git a/celery/backends/amqp.py b/celery/backends/amqp.py deleted file mode 100644 index 6695aff277..0000000000 --- a/celery/backends/amqp.py +++ /dev/null @@ -1,322 +0,0 @@ -"""The old AMQP result backend, deprecated and replaced by the RPC backend.""" -import socket -import time -from collections import deque -from operator import itemgetter - -from kombu import Consumer, Exchange, Producer, Queue - -from celery import states -from celery.exceptions import TimeoutError -from celery.utils import deprecated -from celery.utils.log import get_logger - -from .base import BaseBackend - -__all__ = ('BacklogLimitExceeded', 'AMQPBackend') - -logger = get_logger(__name__) - - -class BacklogLimitExceeded(Exception): - """Too much state history to fast-forward.""" - - -def repair_uuid(s): - # Historically the dashes in UUIDS are removed from AMQ entity names, - # but there's no known reason to. Hopefully we'll be able to fix - # this in v4.0. - return '{}-{}-{}-{}-{}'.format(s[:8], s[8:12], s[12:16], s[16:20], s[20:]) - - -class NoCacheQueue(Queue): - can_cache_declaration = False - - -class AMQPBackend(BaseBackend): - """The AMQP result backend. - - Deprecated: Please use the RPC backend or a persistent backend. 
- """ - - Exchange = Exchange - Queue = NoCacheQueue - Consumer = Consumer - Producer = Producer - - BacklogLimitExceeded = BacklogLimitExceeded - - persistent = True - supports_autoexpire = True - supports_native_join = True - - retry_policy = { - 'max_retries': 20, - 'interval_start': 0, - 'interval_step': 1, - 'interval_max': 1, - } - - def __init__(self, app, connection=None, exchange=None, exchange_type=None, - persistent=None, serializer=None, auto_delete=True, **kwargs): - deprecated.warn( - 'The AMQP result backend', deprecation='4.0', removal='5.0', - alternative='Please use RPC backend or a persistent backend.') - super().__init__(app, **kwargs) - conf = self.app.conf - self._connection = connection - self.persistent = self.prepare_persistent(persistent) - self.delivery_mode = 2 if self.persistent else 1 - exchange = exchange or conf.result_exchange - exchange_type = exchange_type or conf.result_exchange_type - self.exchange = self._create_exchange( - exchange, exchange_type, self.delivery_mode, - ) - self.serializer = serializer or conf.result_serializer - self.auto_delete = auto_delete - - def _create_exchange(self, name, type='direct', delivery_mode=2): - return self.Exchange(name=name, - type=type, - delivery_mode=delivery_mode, - durable=self.persistent, - auto_delete=False) - - def _create_binding(self, task_id): - name = self.rkey(task_id) - return self.Queue( - name=name, - exchange=self.exchange, - routing_key=name, - durable=self.persistent, - auto_delete=self.auto_delete, - expires=self.expires, - ) - - def revive(self, channel): - pass - - def rkey(self, task_id): - return task_id.replace('-', '') - - def destination_for(self, task_id, request): - if request: - return self.rkey(task_id), request.correlation_id or task_id - return self.rkey(task_id), task_id - - def store_result(self, task_id, result, state, - traceback=None, request=None, **kwargs): - """Send task return value and state.""" - routing_key, correlation_id = self.destination_for(task_id, request) - if not routing_key: - return - - payload = {'task_id': task_id, 'status': state, - 'result': self.encode_result(result, state), - 'traceback': traceback, - 'children': self.current_task_children(request)} - if self.app.conf.find_value_for_key('extended', 'result'): - payload['name'] = getattr(request, 'task_name', None) - payload['args'] = getattr(request, 'args', None) - payload['kwargs'] = getattr(request, 'kwargs', None) - payload['worker'] = getattr(request, 'hostname', None) - payload['retries'] = getattr(request, 'retries', None) - payload['queue'] = request.delivery_info.get('routing_key')\ - if hasattr(request, 'delivery_info') \ - and request.delivery_info else None - - with self.app.amqp.producer_pool.acquire(block=True) as producer: - producer.publish( - payload, - exchange=self.exchange, - routing_key=routing_key, - correlation_id=correlation_id, - serializer=self.serializer, - retry=True, retry_policy=self.retry_policy, - declare=self.on_reply_declare(task_id), - delivery_mode=self.delivery_mode, - ) - - def on_reply_declare(self, task_id): - return [self._create_binding(task_id)] - - def wait_for(self, task_id, timeout=None, cache=True, - no_ack=True, on_interval=None, - READY_STATES=states.READY_STATES, - PROPAGATE_STATES=states.PROPAGATE_STATES, - **kwargs): - cached_meta = self._cache.get(task_id) - if cache and cached_meta and \ - cached_meta['status'] in READY_STATES: - return cached_meta - try: - return self.consume(task_id, timeout=timeout, no_ack=no_ack, - on_interval=on_interval) - 
except socket.timeout: - raise TimeoutError('The operation timed out.') - - def get_task_meta(self, task_id, backlog_limit=1000): - # Polling and using basic_get - with self.app.pool.acquire_channel(block=True) as (_, channel): - binding = self._create_binding(task_id)(channel) - binding.declare() - - prev = latest = acc = None - for i in range(backlog_limit): # spool ffwd - acc = binding.get( - accept=self.accept, no_ack=False, - ) - if not acc: # no more messages - break - if acc.payload['task_id'] == task_id: - prev, latest = latest, acc - if prev: - # backends are not expected to keep history, - # so we delete everything except the most recent state. - prev.ack() - prev = None - else: - raise self.BacklogLimitExceeded(task_id) - - if latest: - payload = self._cache[task_id] = self.meta_from_decoded( - latest.payload) - latest.requeue() - return payload - else: - # no new state, use previous - try: - return self._cache[task_id] - except KeyError: - # result probably pending. - return {'status': states.PENDING, 'result': None} - poll = get_task_meta # XXX compat - - def drain_events(self, connection, consumer, - timeout=None, on_interval=None, now=time.monotonic, wait=None): - wait = wait or connection.drain_events - results = {} - - def callback(meta, message): - if meta['status'] in states.READY_STATES: - results[meta['task_id']] = self.meta_from_decoded(meta) - - consumer.callbacks[:] = [callback] - time_start = now() - - while 1: - # Total time spent may exceed a single call to wait() - if timeout and now() - time_start >= timeout: - raise socket.timeout() - try: - wait(timeout=1) - except socket.timeout: - pass - if on_interval: - on_interval() - if results: # got event on the wanted channel. - break - self._cache.update(results) - return results - - def consume(self, task_id, timeout=None, no_ack=True, on_interval=None): - wait = self.drain_events - with self.app.pool.acquire_channel(block=True) as (conn, channel): - binding = self._create_binding(task_id) - with self.Consumer(channel, binding, - no_ack=no_ack, accept=self.accept) as consumer: - while 1: - try: - return wait( - conn, consumer, timeout, on_interval)[task_id] - except KeyError: - continue - - def _many_bindings(self, ids): - return [self._create_binding(task_id) for task_id in ids] - - def get_many(self, task_ids, timeout=None, no_ack=True, - on_message=None, on_interval=None, - now=time.monotonic, getfields=itemgetter('status', 'task_id'), - READY_STATES=states.READY_STATES, - PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs): - with self.app.pool.acquire_channel(block=True) as (conn, channel): - ids = set(task_ids) - cached_ids = set() - mark_cached = cached_ids.add - for task_id in ids: - try: - cached = self._cache[task_id] - except KeyError: - pass - else: - if cached['status'] in READY_STATES: - yield task_id, cached - mark_cached(task_id) - ids.difference_update(cached_ids) - results = deque() - push_result = results.append - push_cache = self._cache.__setitem__ - decode_result = self.meta_from_decoded - - def _on_message(message): - body = decode_result(message.decode()) - if on_message is not None: - on_message(body) - state, uid = getfields(body) - if state in READY_STATES: - push_result(body) \ - if uid in task_ids else push_cache(uid, body) - - bindings = self._many_bindings(task_ids) - with self.Consumer(channel, bindings, on_message=_on_message, - accept=self.accept, no_ack=no_ack): - wait = conn.drain_events - popleft = results.popleft - while ids: - wait(timeout=timeout) - while results: - state = 
popleft()
-                        task_id = state['task_id']
-                        ids.discard(task_id)
-                        push_cache(task_id, state)
-                        yield task_id, state
-                    if on_interval:
-                        on_interval()
-
-    def reload_task_result(self, task_id):
-        raise NotImplementedError(
-            'reload_task_result is not supported by this backend.')
-
-    def reload_group_result(self, task_id):
-        """Reload group result, even if it has been previously fetched."""
-        raise NotImplementedError(
-            'reload_group_result is not supported by this backend.')
-
-    def save_group(self, group_id, result):
-        raise NotImplementedError(
-            'save_group is not supported by this backend.')
-
-    def restore_group(self, group_id, cache=True):
-        raise NotImplementedError(
-            'restore_group is not supported by this backend.')
-
-    def delete_group(self, group_id):
-        raise NotImplementedError(
-            'delete_group is not supported by this backend.')
-
-    def __reduce__(self, args=(), kwargs=None):
-        kwargs = kwargs if kwargs else {}
-        kwargs.update(
-            connection=self._connection,
-            exchange=self.exchange.name,
-            exchange_type=self.exchange.type,
-            persistent=self.persistent,
-            serializer=self.serializer,
-            auto_delete=self.auto_delete,
-            expires=self.expires,
-        )
-        return super().__reduce__(args, kwargs)
-
-    def as_uri(self, include_password=True):
-        return 'amqp://'
diff --git a/docs/internals/reference/celery.backends.amqp.rst b/docs/internals/reference/celery.backends.amqp.rst
deleted file mode 100644
index 61c99429fd..0000000000
--- a/docs/internals/reference/celery.backends.amqp.rst
+++ /dev/null
@@ -1,11 +0,0 @@
-=======================================
- ``celery.backends.amqp``
-=======================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.backends.amqp
-
-.. automodule:: celery.backends.amqp
-    :members:
-    :undoc-members:
diff --git a/docs/internals/reference/index.rst b/docs/internals/reference/index.rst
index 87d0761892..cd587b8ae7 100644
--- a/docs/internals/reference/index.rst
+++ b/docs/internals/reference/index.rst
@@ -27,7 +27,6 @@
     celery.backends.azureblockblob
     celery.backends.rpc
     celery.backends.database
-    celery.backends.amqp
     celery.backends.cache
     celery.backends.consul
     celery.backends.couchdb
diff --git a/docs/whatsnew-5.0.rst b/docs/whatsnew-5.0.rst
index d30f60ba34..b062a27506 100644
--- a/docs/whatsnew-5.0.rst
+++ b/docs/whatsnew-5.0.rst
@@ -229,6 +229,25 @@
 We apologize for the lack of notice in advance but we feel that the
 chance you'll be affected by this breaking change is minimal which is
 why we did it.
 
+AMQP Result Backend
+-------------------
+
+The AMQP result backend has been removed; it had been deprecated since
+version 4.0. Use the RPC result backend or any persistent result backend
+instead.
+
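+A minimal, illustrative migration sketch follows; it assumes an existing
+Celery app instance named ``app`` (any persistent result backend works
+equally well):
+
+.. code-block:: python
+
+    # Sketch only: 'app' is assumed to be your Celery application.
+    # The removed backend was selected by the 'amqp://' result-backend
+    # URL; the RPC backend is the replacement that the old deprecation
+    # warning recommended.
+    app.conf.result_backend = 'rpc://'
+
 .. 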
_new_command_line_interface: New Command Line Interface diff --git a/t/unit/app/test_backends.py b/t/unit/app/test_backends.py index 4dd54f99ea..a87f966505 100644 --- a/t/unit/app/test_backends.py +++ b/t/unit/app/test_backends.py @@ -3,7 +3,6 @@ import pytest from celery.app import backends -from celery.backends.amqp import AMQPBackend from celery.backends.cache import CacheBackend from celery.exceptions import ImproperlyConfigured @@ -11,7 +10,6 @@ class test_backends: @pytest.mark.parametrize('url,expect_cls', [ - ('amqp://', AMQPBackend), ('cache+memory://', CacheBackend), ]) def test_get_backend_aliases(self, url, expect_cls, app): diff --git a/t/unit/backends/test_amqp.py b/t/unit/backends/test_amqp.py deleted file mode 100644 index 09f4d49519..0000000000 --- a/t/unit/backends/test_amqp.py +++ /dev/null @@ -1,305 +0,0 @@ -import pickle -from contextlib import contextmanager -from datetime import timedelta -from pickle import dumps, loads -from queue import Empty, Queue -from unittest.mock import Mock - -import pytest -from billiard.einfo import ExceptionInfo -from case import mock - -from celery import states, uuid -from celery.app.task import Context -from celery.backends.amqp import AMQPBackend -from celery.result import AsyncResult - - -class SomeClass: - - def __init__(self, data): - self.data = data - - -class test_AMQPBackend: - - def setup(self): - self.app.conf.result_cache_max = 100 - - def create_backend(self, **opts): - opts = dict({'serializer': 'pickle', 'persistent': True}, **opts) - return AMQPBackend(self.app, **opts) - - def test_destination_for(self): - b = self.create_backend() - request = Mock() - assert b.destination_for('id', request) == ( - b.rkey('id'), request.correlation_id, - ) - - def test_store_result__no_routing_key(self): - b = self.create_backend() - b.destination_for = Mock() - b.destination_for.return_value = None, None - b.store_result('id', None, states.SUCCESS) - - def test_mark_as_done(self): - tb1 = self.create_backend(max_cached_results=1) - tb2 = self.create_backend(max_cached_results=1) - - tid = uuid() - - tb1.mark_as_done(tid, 42) - assert tb2.get_state(tid) == states.SUCCESS - assert tb2.get_result(tid) == 42 - assert tb2._cache.get(tid) - assert tb2.get_result(tid), 42 - - @pytest.mark.usefixtures('depends_on_current_app') - def test_pickleable(self): - assert loads(dumps(self.create_backend())) - - def test_revive(self): - tb = self.create_backend() - tb.revive(None) - - def test_is_pickled(self): - tb1 = self.create_backend() - tb2 = self.create_backend() - - tid2 = uuid() - result = {'foo': 'baz', 'bar': SomeClass(12345)} - tb1.mark_as_done(tid2, result) - # is serialized properly. 
- rindb = tb2.get_result(tid2) - assert rindb.get('foo') == 'baz' - assert rindb.get('bar').data == 12345 - - def test_mark_as_failure(self): - tb1 = self.create_backend() - tb2 = self.create_backend() - - tid3 = uuid() - try: - raise KeyError('foo') - except KeyError as exception: - einfo = ExceptionInfo() - tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback) - assert tb2.get_state(tid3) == states.FAILURE - assert isinstance(tb2.get_result(tid3), KeyError) - assert tb2.get_traceback(tid3) == einfo.traceback - - def test_repair_uuid(self): - from celery.backends.amqp import repair_uuid - for i in range(10): - tid = uuid() - assert repair_uuid(tid.replace('-', '')) == tid - - def test_expires_is_int(self): - b = self.create_backend(expires=48) - q = b._create_binding('x1y2z3') - assert q.expires == 48 - - def test_expires_is_float(self): - b = self.create_backend(expires=48.3) - q = b._create_binding('x1y2z3') - assert q.expires == 48.3 - - def test_expires_is_timedelta(self): - b = self.create_backend(expires=timedelta(minutes=1)) - q = b._create_binding('x1y2z3') - assert q.expires == 60 - - @mock.sleepdeprived() - def test_store_result_retries(self): - iterations = [0] - stop_raising_at = [5] - - def publish(*args, **kwargs): - if iterations[0] > stop_raising_at[0]: - return - iterations[0] += 1 - raise KeyError('foo') - - backend = AMQPBackend(self.app) - from celery.app.amqp import Producer - prod, Producer.publish = Producer.publish, publish - try: - with pytest.raises(KeyError): - backend.retry_policy['max_retries'] = None - backend.store_result('foo', 'bar', 'STARTED') - - with pytest.raises(KeyError): - backend.retry_policy['max_retries'] = 10 - backend.store_result('foo', 'bar', 'STARTED') - finally: - Producer.publish = prod - - def test_poll_no_messages(self): - b = self.create_backend() - assert b.get_task_meta(uuid())['status'] == states.PENDING - - @contextmanager - def _result_context(self): - results = Queue() - - class Message: - acked = 0 - requeued = 0 - - def __init__(self, **merge): - self.payload = dict({'status': states.STARTED, - 'result': None}, **merge) - self.properties = {'correlation_id': merge.get('task_id')} - self.body = pickle.dumps(self.payload) - self.content_type = 'application/x-python-serialize' - self.content_encoding = 'binary' - - def ack(self, *args, **kwargs): - self.acked += 1 - - def requeue(self, *args, **kwargs): - self.requeued += 1 - - class MockBinding: - - def __init__(self, *args, **kwargs): - self.channel = Mock() - - def __call__(self, *args, **kwargs): - return self - - def declare(self): - pass - - def get(self, no_ack=False, accept=None): - try: - m = results.get(block=False) - if m: - m.accept = accept - return m - except Empty: - pass - - def is_bound(self): - return True - - class MockBackend(AMQPBackend): - Queue = MockBinding - - backend = MockBackend(self.app, max_cached_results=100) - backend._republish = Mock() - - yield results, backend, Message - - def test_backlog_limit_exceeded(self): - with self._result_context() as (results, backend, Message): - for i in range(1001): - results.put(Message(task_id='id', status=states.RECEIVED)) - with pytest.raises(backend.BacklogLimitExceeded): - backend.get_task_meta('id') - - def test_poll_result(self): - with self._result_context() as (results, backend, Message): - tid = uuid() - # FFWD's to the latest state. 
- state_messages = [ - Message(task_id=tid, status=states.RECEIVED, seq=1), - Message(task_id=tid, status=states.STARTED, seq=2), - Message(task_id=tid, status=states.FAILURE, seq=3), - ] - for state_message in state_messages: - results.put(state_message) - r1 = backend.get_task_meta(tid) - # FFWDs to the last state. - assert r1['status'] == states.FAILURE - assert r1['seq'] == 3 - - # Caches last known state. - tid = uuid() - results.put(Message(task_id=tid)) - backend.get_task_meta(tid) - assert tid, backend._cache in 'Caches last known state' - - assert state_messages[-1].requeued - - # Returns cache if no new states. - results.queue.clear() - assert not results.qsize() - backend._cache[tid] = 'hello' - # returns cache if no new states. - assert backend.get_task_meta(tid) == 'hello' - - def test_drain_events_decodes_exceptions_in_meta(self): - tid = uuid() - b = self.create_backend(serializer='json') - b.store_result(tid, RuntimeError('aap'), states.FAILURE) - result = AsyncResult(tid, backend=b) - - with pytest.raises(Exception) as excinfo: - result.get() - - assert excinfo.value.__class__.__name__ == 'RuntimeError' - assert str(excinfo.value) == 'aap' - - def test_no_expires(self): - b = self.create_backend(expires=None) - app = self.app - app.conf.result_expires = None - b = self.create_backend(expires=None) - q = b._create_binding('foo') - assert q.expires is None - - def test_process_cleanup(self): - self.create_backend().process_cleanup() - - def test_reload_task_result(self): - with pytest.raises(NotImplementedError): - self.create_backend().reload_task_result('x') - - def test_reload_group_result(self): - with pytest.raises(NotImplementedError): - self.create_backend().reload_group_result('x') - - def test_save_group(self): - with pytest.raises(NotImplementedError): - self.create_backend().save_group('x', 'x') - - def test_restore_group(self): - with pytest.raises(NotImplementedError): - self.create_backend().restore_group('x') - - def test_delete_group(self): - with pytest.raises(NotImplementedError): - self.create_backend().delete_group('x') - - -class test_AMQPBackend_result_extended: - def setup(self): - self.app.conf.result_extended = True - - def test_store_result(self): - b = AMQPBackend(self.app) - tid = uuid() - - request = Context(args=(1, 2, 3), kwargs={'foo': 'bar'}, - task_name='mytask', retries=2, - hostname='celery@worker_1', - delivery_info={'routing_key': 'celery'}) - - b.store_result(tid, {'fizz': 'buzz'}, states.SUCCESS, request=request) - - meta = b.get_task_meta(tid) - assert meta == { - 'args': [1, 2, 3], - 'children': [], - 'kwargs': {'foo': 'bar'}, - 'name': 'mytask', - 'queue': 'celery', - 'result': {'fizz': 'buzz'}, - 'retries': 2, - 'status': 'SUCCESS', - 'task_id': tid, - 'traceback': None, - 'worker': 'celery@worker_1', - }