Skip to content

Commit

Permalink
attempt to provide only healthy connections from the pool
Browse files Browse the repository at this point in the history
Adds redis.selector, a module that provides the best selector strategy
available on the current platform. A redis.selector polls a socket to
provide two pieces of functionality:

1. Check whether data can be read from the socket. Prior versions of redis-py
provided this behavior with just select.select(). select() has lots of
limitations, most notably a limit of ~1024 file descriptors. Now that
better selectors are available, this should make can_read() faster and
able to accomodate more clients. See #1115 and #486

2. Check whether a socket is ready for a command to be sent. This doubles
as a health check. It ensures that the socket is available for writing,
has no data to read and has no known errors. Anytime a socket is
disconnected or hung up, data is available to be read, typically zero bytes.

ConnectionPool.get_connection has been modified to ensure that connections
it returns are connected and are ready for a command to be sent. If
get_connection encounters a case where a socket isn't ready for a command
the connection is reconnected and checked again.

TODO: more tests for this stuff. implement EPoll and KQueue selectors.

Fixes #1115
Fixes #486
  • Loading branch information
andymccurdy committed Feb 5, 2019
1 parent a464459 commit cfa2bc9
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 21 deletions.
13 changes: 0 additions & 13 deletions redis/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,6 @@
import socket
import time

from select import select as _select, error as select_error

def select(rlist, wlist, xlist, timeout):
while True:
try:
return _select(rlist, wlist, xlist, timeout)
except select_error as e:
if e.args[0] == errno.EINTR:
continue
raise

# Wrapper for handling interruptable system calls.
def _retryable_call(s, func, *args, **kwargs):
# Some modules (SSL) use the _fileobject wrapper directly and
Expand Down Expand Up @@ -65,8 +54,6 @@ def recv_into(sock, *args, **kwargs):
return _retryable_call(sock, sock.recv_into, *args, **kwargs)

else: # Python 3.5 and above automatically retry EINTR
from select import select

def recv(sock, *args, **kwargs):
return sock.recv(*args, **kwargs)

Expand Down
45 changes: 42 additions & 3 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
recv, recv_into, select, unquote)
recv, recv_into, unquote)
from redis.exceptions import (
DataError,
RedisError,
Expand All @@ -31,6 +31,7 @@
ExecAbortError,
ReadOnlyError
)
from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
Expand Down Expand Up @@ -496,6 +497,7 @@ def connect(self):
raise ConnectionError(self._error_message(e))

self._sock = sock
self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
Expand Down Expand Up @@ -623,8 +625,11 @@ def can_read(self, timeout=0):
if not sock:
self.connect()
sock = self._sock
return self._parser.can_read() or \
bool(select([sock], [], [], timeout)[0])
return self._parser.can_read() or self._selector.can_read(timeout)

def is_ready_for_command(self):
"Check if the connection is ready for a command"
return self._selector.is_ready_for_command()

def read_response(self):
"Read the response from a previously sent command"
Expand Down Expand Up @@ -984,6 +989,23 @@ def get_connection(self, command_name, *keys, **options):
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
try:
# ensure this connection is connected to Redis
connection.connect()
# connections that the pool provides should be ready to send
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
self.release(connection)
raise

return connection

def get_encoder(self):
Expand Down Expand Up @@ -1115,6 +1137,23 @@ def get_connection(self, command_name, *keys, **options):
if connection is None:
connection = self.make_connection()

try:
# ensure this connection is connected to Redis
connection.connect()
# connections that the pool provides should be ready to send
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
if not connection.is_ready_for_command():
connection.disconnect()
connection.connect()
if not connection.is_ready_for_command():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
self.release(connection)
raise

return connection

def release(self, connection):
Expand Down
187 changes: 187 additions & 0 deletions redis/selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import errno
import select
from redis.exceptions import RedisError


_DEFAULT_SELECTOR = None


class BaseSelector(object):
"""
Base class for all Selectors
"""
def __init__(self, sock):
self.sock = sock

