Expose retry_policy for Redis result backend #6330

Merged: 1 commit (Sep 9, 2020)
30 changes: 17 additions & 13 deletions celery/backends/redis.py
@@ -328,6 +328,15 @@ def _params_from_url(self, url, defaults):
         connparams.update(query)
         return connparams
 
+    @cached_property
+    def retry_policy(self):
+        retry_policy = super().retry_policy
+        if "retry_policy" in self._transport_options:
+            retry_policy = retry_policy.copy()
+            retry_policy.update(self._transport_options['retry_policy'])
+
+        return retry_policy
+
     def on_task_call(self, producer, task_id):
         if not task_join_will_block():
             self.result_consumer.consume_from(task_id)
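
For reference, the merge semantics of the new retry_policy property can be sketched standalone. The base values below assume the default policy inherited from the base backend (max_retries=20, interval_start=0, interval_step=1, interval_max=1); the interval_max=1 default is also what the new test further down relies on:

    # Standalone sketch of the override behaviour: the inherited policy is
    # copied, then updated key by key from the transport options.
    base_policy = {'max_retries': 20, 'interval_start': 0,
                   'interval_step': 1, 'interval_max': 1}
    transport_options = {'retry_policy': {'max_retries': 2}}

    merged = base_policy.copy()
    merged.update(transport_options.get('retry_policy', {}))

    assert merged['max_retries'] == 2   # overridden by the user
    assert merged['interval_max'] == 1  # inherited from the base policy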
@@ -401,10 +410,11 @@ def apply_chord(self, header_result, body, **kwargs):
 
     @cached_property
     def _chord_zset(self):
-        transport_options = self.app.conf.get(
-            'result_backend_transport_options', {}
-        )
-        return transport_options.get('result_chord_ordered', True)
+        return self._transport_options.get('result_chord_ordered', True)
+
+    @cached_property
+    def _transport_options(self):
+        return self.app.conf.get('result_backend_transport_options', {})
 
     def on_chord_part_return(self, request, state, result,
                              propagate=None, **kwargs):
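
The refactor above replaces repeated self.app.conf.get(...) lookups (here and in the Sentinel code below) with a single memoized property. A minimal standalone sketch of the pattern, using functools.cached_property (Python 3.8+) in place of the cached_property helper Celery imports:

    from functools import cached_property

    class Backend:
        def __init__(self, conf):
            self.conf = conf

        @cached_property
        def _transport_options(self):
            print('reading config')  # runs only on first access
            return self.conf.get('result_backend_transport_options', {})

    b = Backend({'result_backend_transport_options': {'master_name': 'mymaster'}})
    b._transport_options  # prints 'reading config'
    b._transport_options  # served from the instance cache, no print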
@@ -530,12 +540,8 @@ def _get_sentinel_instance(self, **params):
         connparams = params.copy()
 
         hosts = connparams.pop("hosts")
-        result_backend_transport_opts = self.app.conf.get(
-            "result_backend_transport_options", {})
-        min_other_sentinels = result_backend_transport_opts.get(
-            "min_other_sentinels", 0)
-        sentinel_kwargs = result_backend_transport_opts.get(
-            "sentinel_kwargs", {})
+        min_other_sentinels = self._transport_options.get("min_other_sentinels", 0)
+        sentinel_kwargs = self._transport_options.get("sentinel_kwargs", {})
 
         sentinel_instance = self.sentinel.Sentinel(
             [(cp['host'], cp['port']) for cp in hosts],
@@ -548,9 +554,7 @@ def _get_pool(self, **params):
     def _get_pool(self, **params):
         sentinel_instance = self._get_sentinel_instance(**params)
 
-        result_backend_transport_opts = self.app.conf.get(
-            "result_backend_transport_options", {})
-        master_name = result_backend_transport_opts.get("master_name", None)
+        master_name = self._transport_options.get("master_name", None)
 
         return sentinel_instance.master_for(
             service_name=master_name,
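
For context, the three keys read above are all supplied through the same transport options mapping. An illustrative configuration with placeholder values (the URL and credentials are examples, not defaults):

    app.conf.result_backend = 'sentinel://localhost:26379/0'
    app.conf.result_backend_transport_options = {
        'master_name': 'mymaster',                      # Sentinel service name
        'min_other_sentinels': 1,                       # quorum of extra sentinels
        'sentinel_kwargs': {'password': 'sentinel-pw'}, # kwargs for the Sentinel client
    }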
15 changes: 15 additions & 0 deletions docs/getting-started/brokers/redis.rst
@@ -94,6 +94,21 @@ If you are using Sentinel, you should specify the master_name using the :setting:
 
     app.conf.result_backend_transport_options = {'master_name': "mymaster"}
 
+Connection timeouts
+^^^^^^^^^^^^^^^^^^^
+
+To configure the connection timeouts for the Redis result backend, use the ``retry_policy`` key under :setting:`result_backend_transport_options`:
+
+.. code-block:: python
+
+    app.conf.result_backend_transport_options = {
+        'retry_policy': {
+            'timeout': 5.0
+        }
+    }
+
+See :func:`~kombu.utils.functional.retry_over_time` for the possible retry policy options.
 
 .. _redis-caveats:
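
Beyond the timeout shown in the documentation change, the retry_policy mapping accepts the keyword arguments of kombu's retry_over_time, as the reference at the end of the new section indicates. An illustrative, fuller configuration (the values are examples, not defaults):

    app.conf.result_backend_transport_options = {
        'retry_policy': {
            'max_retries': 3,      # give up after three retries
            'interval_start': 0,   # first retry is immediate
            'interval_step': 0.5,  # back off by 0.5s per retry
            'interval_max': 2.0,   # cap the wait between retries at 2s
        },
    }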
23 changes: 23 additions & 0 deletions t/unit/backends/test_redis.py
@@ -523,6 +523,29 @@ def test_on_connection_error(self, logger):
         assert self.b.on_connection_error(10, exc, intervals, 3) == 30
         logger.error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds')
 
+    @patch('celery.backends.redis.retry_over_time')
+    def test_retry_policy_conf(self, retry_over_time):
+        self.app.conf.result_backend_transport_options = dict(
+            retry_policy=dict(
+                max_retries=2,
+                interval_start=0,
+                interval_step=0.01,
+            ),
+        )
+        b = self.Backend(app=self.app)
+
+        def fn():
+            return 1
+
+        # We don't want to re-test retry_over_time, just check we called it
+        # with the expected args
+        b.ensure(fn, (),)
+
+        retry_over_time.assert_called_with(
+            fn, b.connection_errors, (), {}, ANY,
+            max_retries=2, interval_start=0, interval_step=0.01, interval_max=1
+        )
+
     def test_incr(self):
         self.b.client = Mock(name='client')
         self.b.incr('foo')
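
The assertion in the new test pins down the call shape that ensure() is expected to produce. A simplified sketch of that behaviour (not Celery's actual implementation; the errback argument is why the test matches that position with ANY):

    from functools import partial
    from kombu.utils.functional import retry_over_time

    def ensure_sketch(backend, fun, args=(), kwargs=None):
        # Split max_retries out of the merged policy; the remaining keys
        # become keyword arguments for kombu's generic retry loop.
        policy = dict(backend.retry_policy)
        max_retries = policy.pop('max_retries', None)
        errback = partial(backend.on_connection_error, max_retries)
        return retry_over_time(
            fun, backend.connection_errors, args, kwargs or {},
            errback, max_retries=max_retries, **policy)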