Skip to content

Commit

Permalink
attempt to provide only healthy connections from the pool
Browse files Browse the repository at this point in the history
Adds redis.selector, a module that provides the best selector strategy
available on the current platform. A redis.selector polls a socket to
provide two pieces of functionality:

1. Check whether data can be read from the socket. Prior versions of redis-py
provided this behavior with just select.select(). select() has lots of
limitations, most notably a limit of ~1024 file descriptors. Now that
better selectors are available, this should make can_read() faster and
able to accommodate more clients. See redis#1115 and redis#486

2. Check whether a socket is ready for a command to be sent. This doubles
as a health check. It ensures that the socket is available for writing,
has no data to read and has no known errors. Whenever a socket is
disconnected or hung up, it reports as readable, and the read typically returns zero bytes.

ConnectionPool.get_connection has been modified to ensure that connections
it returns are connected and are ready for a command to be sent. If
get_connection encounters a case where a socket isn't ready for a command
the connection is reconnected and checked again.

TODO: more tests for this stuff. implement EPoll and KQueue selectors.

Fixes redis#1115
Fixes redis#486

python2 compat

test all selectors via pytest parameterization
  • Loading branch information
andymccurdy authored and belvedere-trading-user committed Sep 25, 2020
1 parent 4d0b0af commit 0a85942
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 7 deletions.
2 changes: 1 addition & 1 deletion redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)


__version__ = '2.10.3'
__version__ = '2.10.3777'
VERSION = tuple(map(int, __version__.split('.')))

__all__ = [
Expand Down
42 changes: 41 additions & 1 deletion redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ExecAbortError,
ReadOnlyError
)
from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
Expand Down Expand Up @@ -436,6 +437,7 @@ def connect(self):
raise ConnectionError(self._error_message(e))

self._sock = sock
self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
Expand Down Expand Up @@ -561,7 +563,11 @@ def can_read(self):
if not sock:
self.connect()
sock = self._sock
return bool(select([sock], [], [], 0)[0]) or self._parser.can_read()
return self._parser.can_read() or self._selector.can_read()

def is_ready_for_command(self):
    """
    Check if the connection is ready for a command.

    Delegates to the selector that connect() creates for the socket and
    returns whatever that selector reports.
    """
    # NOTE(review): assumes connect() has already assigned self._selector;
    # confirm callers always connect first.
    return self._selector.is_ready_for_command()

def read_response(self):
"Read the response from a previously sent command"
Expand Down Expand Up @@ -873,6 +879,23 @@ def get_connection(self, command_name, *keys, **options):
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
try:
# ensure this connection is connected to Redis
connection.connect()
# connections that the pool provides should be ready to send
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
self.release(connection)
raise

return connection

def make_connection(self):
Expand Down Expand Up @@ -994,6 +1017,23 @@ def get_connection(self, command_name, *keys, **options):
if connection is None:
connection = self.make_connection()

try:
# ensure this connection is connected to Redis
connection.connect()
# connections that the pool provides should be ready to send
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
self.release(connection)
raise

return connection

def release(self, connection):
Expand Down
196 changes: 196 additions & 0 deletions redis/selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import errno
import select
from redis.exceptions import RedisError


# Selector class chosen for this platform. Stays None until DefaultSelector()
# resolves it on first use; later calls reuse the stored class.
_DEFAULT_SELECTOR = None


