Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds max_consecutive_exceptions to pool #253

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 43 additions & 3 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


import asyncio
from concurrent.futures._base import TimeoutError
import functools
import inspect
import time
Expand All @@ -15,6 +16,15 @@
from . import exceptions


BAD_CONN_EXCEPTION = (
exceptions._base.PostgresError,
exceptions._base.FatalPostgresError,
exceptions._base.UnknownPostgresError,
TimeoutError,
ConnectionRefusedError,
)


class PoolConnectionProxyMeta(type):

def __new__(mcls, name, bases, dct, *, wrap=False):
Expand Down Expand Up @@ -96,10 +106,12 @@ class PoolConnectionHolder:
'_connect_args', '_connect_kwargs',
'_max_queries', '_setup', '_init',
'_max_inactive_time', '_in_use',
'_inactive_callback', '_timeout')
'_inactive_callback', '_timeout',
'_max_consecutive_exceptions', '_consecutive_exceptions')

def __init__(self, pool, *, connect_args, connect_kwargs,
max_queries, setup, init, max_inactive_time):
max_queries, setup, init, max_inactive_time,
max_consecutive_exceptions):

self._pool = pool
self._con = None
Expand All @@ -108,6 +120,8 @@ def __init__(self, pool, *, connect_args, connect_kwargs,
self._connect_kwargs = connect_kwargs
self._max_queries = max_queries
self._max_inactive_time = max_inactive_time
self._max_consecutive_exceptions = max_consecutive_exceptions
self._consecutive_exceptions = 0
self._setup = setup
self._init = init
self._inactive_callback = None
Expand Down Expand Up @@ -259,6 +273,16 @@ def _deactivate_connection(self):
self._con.terminate()
self._con = None

async def maybe_close_bad_connection(self, exc_type):
if self._max_consecutive_exceptions > 0 and \
isinstance(exc_type, BAD_CONN_EXCEPTION):

self._consecutive_exceptions += 1

if self._consecutive_exceptions > self._max_consecutive_exceptions:
await self.close()
self._consecutive_exceptions = 0


class Pool:
"""A connection pool.
Expand All @@ -285,6 +309,7 @@ def __init__(self, *connect_args,
init,
loop,
connection_class,
max_consecutive_exceptions,
**connect_kwargs):

if loop is None:
Expand Down Expand Up @@ -331,6 +356,7 @@ def __init__(self, *connect_args,
connect_kwargs=connect_kwargs,
max_queries=max_queries,
max_inactive_time=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
setup=setup,
init=init)

Expand Down Expand Up @@ -459,7 +485,8 @@ async def _acquire_impl():
ch = await self._queue.get() # type: PoolConnectionHolder
try:
proxy = await ch.acquire() # type: PoolConnectionProxy
except Exception:
except Exception as e:
await ch.maybe_close_bad_connection(e)
self._queue.put_nowait(ch)
raise
else:
Expand Down Expand Up @@ -580,6 +607,11 @@ async def __aexit__(self, *exc):
self.done = True
con = self.connection
self.connection = None
if not exc[0]:
con._holder._consecutive_exceptions = 0
else:
# Pass exception type to ConnectionHolder
await con._holder.maybe_close_bad_connection(exc[0])
await self.pool.release(con)

def __await__(self):
Expand All @@ -592,6 +624,7 @@ def create_pool(dsn=None, *,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
max_consecutive_exceptions=0,
setup=None,
init=None,
loop=None,
Expand Down Expand Up @@ -651,6 +684,12 @@ def create_pool(dsn=None, *,
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.

:param int max_consecutive_exceptions:
the maximum number of consecutive exceptions that may be raised by a
single connection before that connection is assumed corrupt (ex.
pointing to an old DB after a failover) and will therefore be closed.
Pass ``0`` to disable.

:param coroutine setup:
A coroutine to prepare a connection right before it is returned
from :meth:`Pool.acquire() <pool.Pool.acquire>`. An example use
Expand Down Expand Up @@ -699,4 +738,5 @@ def create_pool(dsn=None, *,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
**connect_kwargs)