diff --git a/celery/backends/redis.py b/celery/backends/redis.py index 660af701ac..1b9db7433f 100644 --- a/celery/backends/redis.py +++ b/celery/backends/redis.py @@ -328,6 +328,15 @@ def _params_from_url(self, url, defaults): connparams.update(query) return connparams + @cached_property + def retry_policy(self): + retry_policy = super().retry_policy + if "retry_policy" in self._transport_options: + retry_policy = retry_policy.copy() + retry_policy.update(self._transport_options['retry_policy']) + + return retry_policy + def on_task_call(self, producer, task_id): if not task_join_will_block(): self.result_consumer.consume_from(task_id) @@ -401,10 +410,11 @@ def apply_chord(self, header_result, body, **kwargs): @cached_property def _chord_zset(self): - transport_options = self.app.conf.get( - 'result_backend_transport_options', {} - ) - return transport_options.get('result_chord_ordered', True) + return self._transport_options.get('result_chord_ordered', True) + + @cached_property + def _transport_options(self): + return self.app.conf.get('result_backend_transport_options', {}) def on_chord_part_return(self, request, state, result, propagate=None, **kwargs): @@ -530,12 +540,8 @@ def _get_sentinel_instance(self, **params): connparams = params.copy() hosts = connparams.pop("hosts") - result_backend_transport_opts = self.app.conf.get( - "result_backend_transport_options", {}) - min_other_sentinels = result_backend_transport_opts.get( - "min_other_sentinels", 0) - sentinel_kwargs = result_backend_transport_opts.get( - "sentinel_kwargs", {}) + min_other_sentinels = self._transport_options.get("min_other_sentinels", 0) + sentinel_kwargs = self._transport_options.get("sentinel_kwargs", {}) sentinel_instance = self.sentinel.Sentinel( [(cp['host'], cp['port']) for cp in hosts], @@ -548,9 +554,7 @@ def _get_sentinel_instance(self, **params): def _get_pool(self, **params): sentinel_instance = self._get_sentinel_instance(**params) - result_backend_transport_opts = self.app.conf.get( - "result_backend_transport_options", {}) - master_name = result_backend_transport_opts.get("master_name", None) + master_name = self._transport_options.get("master_name", None) return sentinel_instance.master_for( service_name=master_name, diff --git a/docs/getting-started/brokers/redis.rst b/docs/getting-started/brokers/redis.rst index 9dde8c9086..52a9b6944b 100644 --- a/docs/getting-started/brokers/redis.rst +++ b/docs/getting-started/brokers/redis.rst @@ -94,6 +94,21 @@ If you are using Sentinel, you should specify the master_name using the :setting app.conf.result_backend_transport_options = {'master_name': "mymaster"} +Connection timeouts +^^^^^^^^^^^^^^^^^^^ + +To configure the connection timeouts for the Redis result backend, use the ``retry_policy`` key under :setting:`result_backend_transport_options`: + + +.. code-block:: python + + app.conf.result_backend_transport_options = { + 'retry_policy': { + 'timeout': 5.0 + } + } + +See :func:`~kombu.utils.functional.retry_over_time` for the possible retry policy options. .. _redis-caveats: diff --git a/t/unit/backends/test_redis.py b/t/unit/backends/test_redis.py index 1415978cbf..7c3e3e7d90 100644 --- a/t/unit/backends/test_redis.py +++ b/t/unit/backends/test_redis.py @@ -523,6 +523,29 @@ def test_on_connection_error(self, logger): assert self.b.on_connection_error(10, exc, intervals, 3) == 30 logger.error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds') + @patch('celery.backends.redis.retry_over_time') + def test_retry_policy_conf(self, retry_over_time): + self.app.conf.result_backend_transport_options = dict( + retry_policy=dict( + max_retries=2, + interval_start=0, + interval_step=0.01, + ), + ) + b = self.Backend(app=self.app) + + def fn(): + return 1 + + # We don't want to re-test retry_over_time, just check we called it + # with the expected args + b.ensure(fn, (),) + + retry_over_time.assert_called_with( + fn, b.connection_errors, (), {}, ANY, + max_retries=2, interval_start=0, interval_step=0.01, interval_max=1 + ) + def test_incr(self): self.b.client = Mock(name='client') self.b.incr('foo')