Skip to content

Commit

Permalink
Expose retry_policy for Redis result backend
Browse files Browse the repository at this point in the history
Rather than adding a new top-level config option, I have used a new key
in the already existing setting `result_backend_transport_options`.

Closes #6166
  • Loading branch information
ashb committed Sep 2, 2020
1 parent 7ba1b46 commit c8dd9cc
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 13 deletions.
30 changes: 17 additions & 13 deletions celery/backends/redis.py
Expand Up @@ -328,6 +328,15 @@ def _params_from_url(self, url, defaults):
connparams.update(query)
return connparams

@cached_property
def retry_policy(self):
retry_policy = super(RedisBackend, self).retry_policy
if "retry_policy" in self._transport_options:
retry_policy = retry_policy.copy()
retry_policy.update(self._transport_options['retry_policy'])

return retry_policy

def on_task_call(self, producer, task_id):
if not task_join_will_block():
self.result_consumer.consume_from(task_id)
Expand Down Expand Up @@ -401,10 +410,11 @@ def apply_chord(self, header_result, body, **kwargs):

@cached_property
def _chord_zset(self):
transport_options = self.app.conf.get(
'result_backend_transport_options', {}
)
return transport_options.get('result_chord_ordered', True)
return self._transport_options.get('result_chord_ordered', True)

@cached_property
def _transport_options(self):
return self.app.conf.get('result_backend_transport_options', {})

def on_chord_part_return(self, request, state, result,
propagate=None, **kwargs):
Expand Down Expand Up @@ -530,12 +540,8 @@ def _get_sentinel_instance(self, **params):
connparams = params.copy()

hosts = connparams.pop("hosts")
result_backend_transport_opts = self.app.conf.get(
"result_backend_transport_options", {})
min_other_sentinels = result_backend_transport_opts.get(
"min_other_sentinels", 0)
sentinel_kwargs = result_backend_transport_opts.get(
"sentinel_kwargs", {})
min_other_sentinels = self._transport_options.get("min_other_sentinels", 0)
sentinel_kwargs = self._tran.get("sentinel_kwargs", {})

sentinel_instance = self.sentinel.Sentinel(
[(cp['host'], cp['port']) for cp in hosts],
Expand All @@ -548,9 +554,7 @@ def _get_sentinel_instance(self, **params):
def _get_pool(self, **params):
sentinel_instance = self._get_sentinel_instance(**params)

result_backend_transport_opts = self.app.conf.get(
"result_backend_transport_options", {})
master_name = result_backend_transport_opts.get("master_name", None)
master_name = self._transport_options.get("master_name", None)

return sentinel_instance.master_for(
service_name=master_name,
Expand Down
15 changes: 15 additions & 0 deletions docs/getting-started/brokers/redis.rst
Expand Up @@ -94,6 +94,21 @@ If you are using Sentinel, you should specify the master_name using the :setting
app.conf.result_backend_transport_options = {'master_name': "mymaster"}
Connection timeouts
^^^^^^^^^^^^^^^^^^^

To configure the connection timeouts for the Redis result backend, use the ``retry_policy`` key under :setting:`result_backend_transport_options`:


.. code-block:: python
app.conf.result_backend_transport_options = {
'retry_policy': {
'timeout': 5.0
}
}
See :func:`~kombu.utils.functional.retry_over_time` for the possible retry policy options.

.. _redis-caveats:

Expand Down
21 changes: 21 additions & 0 deletions t/unit/backends/test_redis.py
Expand Up @@ -523,6 +523,27 @@ def test_on_connection_error(self, logger):
assert self.b.on_connection_error(10, exc, intervals, 3) == 30
logger.error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds')

@patch('celery.backends.redis.retry_over_time')
def test_retry_policy_conf(self, retry_over_time):
self.app.conf.result_backend_transport_options = dict(
retry_policy=dict(
max_retries=2,
interval_start=0,
interval_step=0.01,
),
)
b = self.Backend(app=self.app)
# We don't want to re-test retry_over_time, just check we called it
# with the expected args
fn = lambda: 1
b.ensure(fn, (),)

retry_over_time.assert_called_with(
fn, b.connection_errors, (), {}, ANY,
max_retries=2, interval_start=0, interval_step=0.01, interval_max=1
)


def test_incr(self):
self.b.client = Mock(name='client')
self.b.incr('foo')
Expand Down

0 comments on commit c8dd9cc

Please sign in to comment.