class BaseSelector(object):
    """
    Common behaviour shared by every selector implementation.

    A selector wraps one socket and answers two questions about it: is
    there data to read, and is it ready for a command to be sent.
    Subclasses provide the actual polling through ``check_can_read`` and
    ``check_is_ready_for_command``.
    """

    def __init__(self, sock):
        self.sock = sock

    def _poll_retrying_eintr(self, check, timeout):
        """
        Call ``check(timeout)`` and hand back its result.

        A ``select.error``/``IOError`` whose error number is EINTR makes the
        check run again (PEP 475 style); any other such error gives False.
        """
        while True:
            try:
                outcome = check(timeout)
            except (select.error, IOError) as exc:
                if self.errno_from_exception(exc) != errno.EINTR:
                    return False
                # interrupted by a signal: go around and poll again
            else:
                return outcome

    def can_read(self, timeout=0):
        """
        Report whether the socket has data waiting to be read.

        A True result does not guarantee that the socket is still
        connected, only that there is something to read.
        EINTR errors are retried automatically, following PEP 475.
        """
        return self._poll_retrying_eintr(self.check_can_read, timeout)

    def is_ready_for_command(self, timeout=0):
        """
        Report whether the socket is ready for a command to be sent.

        EINTR errors are retried automatically, following PEP 475.
        """
        return self._poll_retrying_eintr(
            self.check_is_ready_for_command, timeout)

    def check_can_read(self, timeout):
        """
        Do the actual can_read poll. Subclasses must override this.
        """
        raise NotImplementedError

    def check_is_ready_for_command(self, timeout):
        """
        Do the actual is_ready_for_command poll. Subclasses must
        override this.
        """
        raise NotImplementedError

    def close(self):
        """
        Close the selector by dropping its reference to the socket.
        """
        self.sock = None

    def errno_from_exception(self, ex):
        """
        Extract the error number from an exception.

        Uses the ``errno`` attribute when the exception has one, otherwise
        the first positional argument, otherwise None.
        """
        try:
            return ex.errno
        except AttributeError:
            pass
        return ex.args[0] if ex.args else None


if hasattr(select, 'select'):
    class SelectSelector(BaseSelector):
        """
        Selector built on select.select(), which should work on most
        platforms.
        It is the weakest polling strategy and is meant as a fallback for
        when nothing better is available.
        """

        def check_can_read(self, timeout):
            """
            Report whether the socket has data waiting to be read.

            A True result does not guarantee that the socket is still
            connected, only that there is something to read.
            """
            readable, _, _ = select.select([self.sock], [], [], timeout)
            return bool(readable)

        def check_is_ready_for_command(self, timeout):
            """
            Report whether the socket is ready for a command to be sent:
            writable, with nothing to read and no error condition.
            """
            readable, writable, errored = select.select(
                [self.sock], [self.sock], [self.sock], timeout)
            if readable or errored:
                return False
            return bool(writable)


if hasattr(select, 'poll'):
    class PollSelector(BaseSelector):
        """
        A poll-based selector that should work on (almost?) all versions
        of Unix.

        Two poll objects are kept per socket: one registered for read and
        error events (used by check_can_read) and one registered for read,
        error and write events (used by check_is_ready_for_command).
        """
        READ_MASK = select.POLLIN | select.POLLPRI
        ERROR_MASK = select.POLLERR | select.POLLHUP
        WRITE_MASK = select.POLLOUT

        _READ_POLLER_MASK = READ_MASK | ERROR_MASK
        _READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK

        def __init__(self, sock):
            super(PollSelector, self).__init__(sock)
            self.read_poller = select.poll()
            self.read_poller.register(sock, self._READ_POLLER_MASK)
            self.ready_poller = select.poll()
            self.ready_poller.register(sock, self._READY_POLLER_MASK)

        def close(self):
            """
            Close the selector.

            Unregisters the socket from both pollers and drops every
            reference. Calling close() more than once is harmless.
            """
            for poller in (self.read_poller, self.ready_poller):
                if poller is None:
                    # already closed
                    continue
                try:
                    # unregister from the poller being visited, so that
                    # ready_poller is cleaned up as well as read_poller
                    poller.unregister(self.sock)
                except (KeyError, ValueError):
                    # KeyError is raised if somehow the socket was not
                    # registered
                    # ValueError is raised if the socket's file descriptor is
                    # negative.
                    # In either case, we can't do anything better than to
                    # remove the reference to the poller.
                    pass
            self.read_poller = None
            self.ready_poller = None
            self.sock = None

        def check_can_read(self, timeout=0):
            """
            Return True if data is ready to be read from the socket,
            otherwise False.
            This doesn't guarantee that the socket is still connected, just
            that there is data to read.

            ``timeout`` is given in seconds and converted to the whole
            milliseconds that poll() expects.
            """
            timeout = int(timeout * 1000)
            events = self.read_poller.poll(timeout)
            return bool(events and events[0][1] & self.READ_MASK)

        def check_is_ready_for_command(self, timeout=0):
            """
            Return True if the socket is ready to send a command,
            otherwise False.

            Ready means the only event reported is writability: no pending
            data to read and no error or hang-up condition.
            ``timeout`` is given in seconds and converted to the whole
            milliseconds that poll() expects.
            """
            # same conversion as check_can_read
            timeout = int(timeout * 1000)
            events = self.ready_poller.poll(timeout)
            return bool(events and events[0][1] == self.WRITE_MASK)


