Skip to content

Commit

Permalink
[Fix #6361] Remove RabbitMQ ha_policy from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
safwanrahman authored and auvipy committed Oct 28, 2020
1 parent a2498d3 commit 8c5e988
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 78 deletions.
21 changes: 3 additions & 18 deletions celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class Queues(dict):
create_missing (bool): By default any unknown queues will be
added automatically, but if this flag is disabled the occurrence
of unknown queues in `wanted` will raise :exc:`KeyError`.
ha_policy (Sequence, str): Default HA policy for queues with none set.
max_priority (int): Default x-max-priority for queues with none set.
"""

Expand All @@ -55,14 +54,13 @@ class Queues(dict):
_consume_from = None

def __init__(self, queues=None, default_exchange=None,
create_missing=True, ha_policy=None, autoexchange=None,
create_missing=True, autoexchange=None,
max_priority=None, default_routing_key=None):
dict.__init__(self)
self.aliases = WeakValueDictionary()
self.default_exchange = default_exchange
self.default_routing_key = default_routing_key
self.create_missing = create_missing
self.ha_policy = ha_policy
self.autoexchange = Exchange if autoexchange is None else autoexchange
self.max_priority = max_priority
if queues is not None and not isinstance(queues, Mapping):
Expand Down Expand Up @@ -122,24 +120,13 @@ def _add(self, queue):
queue.exchange = self.default_exchange
if not queue.routing_key:
queue.routing_key = self.default_routing_key
if self.ha_policy:
if queue.queue_arguments is None:
queue.queue_arguments = {}
self._set_ha_policy(queue.queue_arguments)
if self.max_priority is not None:
if queue.queue_arguments is None:
queue.queue_arguments = {}
self._set_max_priority(queue.queue_arguments)
self[queue.name] = queue
return queue

def _set_ha_policy(self, args):
policy = self.ha_policy
if isinstance(policy, (list, tuple)):
return args.update({'ha-mode': 'nodes',
'ha-params': list(policy)})
args['ha-mode'] = policy

def _set_max_priority(self, args):
if 'x-max-priority' not in args and self.max_priority is not None:
return args.update({'x-max-priority': self.max_priority})
Expand Down Expand Up @@ -251,16 +238,14 @@ def create_task_message(self):
def send_task_message(self):
return self._create_task_sender()

def Queues(self, queues, create_missing=None, ha_policy=None,
def Queues(self, queues, create_missing=None,
autoexchange=None, max_priority=None):
# Create new :class:`Queues` instance, using queue defaults
# from the current configuration.
conf = self.app.conf
default_routing_key = conf.task_default_routing_key
if create_missing is None:
create_missing = conf.task_create_missing_queues
if ha_policy is None:
ha_policy = conf.task_queue_ha_policy
if max_priority is None:
max_priority = conf.task_queue_max_priority
if not queues and conf.task_default_queue:
Expand All @@ -271,7 +256,7 @@ def Queues(self, queues, create_missing=None, ha_policy=None,
else autoexchange)
return self.queues_cls(
queues, self.default_exchange, create_missing,
ha_policy, autoexchange, max_priority, default_routing_key,
autoexchange, max_priority, default_routing_key,
)

def Router(self, queues=None, create_missing=None):
Expand Down
1 change: 0 additions & 1 deletion celery/app/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ def __repr__(self):
type='dict', old={'celery_task_publish_retry_policy'},
),
queues=Option(type='dict'),
queue_ha_policy=Option(None, type='string'),
queue_max_priority=Option(None, type='int'),
reject_on_worker_lost=Option(type='bool'),
remote_tracebacks=Option(False, type='bool'),
Expand Down
27 changes: 0 additions & 27 deletions docs/userguide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2100,33 +2100,6 @@ The final routing options for ``tasks.add`` will become:
See :ref:`routers` for more examples.
.. setting:: task_queue_ha_policy
``task_queue_ha_policy``
~~~~~~~~~~~~~~~~~~~~~~~~
:brokers: RabbitMQ
Default: :const:`None`.
This will set the default HA policy for a queue, and the value
can either be a string (usually ``all``):
.. code-block:: python
task_queue_ha_policy = 'all'
Using 'all' will replicate the queue to all current nodes,
Or you can give it a list of nodes to replicate to:
.. code-block:: python
task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']
Using a list will implicitly set ``ha-mode`` to 'nodes' and
``ha-params`` to the given list of nodes.
See http://www.rabbitmq.com/ha.html for more information.
.. setting:: task_queue_max_priority
``task_queue_max_priority``
Expand Down
32 changes: 0 additions & 32 deletions t/unit/app/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,6 @@ def test_setitem_adds_default_exchange(self):
q['foo'] = queue
assert q['foo'].exchange == q.default_exchange

@pytest.mark.parametrize('ha_policy,qname,q,qargs,expected', [
(None, 'xyz', 'xyz', None, None),
(None, 'xyz', 'xyz', {'x-foo': 'bar'}, {'x-foo': 'bar'}),
('all', 'foo', Queue('foo'), None, {'ha-mode': 'all'}),
('all', 'xyx2',
Queue('xyx2', queue_arguments={'x-foo': 'bar'}),
None,
{'ha-mode': 'all', 'x-foo': 'bar'}),
(['A', 'B', 'C'], 'foo', Queue('foo'), None, {
'ha-mode': 'nodes',
'ha-params': ['A', 'B', 'C']}),
])
def test_with_ha_policy(self, ha_policy, qname, q, qargs, expected):
queues = Queues(ha_policy=ha_policy, create_missing=False)
queues.add(q, queue_arguments=qargs)
assert queues[qname].queue_arguments == expected

def test_select_add(self):
q = Queues()
q.select(['foo', 'bar'])
Expand All @@ -118,11 +101,6 @@ def test_deselect(self):
q.deselect('bar')
assert sorted(q._consume_from.keys()) == ['foo']

def test_with_ha_policy_compat(self):
q = Queues(ha_policy='all')
q.add('bar')
assert q['bar'].queue_arguments == {'ha-mode': 'all'}

def test_add_default_exchange(self):
ex = Exchange('fff', 'fanout')
q = Queues(default_exchange=ex)
Expand All @@ -143,12 +121,6 @@ def test_alias(self):
({'max_priority': 10},
'moo', Queue('moo', queue_arguments=None),
{'x-max-priority': 10}),
({'ha_policy': 'all', 'max_priority': 5},
'bar', 'bar',
{'ha-mode': 'all', 'x-max-priority': 5}),
({'ha_policy': 'all', 'max_priority': 5},
'xyx2', Queue('xyx2', queue_arguments={'x-max-priority': 2}),
{'ha-mode': 'all', 'x-max-priority': 2}),
({'max_priority': None},
'foo2', 'foo2',
None),
Expand Down Expand Up @@ -255,10 +227,6 @@ def test_countdown_negative(self):
with pytest.raises(ValueError):
self.app.amqp.as_task_v2(uuid(), 'foo', countdown=-1232132323123)

def test_Queues__with_ha_policy(self):
x = self.app.amqp.Queues({}, ha_policy='all')
assert x.ha_policy == 'all'

def test_Queues__with_max_priority(self):
x = self.app.amqp.Queues({}, max_priority=23)
assert x.max_priority == 23
Expand Down

0 comments on commit 8c5e988

Please sign in to comment.