def can_read(self, timeout=0):
"""
Return True if data is ready to be read from the socket,
otherwise False.
This doesn't guarentee that the socket is still connected, just that
there is data to read.
Automatically retries EINTR errors based on PEP 475.
"""
while True:
try:
return self.check_can_read(timeout)
except (select.error, IOError) as ex:
if self.errno_from_exception(ex) == errno.EINTR:
continue
return False

def is_ready_for_command(self, timeout=0):
"""
Return True if the socket is ready to send a command,
otherwise False.
Automatically retries EINTR errors based on PEP 475.
"""
while True:
try:
return self.check_is_ready_for_command(timeout)
except (select.error, IOError) as ex:
if self.errno_from_exception(ex) == errno.EINTR:
continue
return False

def check_can_read(self, timeout):
"""
Perform the can_read check. Subclasses should implement this.
"""
raise NotImplementedError

def check_is_ready_for_command(self, timeout):
"""
Perform the is_ready_for_command check. Subclasses should
implement this.
"""
raise NotImplementedError

def close(self):
"""
Close the selector.
"""
self.sock = None

def errno_from_exception(self, ex):
"""
Get the error number from an exception
"""
if hasattr(ex, 'errno'):
return ex.errno
elif ex.args:
return ex.args[0]
else:
return None


if hasattr(select, 'select'):
class SelectSelector(BaseSelector):
"""
A select-based selector that should work on most platforms.
This is the worst poll strategy and should only be used if no other
option is available.
"""
def check_can_read(self, timeout):
"""
Return True if data is ready to be read from the socket,
otherwise False.
This doesn't guarentee that the socket is still connected, just
that there is data to read.
"""
return bool(select.select([self.sock], [], [], timeout)[0])

def check_is_ready_for_command(self, timeout):
"""
Return True if the socket is ready to send a command,
otherwise False.
"""
r, w, e = select.select([self.sock], [self.sock], [self.sock],
timeout)
return bool(w and not r and not e)


if hasattr(select, 'poll'):
class PollSelector(BaseSelector):
"""
A poll-based selector that should work on (almost?) all versions
of Unix
"""
_EVENT_MASK = (select.POLLIN | select.POLLPRI | select.POLLOUT |
select.POLLERR | select.POLLHUP)
_READ_MASK = select.POLLIN | select.POLLPRI
_WRITE_MASK = select.POLLOUT

def __init__(self, sock):
super().__init__(sock)
self.poller = select.poll()
self.poller.register(sock, self._EVENT_MASK)

def close(self):
"""
Close the selector.
"""
try:
self.poller.unregister(self.sock)
except (KeyError, ValueError):
# KeyError is raised if somehow the socket was not registered
# ValueError is raised if the socket's file descriptor is
# negative.
# In either case, we can't do anything better than to remove
# the reference to the poller.
pass
self.poller = None
self.sock = None

def check_can_read(self, timeout=0):
"""
Return True if data is ready to be read from the socket,
otherwise False.
This doesn't guarentee that the socket is still connected, just
that there is data to read.
"""
events = self.poller.poll(0)
return bool(events and events[0][1] & self._READ_MASK)

def check_is_ready_for_command(self, timeout=0):
"""
Return True if the socket is ready to send a command,
otherwise False
"""
events = self.poller.poll(0)
return bool(events and events[0][1] == self._WRITE_MASK)


def has_selector(selector):
"Determine if the current platform has the selector available"
try:
if selector == 'poll':
# the select module offers the poll selector even if the platform
# doesn't support it. Attempt to poll for nothing to make sure
# poll is available
p = select.poll()
p.poll(0)
else:
# the other selectors will fail when instantiated
getattr(select, selector)().close()
return True
except (OSError, AttributeError):
return False


def DefaultSelector(sock):
"Return the best selector for the platform"
global _DEFAULT_SELECTOR
if _DEFAULT_SELECTOR is None:
if has_selector('poll'):
_DEFAULT_SELECTOR = PollSelector
elif hasattr(select, 'select'):
_DEFAULT_SELECTOR = SelectSelector
else:
raise RedisError('Platform does not support any selectors')
return _DEFAULT_SELECTOR(sock)

0 comments on commit cfa2bc9

Please sign in to comment.