Skip to content

Commit

Permalink
Close cursor portals once the iterator is exhausted (#1088)
Browse files Browse the repository at this point in the history
When iterating on a cursor, make sure to close the portal once iteration
is done.  This prevents the cursor from holding onto resources until the
end of transaction.

Fixes: #1008
  • Loading branch information
elprans committed Oct 9, 2023
1 parent b7ffab6 commit ca9f03b
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 6 deletions.
16 changes: 15 additions & 1 deletion asyncpg/cursor.py
Expand Up @@ -158,6 +158,17 @@ async def _exec(self, n, timeout):
self._state, self._portal_name, n, True, timeout)
return buffer

async def _close_portal(self, timeout):
self._check_ready()

if not self._portal_name:
raise exceptions.InterfaceError(
'cursor does not have an open portal')

protocol = self._connection._protocol
await protocol.close_portal(self._portal_name, timeout)
self._portal_name = None

def __repr__(self):
attrs = []
if self._exhausted:
Expand Down Expand Up @@ -219,14 +230,17 @@ async def __anext__(self):
)
self._state.attach()

if not self._portal_name:
if not self._portal_name and not self._exhausted:
buffer = await self._bind_exec(self._prefetch, self._timeout)
self._buffer.extend(buffer)

if not self._buffer and not self._exhausted:
buffer = await self._exec(self._prefetch, self._timeout)
self._buffer.extend(buffer)

if self._portal_name and self._exhausted:
await self._close_portal(self._timeout)

if self._buffer:
return self._buffer.popleft()

Expand Down
23 changes: 23 additions & 0 deletions asyncpg/protocol/protocol.pyx
Expand Up @@ -327,6 +327,29 @@ cdef class BaseProtocol(CoreProtocol):
finally:
return await waiter

@cython.iterable_coroutine
async def close_portal(self, str portal_name, timeout):

if self.cancel_waiter is not None:
await self.cancel_waiter
if self.cancel_sent_waiter is not None:
await self.cancel_sent_waiter
self.cancel_sent_waiter = None

self._check_state()
timeout = self._get_timeout_impl(timeout)

waiter = self._new_waiter(timeout)
try:
self._close(
portal_name,
True) # network op
except Exception as ex:
waiter.set_exception(ex)
self._coreproto_error()
finally:
return await waiter

@cython.iterable_coroutine
async def query(self, query, timeout):
if self.cancel_waiter is not None:
Expand Down
20 changes: 15 additions & 5 deletions tests/test_cursor.py
Expand Up @@ -84,11 +84,21 @@ async def test_cursor_iterable_06(self):
recs = []

async with self.con.transaction():
async for rec in self.con.cursor(
'SELECT generate_series(0, $1::int)', 10):
recs.append(rec)

self.assertEqual(recs, [(i,) for i in range(11)])
await self.con.execute('''
CREATE TABLE cursor_iterable_06 (id int);
INSERT INTO cursor_iterable_06 VALUES (0), (1);
''')
try:
cur = self.con.cursor('SELECT * FROM cursor_iterable_06')
async for rec in cur:
recs.append(rec)
finally:
# Check that after iteration has exhausted the cursor,
# its associated portal is closed properly, unlocking
# the table.
await self.con.execute('DROP TABLE cursor_iterable_06')

self.assertEqual(recs, [(i,) for i in range(2)])


class TestCursor(tb.ConnectedTestCase):
Expand Down

0 comments on commit ca9f03b

Please sign in to comment.