Skip to content

Commit

Permalink
some pool usage tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
sonic182 committed Feb 6, 2024
1 parent ef02c74 commit 558fb1a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
16 changes: 7 additions & 9 deletions aiosonic/__init__.py
Expand Up @@ -208,7 +208,8 @@ async def json(self, json_decoder=loads) -> dict:

async def read_chunks(self) -> AsyncIterator[bytes]:
"""Read chunks from chunked response."""
assert self.connection
if not self.connection:
raise ConnectionError("missing connection, possible already read response.")
try:
while True and not self.chunks_readed:
chunk_size = int((await self.connection.readline()).rstrip(), 16)
Expand All @@ -221,15 +222,13 @@ async def read_chunks(self) -> AsyncIterator[bytes]:
self.chunks_readed = True
finally:
# Ensure the conn get's released
await self.connection.release()
self.connection.release()
self.connection = None

def __del__(self):
# clean it
if self.connection and self.connection.blocked:
if self.connection.writer:
self.connection.writer._transport.abort()
self.connection.blocked = False
self.connection.connector.pool.release(self.connection)
if self.connection:
self.connection.ensure_released()

def _set_request_meta(self, urlparsed: ParseResult):
self.request_meta = {"from_path": urlparsed.path or "/"}
Expand Down Expand Up @@ -474,10 +473,9 @@ async def _do_request(

if keepalive:
connection.keep_alive()
response._set_connection(connection)
else:
connection.keep = False
response._set_connection(connection)
response._set_connection(connection)

return response

Expand Down
16 changes: 12 additions & 4 deletions aiosonic/connection.py
Expand Up @@ -88,7 +88,7 @@ async def readexactly(self, size: int):
raise MissingReaderException("reader not set.")
return await self.reader.readexactly(size)

async def readuntil(self, separator: bytes = b'\n'):
async def readuntil(self, separator: bytes = b"\n"):
"""Read until separator"""
if not self.reader:
raise MissingReaderException("reader not set.")
Expand Down Expand Up @@ -182,13 +182,13 @@ async def __aexit__(self, exc_type: None, exc: None, tb: None) -> None:
self.close()

if not self.blocked:
await self.release()
self.release()
if self.h2handler: # pragma: no cover
self.h2handler.cleanup()

async def release(self) -> None:
def release(self) -> None:
"""Release connection."""
await self.connector.release(self)
self.connector.release(self)
self.requests_count += 1
# if keep False and blocked (by latest chunked response), close it.
# server said to close it.
Expand All @@ -204,6 +204,14 @@ def __del__(self) -> None:
"""Cleanup."""
self.close(True)

def ensure_released(self):
"""Ensure the connection is released."""
if self.blocked:
if self.writer:
self.writer._transport.abort()
self.blocked = False
self.release()

def close(self, check_closing: bool = False) -> None:
"""Close connection if opened."""
if self.writer:
Expand Down
10 changes: 4 additions & 6 deletions aiosonic/connectors.py
Expand Up @@ -97,18 +97,16 @@ async def after_acquire(self, urlparsed, conn, verify, ssl, timeouts, http2):
)
except TimeoutException:
conn.close()
await self.release(conn)
self.release(conn)
raise ConnectTimeout()
except BaseException as ex:
await self.release(conn)
self.release(conn)
raise ex
return conn

async def release(self, conn):
def release(self, conn):
"""Release connection."""
res = self.pool.release(conn)
if isinstance(res, Coroutine):
await res
self.pool.release(conn)

async def wait_free_pool(self):
"""Wait until free pool."""
Expand Down
4 changes: 3 additions & 1 deletion tests/test_aiosonic.py
Expand Up @@ -486,7 +486,9 @@ async def test_get_chunked_response(app, aiohttp_server):

async for chunk in res.read_chunks():
assert chunk in chunks
assert await res.text() == "" # chunks already readed manually

with pytest.raises(ConnectionError):
assert await res.text() == "" # chunks already readed manually
await server.close()


Expand Down

0 comments on commit 558fb1a

Please sign in to comment.