def has_selector(selector):
    """
    Determine if the current platform has the selector available.

    ``selector`` is the name of an attribute of the ``select`` module,
    e.g. 'poll'. Returns True when it can be used, and False when probing
    it raises OSError or AttributeError.
    """
    try:
        if selector != 'poll':
            # the other selectors will fail when instantiated
            getattr(select, selector)().close()
        else:
            # the select module offers the poll selector even if the
            # platform doesn't support it, so poll for nothing to make
            # sure it really works
            probe = select.poll()
            probe.poll(0)
    except (OSError, AttributeError):
        return False
    return True


def DefaultSelector(sock):
    """
    Return the best selector for the platform, wrapping ``sock``.

    The selector class is worked out on the first call and remembered in
    the module-level ``_DEFAULT_SELECTOR``. Raises RedisError when neither
    poll nor select is usable.
    """
    global _DEFAULT_SELECTOR
    selector_class = _DEFAULT_SELECTOR
    if selector_class is None:
        if has_selector('poll'):
            selector_class = PollSelector
        elif hasattr(select, 'select'):
            selector_class = SelectSelector
        else:
            raise RedisError('Platform does not support any selectors')
        _DEFAULT_SELECTOR = selector_class
    return selector_class(sock)
23 changes: 18 additions & 5 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ def __init__(self, **kwargs):
self.kwargs = kwargs
self.pid = os.getpid()

def connect(self):
pass

def is_ready_for_command(self):
return True


class TestConnectionPool(object):
def get_pool(self, connection_kwargs=None, max_connections=None,
connection_class=DummyConnection):
connection_class=redis.Connection):
connection_kwargs = connection_kwargs or {}
pool = redis.ConnectionPool(
connection_class=connection_class,
Expand All @@ -30,7 +36,8 @@ def get_pool(self, connection_kwargs=None, max_connections=None,

def test_connection_creation(self):
connection_kwargs = {'foo': 'bar', 'biz': 'baz'}
pool = self.get_pool(connection_kwargs=connection_kwargs)
pool = self.get_pool(connection_kwargs=connection_kwargs,
connection_class=DummyConnection)
connection = pool.get_connection('_')
assert isinstance(connection, DummyConnection)
assert connection.kwargs == connection_kwargs
Expand Down Expand Up @@ -305,14 +312,20 @@ def test_defaults(self):
@pytest.mark.skipif(not ssl_available, reason="SSL not installed")
def test_cert_reqs_options(self):
import ssl
pool = redis.ConnectionPool.from_url('rediss://?ssl_cert_reqs=none')

class DummyConnectionPool(redis.ConnectionPool):
def get_connection(self, *args, **kwargs):
return self.make_connection()

pool = DummyConnectionPool.from_url(
'rediss://?ssl_cert_reqs=none')
assert pool.get_connection('_').cert_reqs == ssl.CERT_NONE

pool = redis.ConnectionPool.from_url(
pool = DummyConnectionPool.from_url(
'rediss://?ssl_cert_reqs=optional')
assert pool.get_connection('_').cert_reqs == ssl.CERT_OPTIONAL

pool = redis.ConnectionPool.from_url(
pool = DummyConnectionPool.from_url(
'rediss://?ssl_cert_reqs=required')
assert pool.get_connection('_').cert_reqs == ssl.CERT_REQUIRED

Expand Down

0 comments on commit 0a85942

Please sign in to comment.