Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass socket_keepalive_options to redis client for result_backend #8297

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
15 changes: 10 additions & 5 deletions celery/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,19 @@ def __init__(self, host=None, port=None, db=None, password=None,
if host and '://' in host:
url, host = host, None

self.max_connections = (
max_connections or
_get('redis_max_connections') or
self.max_connections)
if max_connections is not None:
self.max_connections = max_connections
elif _get('redis_max_connections') is not None:
self.max_connections = _get('redis_max_connections')
else:
self.max_connections = self.max_connections
self._ConnectionPool = connection_pool

socket_timeout = _get('redis_socket_timeout')
socket_connect_timeout = _get('redis_socket_connect_timeout')
retry_on_timeout = _get('redis_retry_on_timeout')
socket_keepalive = _get('redis_socket_keepalive')
socket_keepalive_options = self._transport_options.get('socket_keepalive_options', {})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we document this change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have this description for those parameters and the rest parameters are in code they aren't described either.
So I haven't described it as well. So the question is: should I be highlighted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least New in version 5.4. annotation?

health_check_interval = _get('redis_backend_health_check_interval')

self.connparams = {
Expand Down Expand Up @@ -259,6 +262,8 @@ def __init__(self, host=None, port=None, db=None, password=None,
# absent in redis.connection.UnixDomainSocketConnection
if socket_keepalive:
self.connparams['socket_keepalive'] = socket_keepalive
if socket_keepalive_options:
self.connparams['socket_keepalive_options'] = socket_keepalive_options

# "redis_backend_use_ssl" must be a dict with the keys:
# 'ssl_cert_reqs', 'ssl_ca_certs', 'ssl_certfile', 'ssl_keyfile'
Expand Down Expand Up @@ -453,7 +458,7 @@ def apply_chord(self, header_result_args, body, **kwargs):
def _chord_zset(self):
return self._transport_options.get('result_chord_ordered', True)

@cached_property
@property
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure about this change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I have this cached_property 4 test are doing to failure:

======================================================================= short test summary info ========================================================================
FAILED t/unit/backends/test_redis.py::test_RedisBackend_chords_simple::test_on_chord_part_return__unordered - AssertionError: assert 0
FAILED t/unit/backends/test_redis.py::test_RedisBackend_chords_simple::test_on_chord_part_return_no_expiry__unordered - AssertionError: assert 0
FAILED t/unit/backends/test_redis.py::test_RedisBackend_chords_simple::test_on_chord_part_return__ChordError__unordered - TypeError: '_ContextMock' object is not subscriptable
FAILED t/unit/backends/test_redis.py::test_RedisBackend_chords_simple::test_on_chord_part_return__other_error__unordered - TypeError: '_ContextMock' object is not subscriptable
=============================================================== 4 failed, 82 passed in 63.34s (0:01:03) ================================================================

Its looks like _transport_options is setup once and every test use it (doesn't update their custom setup).
Also I don't see reason that should be cached if:

  1. It is in settings object so all worker have their own copy.
  2. Get something form dict don't cost much for computation.

Maybe something I have missed or I haven't know and any advice how to fix will be welcome and I change if back :)

def _transport_options(self):
return self.app.conf.get('result_backend_transport_options', {})

Expand Down
32 changes: 32 additions & 0 deletions t/unit/backends/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from contextlib import contextmanager
from datetime import timedelta
from pickle import dumps, loads
from platform import system
from unittest.mock import ANY, Mock, call, patch

import pytest
Expand Down Expand Up @@ -452,6 +453,7 @@ def test_socket_url(self):
assert x.connparams['socket_timeout'] == 30.0
assert 'socket_connect_timeout' not in x.connparams
assert 'socket_keepalive' not in x.connparams
assert 'socket_keepalive_options' not in x.connparams
assert x.connparams['db'] == 3

def test_backend_ssl(self):
Expand Down Expand Up @@ -1096,6 +1098,36 @@ def test_on_chord_part_return__other_error__ordered(self):
callback.id, exc=ANY,
)

@pytest.mark.skipif(system() != 'Linux', reason="Test supported only for Linux setup")
def test_socket_keepalive_options(self):
pytest.importorskip('redis')
from socket import TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL

self.app.conf.redis_socket_keepalive = True
self.app.conf.result_backend_transport_options = {
'socket_keepalive_options': {
TCP_KEEPIDLE: 300,
TCP_KEEPCNT: 9,
TCP_KEEPINTVL: 45
}
}

x = self.Backend('redis://:bosco@vandelay.com:123//1', app=self.app)

assert x.connparams['socket_keepalive'] is True
assert x.connparams['socket_keepalive_options'] == {4: 300, 6: 9, 5: 45}

def test_setup_proper_max_conenction_value_depends_on_passed_value(self):
x = self.Backend('redis://:bosco@vandelay.com:123//1', app=self.app)
assert x.connparams['max_connections'] is None

x = self.Backend('redis://:bosco@vandelay.com:123//1', app=self.app, max_connections=0)
assert x.connparams['max_connections'] == 0

self.app.conf.redis_max_connections = 10
x = self.Backend('redis://:bosco@vandelay.com:123//1', app=self.app)
assert x.connparams['max_connections'] == 10


class test_RedisBackend_chords_complex(basetest_RedisBackend):
@pytest.fixture(scope="function", autouse=True)
Expand Down
7 changes: 4 additions & 3 deletions t/unit/tasks/test_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,11 @@ def test_ready(self):
)
def test_del(self):
with patch('celery.result.AsyncResult.backend') as backend:
result = self.app.AsyncResult(self.task1['id'])
result.backend = backend
result = self.app.AsyncResult(self.task1['id'], backend=backend)
result_clone = copy.copy(result)
del result
# del result do not exactly call __del__ method. The gc does.
# https://docs.python.org/3/reference/datamodel.html#object.__del__
result.__del__()
backend.remove_pending_result.assert_called_once_with(
result_clone
)
Expand Down