Skip to content

Commit

Permalink
Adds max_consecutive_exceptions to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
egalpin committed Feb 2, 2018
1 parent 45f0e2a commit 9467932
Showing 1 changed file with 44 additions and 3 deletions.
47 changes: 44 additions & 3 deletions asyncpg/pool.py
Expand Up @@ -6,6 +6,7 @@


import asyncio
from concurrent.futures._base import TimeoutError
import functools
import inspect
import time
Expand All @@ -15,6 +16,15 @@
from . import exceptions


BAD_CONN_EXCEPTION = (
exceptions._base.PostgresError,
exceptions._base.FatalPostgresError,
exceptions._base.UnknownPostgresError,
TimeoutError,
ConnectionRefusedError,
)


class PoolConnectionProxyMeta(type):

def __new__(mcls, name, bases, dct, *, wrap=False):
Expand Down Expand Up @@ -96,10 +106,12 @@ class PoolConnectionHolder:
'_connect_args', '_connect_kwargs',
'_max_queries', '_setup', '_init',
'_max_inactive_time', '_in_use',
'_inactive_callback', '_timeout')
'_inactive_callback', '_timeout',
'_max_consecutive_exceptions', '_consecutive_exceptions')

def __init__(self, pool, *, connect_args, connect_kwargs,
max_queries, setup, init, max_inactive_time):
max_queries, setup, init, max_inactive_time,
max_consecutive_exceptions):

self._pool = pool
self._con = None
Expand All @@ -108,6 +120,8 @@ def __init__(self, pool, *, connect_args, connect_kwargs,
self._connect_kwargs = connect_kwargs
self._max_queries = max_queries
self._max_inactive_time = max_inactive_time
self._max_consecutive_exceptions = max_consecutive_exceptions
self._consecutive_exceptions = 0
self._setup = setup
self._init = init
self._inactive_callback = None
Expand Down Expand Up @@ -259,6 +273,16 @@ def _deactivate_connection(self):
self._con.terminate()
self._con = None

async def maybe_close_bad_connection(self, exc_type):
if self._max_consecutive_exceptions > 0 and \
isinstance(exc_type, BAD_CONN_EXCEPTION):

self._consecutive_exceptions += 1

if self._consecutive_exceptions > self._max_consecutive_exceptions:
await self.close()
self._consecutive_exceptions = 0


class Pool:
"""A connection pool.
Expand All @@ -285,6 +309,7 @@ def __init__(self, *connect_args,
init,
loop,
connection_class,
max_consecutive_exceptions,
**connect_kwargs):

if loop is None:
Expand Down Expand Up @@ -331,6 +356,7 @@ def __init__(self, *connect_args,
connect_kwargs=connect_kwargs,
max_queries=max_queries,
max_inactive_time=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
setup=setup,
init=init)

Expand Down Expand Up @@ -380,6 +406,7 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
async with self.acquire() as con:
return await con.execute(query, *args, timeout=timeout)


async def executemany(self, command: str, args, *, timeout: float=None):
"""Execute an SQL *command* for each sequence of arguments in *args*.
Expand Down Expand Up @@ -459,7 +486,8 @@ async def _acquire_impl():
ch = await self._queue.get() # type: PoolConnectionHolder
try:
proxy = await ch.acquire() # type: PoolConnectionProxy
except Exception:
except Exception as e:
await ch.maybe_close_bad_connection(e)
self._queue.put_nowait(ch)
raise
else:
Expand Down Expand Up @@ -580,6 +608,11 @@ async def __aexit__(self, *exc):
self.done = True
con = self.connection
self.connection = None
if not exc[0]:
con._holder._consecutive_exceptions = 0
else:
# Pass exception type to ConnectionHolder
await con._holder.maybe_close_bad_connection(exc[0])
await self.pool.release(con)

def __await__(self):
Expand All @@ -592,6 +625,7 @@ def create_pool(dsn=None, *,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
max_consecutive_exceptions=0,
setup=None,
init=None,
loop=None,
Expand Down Expand Up @@ -651,6 +685,12 @@ def create_pool(dsn=None, *,
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.
:param int max_consecutive_exceptions:
the maximum number of consecutive exceptions that may be raised by a
single connection before that connection is assumed corrupt (ex.
pointing to an old DB after a failover) and will therefore be closed.
Pass ``0`` to disable.
:param coroutine setup:
A coroutine to prepare a connection right before it is returned
from :meth:`Pool.acquire() <pool.Pool.acquire>`. An example use
Expand Down Expand Up @@ -699,4 +739,5 @@ def create_pool(dsn=None, *,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
max_consecutive_exceptions=max_consecutive_exceptions,
**connect_kwargs)

0 comments on commit 9467932

Please sign in to comment.