Skip to content

Commit

Permalink
Some pool usage tuning (#457)
Browse files Browse the repository at this point in the history
* some pool usage tuning

* restore test

* private conn and remove shutdown method

* more timeout pypy test
  • Loading branch information
sonic182 committed Feb 6, 2024
1 parent ef02c74 commit f432b41
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 47 deletions.
31 changes: 12 additions & 19 deletions aiosonic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self):
self.raw_headers = []
self.body = b""
self.response_initial = {}
self.connection = None
self._connection = None
self.chunked = False
self.compressed = b""
self.chunks_readed = False
Expand Down Expand Up @@ -139,7 +139,7 @@ def _update_cookies(self, header_tuple):

def _set_connection(self, connection: Connection):
"""Set header to response."""
self.connection = connection
self._connection = connection

@property
def status_code(self) -> int:
Expand Down Expand Up @@ -208,28 +208,27 @@ async def json(self, json_decoder=loads) -> dict:

async def read_chunks(self) -> AsyncIterator[bytes]:
"""Read chunks from chunked response."""
assert self.connection
if not self._connection:
raise ConnectionError("missing connection, possible already read response.")
try:
while True and not self.chunks_readed:
chunk_size = int((await self.connection.readline()).rstrip(), 16)
chunk_size = int((await self._connection.readline()).rstrip(), 16)
if not chunk_size:
# read last CRLF
await self.connection.readline()
await self._connection.readline()
break
chunk = await self.connection.readexactly(chunk_size + 2)
chunk = await self._connection.readexactly(chunk_size + 2)
yield chunk[:-2]
self.chunks_readed = True
finally:
# Ensure the conn get's released
await self.connection.release()
self._connection.release()
self._connection = None

def __del__(self):
# clean it
if self.connection and self.connection.blocked:
if self.connection.writer:
self.connection.writer._transport.abort()
self.connection.blocked = False
self.connection.connector.pool.release(self.connection)
if self._connection:
self._connection.ensure_released()

def _set_request_meta(self, urlparsed: ParseResult):
self.request_meta = {"from_path": urlparsed.path or "/"}
Expand Down Expand Up @@ -474,10 +473,9 @@ async def _do_request(

if keepalive:
connection.keep_alive()
response._set_connection(connection)
else:
connection.keep = False
response._set_connection(connection)
response._set_connection(connection)

return response

Expand Down Expand Up @@ -514,11 +512,6 @@ async def __aenter__(self):
async def __aexit__(self, _exc_type, exc, _tb): # type: ignore
if exc:
raise exc
await self.shutdown()

async def shutdown(self):
"""Cleanup connections, this method makes client unusable."""
await self.connector.cleanup()

async def _request_with_body(
self,
Expand Down
16 changes: 12 additions & 4 deletions aiosonic/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def readexactly(self, size: int):
raise MissingReaderException("reader not set.")
return await self.reader.readexactly(size)

async def readuntil(self, separator: bytes = b'\n'):
async def readuntil(self, separator: bytes = b"\n"):
"""Read until separator"""
if not self.reader:
raise MissingReaderException("reader not set.")
Expand Down Expand Up @@ -182,13 +182,13 @@ async def __aexit__(self, exc_type: None, exc: None, tb: None) -> None:
self.close()

if not self.blocked:
await self.release()
self.release()
if self.h2handler: # pragma: no cover
self.h2handler.cleanup()

async def release(self) -> None:
def release(self) -> None:
"""Release connection."""
await self.connector.release(self)
self.connector.release(self)
self.requests_count += 1
# if keep False and blocked (by latest chunked response), close it.
# server said to close it.
Expand All @@ -204,6 +204,14 @@ def __del__(self) -> None:
"""Cleanup."""
self.close(True)

def ensure_released(self):
"""Ensure the connection is released."""
if self.blocked:
if self.writer:
self.writer._transport.abort()
self.blocked = False
self.release()

def close(self, check_closing: bool = False) -> None:
"""Close connection if opened."""
if self.writer:
Expand Down
10 changes: 4 additions & 6 deletions aiosonic/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,16 @@ async def after_acquire(self, urlparsed, conn, verify, ssl, timeouts, http2):
)
except TimeoutException:
conn.close()
await self.release(conn)
self.release(conn)
raise ConnectTimeout()
except BaseException as ex:
await self.release(conn)
self.release(conn)
raise ex
return conn

async def release(self, conn):
def release(self, conn):
"""Release connection."""
res = self.pool.release(conn)
if isinstance(res, Coroutine):
await res
self.pool.release(conn)

async def wait_free_pool(self):
"""Wait until free pool."""
Expand Down
2 changes: 1 addition & 1 deletion aiosonic/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def cleanup(self):


class SmartPool:
"""Pool which utilizes alive connections."""
"""Pool which priorizes the reusage of connections."""

def __init__(self, connector, pool_size, connection_cls):
self.pool_size = pool_size
Expand Down
31 changes: 15 additions & 16 deletions tests/test_aiosonic.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,19 +456,16 @@ async def test_simple_get_ssl_ctx(app, aiohttp_server, ssl_context):
await server.close()


# sometimes doesn't raise and get stuck
#
# @pytest.mark.asyncio
# @pytest.mark.timeout(2)
# async def test_simple_get_ssl_no_valid(app, aiohttp_server, ssl_context):
# """Test simple get with https no valid."""
# server = await aiohttp_server(app, ssl=ssl_context)
# url = 'https://localhost:%d' % server.port
# async with aiosonic.HTTPClient() as client:
#
# with pytest.raises(ssl.SSLError):
# await client.get(url)
# await server.close()
@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_simple_get_ssl_no_valid(app, aiohttp_server, ssl_context):
"""Test simple get with https no valid."""
server = await aiohttp_server(app, ssl=ssl_context)
url = "https://localhost:%d" % server.port
async with aiosonic.HTTPClient() as client:
with pytest.raises(ssl.SSLError):
await client.get(url)
await server.close()


@pytest.mark.asyncio
Expand All @@ -479,14 +476,16 @@ async def test_get_chunked_response(app, aiohttp_server):

async with aiosonic.HTTPClient() as client:
res = await client.get(url)
assert res.connection
assert res._connection
assert res.status_code == 200

chunks = [b"foo", b"bar"]

async for chunk in res.read_chunks():
assert chunk in chunks
assert await res.text() == "" # chunks already readed manually

with pytest.raises(ConnectionError):
assert await res.text() == "" # chunks already readed manually
await server.close()


Expand Down Expand Up @@ -527,7 +526,7 @@ async def test_read_chunks_by_text_method(app, aiohttp_server):

async with aiosonic.HTTPClient() as client:
res = await client.get(url)
assert res.connection
assert res._connection
assert res.status_code == 200
assert await res.text() == "foobar"
await server.close()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


@pytest.mark.asyncio
@pytest.mark.timeout(15)
@pytest.mark.timeout(30)
async def test_proxy_request(app, aiohttp_server, proxy_serv):
"""Test proxy request."""
server = await aiohttp_server(app)
Expand Down

0 comments on commit f432b41

Please sign in to comment.