Skip to content

Commit

Permalink
Add tests for connection closure listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
elprans committed May 2, 2020
1 parent 3a637f1 commit 0a23096
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
4 changes: 4 additions & 0 deletions asyncpg/_testbase/fuzzer.py
Expand Up @@ -145,6 +145,10 @@ def _close_connection(self, connection):
if conn_task is not None:
conn_task.cancel()

def close_all_connections(self):
for conn in list(self.connections):
self._close_connection(conn)


class Connection:
def __init__(self, client_sock, backend_sock, proxy):
Expand Down
7 changes: 6 additions & 1 deletion asyncpg/connection.py
Expand Up @@ -185,11 +185,16 @@ def add_close_listener(self, callback):
:param callable callback:
A callable receiving one argument:
**connection**: a Connection the callback is registered with.
.. versionadded:: 0.21.0
"""
self._close_listeners.add(callback)

def remove_close_listener(self, callback):
"""Remove a listening callback for the connection closing."""
"""Remove a listening callback for the connection closing.
.. versionadded:: 0.21.0
"""
self._close_listeners.discard(callback)

def get_server_pid(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_adversity.py
Expand Up @@ -70,4 +70,4 @@ def kill_connectivity():
await asyncio.gather(
*workers, return_exceptions=True)
finally:
pool.terminate()
pool.terminate()
42 changes: 42 additions & 0 deletions tests/test_listeners.py
Expand Up @@ -6,6 +6,10 @@


import asyncio
import os
import platform
import sys
import unittest

from asyncpg import _testbase as tb
from asyncpg import exceptions
Expand Down Expand Up @@ -272,3 +276,41 @@ def listener1(*args):
pass

con.add_log_listener(listener1)


@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
@unittest.skipIf(
platform.system() == 'Windows' and
sys.version_info >= (3, 8),
'not compatible with ProactorEventLoop which is default in Python 3.8')
class TestConnectionCloseListener(tb.ProxiedClusterTestCase):

async def test_connection_close_callback_called_on_remote(self):

called = False

def close_cb(con):
nonlocal called
called = True

con = await self.connect()
con.add_close_listener(close_cb)
self.proxy.close_all_connections()
try:
await con.fetchval('SELECT 1')
except Exception:
pass
self.assertTrue(called)

async def test_connection_close_callback_called_on_local(self):

called = False

def close_cb(con):
nonlocal called
called = True

con = await self.connect()
con.add_close_listener(close_cb)
await con.close()
self.assertTrue(called)

0 comments on commit 0a23096

Please sign in to comment.