Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc/app configure timeout ceil threshold #6316

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/6316.misc
@@ -0,0 +1 @@
Configure timeout value ceil threshold at the application level.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Expand Up @@ -158,6 +158,7 @@ Jakob Ackermann
Jakub Wilk
Jan Buchar
Jashandeep Sohi
Jean-Baptiste Estival
Jens Steinhauser
Jeonghun Lee
Jeongkyu Shin
Expand Down
11 changes: 9 additions & 2 deletions aiohttp/client.py
Expand Up @@ -143,6 +143,7 @@ class ClientTimeout:
connect: Optional[float] = None
sock_read: Optional[float] = None
sock_connect: Optional[float] = None
ceil_threshold: float = 5

# pool_queue_timeout: Optional[float] = None
# dns_resolution_timeout: Optional[float] = None
Expand Down Expand Up @@ -402,7 +403,9 @@ async def _request(
real_timeout = timeout
# timeout is cumulative for all request operations
# (request, redirects, responses, data consuming)
tm = TimeoutHandle(self._loop, real_timeout.total)
tm = TimeoutHandle(
self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
)
handle = tm.start()

if read_bufsize is None:
Expand Down Expand Up @@ -489,7 +492,10 @@ async def _request(

# connection timeout
try:
async with ceil_timeout(real_timeout.connect):
async with ceil_timeout(
real_timeout.connect,
ceil_threshold=real_timeout.ceil_threshold,
):
assert self._connector is not None
conn = await self._connector.connect(
req, traces=traces, timeout=real_timeout
Expand All @@ -509,6 +515,7 @@ async def _request(
auto_decompress=self._auto_decompress,
read_timeout=real_timeout.sock_read,
read_bufsize=read_bufsize,
timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
)

try:
Expand Down
5 changes: 5 additions & 0 deletions aiohttp/client_proto.py
Expand Up @@ -36,6 +36,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._read_timeout = None # type: Optional[float]
self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle]

self._timeout_ceil_threshold = 5 # type: Optional[float]

self.closed = self._loop.create_future() # type: asyncio.Future[None]

@property
Expand Down Expand Up @@ -150,12 +152,15 @@ def set_response_params(
auto_decompress: bool = True,
read_timeout: Optional[float] = None,
read_bufsize: int = 2 ** 16,
timeout_ceil_threshold: float = 5,
) -> None:
self._skip_payload = skip_payload

self._read_timeout = read_timeout
self._reschedule_timeout()

self._timeout_ceil_threshold = timeout_ceil_threshold

self._parser = HttpResponseParser(
self,
self._loop,
Expand Down
14 changes: 12 additions & 2 deletions aiohttp/client_ws.py
Expand Up @@ -94,7 +94,12 @@ def _reset_heartbeat(self) -> None:

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
self._send_heartbeat, self._heartbeat, self._loop
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _send_heartbeat(self) -> None:
Expand All @@ -107,7 +112,12 @@ def _send_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received, self._pong_heartbeat, self._loop
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _pong_not_received(self) -> None:
Expand Down
46 changes: 38 additions & 8 deletions aiohttp/connector.py
Expand Up @@ -171,6 +171,8 @@ class BaseConnector:
limit_per_host - Number of simultaneous connections to one host.
enable_cleanup_closed - Enables clean-up closed ssl transports.
Disabled by default.
timeout_ceil_threshold - Trigger ceiling of timeout values when
it's above timeout_ceil_threshold.
loop - Optional event loop.
"""

Expand All @@ -188,6 +190,7 @@ def __init__(
limit: int = 100,
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
timeout_ceil_threshold: float = 5,
) -> None:

if force_close:
Expand All @@ -199,6 +202,8 @@ def __init__(
if keepalive_timeout is sentinel:
keepalive_timeout = 15.0

self._timeout_ceil_threshold = timeout_ceil_threshold

loop = asyncio.get_running_loop()

self._closed = False
Expand Down Expand Up @@ -326,7 +331,11 @@ def _cleanup(self) -> None:

if self._conns:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", timeout, self._loop
self,
"_cleanup",
timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

def _drop_acquired_per_host(
Expand Down Expand Up @@ -356,7 +365,11 @@ def _cleanup_closed(self) -> None:

if not self._cleanup_closed_disabled:
self._cleanup_closed_handle = helpers.weakref_handle(
self, "_cleanup_closed", self._cleanup_closed_period, self._loop
self,
"_cleanup_closed",
self._cleanup_closed_period,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

async def close(self) -> None:
Expand Down Expand Up @@ -639,7 +652,11 @@ def _release(

if self._cleanup_handle is None:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", self._keepalive_timeout, self._loop
self,
"_cleanup",
self._keepalive_timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

async def _create_connection(
Expand Down Expand Up @@ -728,13 +745,15 @@ def __init__(
limit: int = 100,
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
timeout_ceil_threshold: float = 5,
) -> None:
super().__init__(
keepalive_timeout=keepalive_timeout,
force_close=force_close,
limit=limit,
limit_per_host=limit_per_host,
enable_cleanup_closed=enable_cleanup_closed,
timeout_ceil_threshold=timeout_ceil_threshold,
)

if not isinstance(ssl, SSL_ALLOWED_TYPES):
Expand Down Expand Up @@ -945,7 +964,9 @@ async def _wrap_create_connection(
**kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
except cert_errors as exc:
raise ClientConnectorCertificateError(req.connection_key, exc) from exc
Expand Down Expand Up @@ -1009,7 +1030,9 @@ async def _start_tls_connection(
sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
try:
tls_transport = await self._loop.start_tls(
underlying_transport,
Expand Down Expand Up @@ -1185,7 +1208,10 @@ async def _create_proxy_connection(
# read_until_eof=True will ensure the connection isn't closed
# once the response is received and processed allowing
# START_TLS to work on the connection below.
protocol.set_response_params(read_until_eof=True)
protocol.set_response_params(
read_until_eof=True,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)
resp = await proxy_resp.start(conn)
except BaseException:
proxy_resp.close()
Expand Down Expand Up @@ -1263,7 +1289,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_unix_connection(
self._factory, self._path
)
Expand Down Expand Up @@ -1319,7 +1347,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501
self._factory, self._path
)
Expand Down
29 changes: 21 additions & 8 deletions aiohttp/helpers.py
Expand Up @@ -573,23 +573,30 @@ def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:


def weakref_handle(
ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop
ob: object,
name: str,
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout >= 5:
if timeout >= timeout_ceil_threshold:
when = ceil(when)

return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
return None


def call_later(
cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop
cb: Callable[[], Any],
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout > 5:
if timeout > timeout_ceil_threshold:
when = ceil(when)
return loop.call_at(when, cb)
return None
Expand All @@ -599,10 +606,14 @@ class TimeoutHandle:
"""Timeout handle"""

def __init__(
self, loop: asyncio.AbstractEventLoop, timeout: Optional[float]
self,
loop: asyncio.AbstractEventLoop,
timeout: Optional[float],
ceil_threshold: float = 5,
) -> None:
self._timeout = timeout
self._loop = loop
self._ceil_threshold = ceil_threshold
self._callbacks = (
[]
) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]]
Expand All @@ -619,7 +630,7 @@ def start(self) -> Optional[asyncio.Handle]:
timeout = self._timeout
if timeout is not None and timeout > 0:
when = self._loop.time() + timeout
if timeout >= 5:
if timeout >= self._ceil_threshold:
when = ceil(when)
return self._loop.call_at(when, self.__call__)
else:
Expand Down Expand Up @@ -701,14 +712,16 @@ def timeout(self) -> None:
self._cancelled = True


def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout:
def ceil_timeout(
delay: Optional[float], ceil_threshold: float = 5
) -> async_timeout.Timeout:
if delay is None or delay <= 0:
return async_timeout.timeout(None)

loop = asyncio.get_running_loop()
now = loop.time()
when = now + delay
if delay > 5:
if delay > ceil_threshold:
when = ceil(when)
return async_timeout.timeout_at(when)

Expand Down
15 changes: 14 additions & 1 deletion aiohttp/web_protocol.py
Expand Up @@ -144,6 +144,10 @@ class RequestHandler(BaseProtocol):

max_headers -- Optional maximum header size

timeout_ceil_threshold -- Optional value to specify
threshold to ceil() timeout
values

"""

KEEPALIVE_RESCHEDULE_DELAY = 1
Expand Down Expand Up @@ -173,6 +177,7 @@ class RequestHandler(BaseProtocol):
"_close",
"_force_close",
"_current_request",
"_timeout_ceil_threshold",
)

def __init__(
Expand All @@ -192,6 +197,7 @@ def __init__(
lingering_time: float = 10.0,
read_bufsize: int = 2 ** 16,
auto_decompress: bool = True,
timeout_ceil_threshold: float = 5,
):
super().__init__(loop)

Expand Down Expand Up @@ -228,6 +234,12 @@ def __init__(
auto_decompress=auto_decompress,
) # type: Optional[HttpRequestParser]

self._timeout_ceil_threshold = 5 # type: float
try:
self._timeout_ceil_threshold = float(timeout_ceil_threshold)
except (TypeError, ValueError):
pass

self.logger = logger
self.access_log = access_log
if access_log:
Expand Down Expand Up @@ -440,7 +452,8 @@ def _process_keepalive(self) -> None:
# not all request handlers are done,
# reschedule itself to next second
self._keepalive_handle = self._loop.call_later(
self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive
self.KEEPALIVE_RESCHEDULE_DELAY,
self._process_keepalive,
)

async def _handle_request(
Expand Down
11 changes: 10 additions & 1 deletion aiohttp/web_server.py
Expand Up @@ -65,4 +65,13 @@ async def shutdown(self, timeout: Optional[float] = None) -> None:
self._connections.clear()

def __call__(self) -> RequestHandler:
return RequestHandler(self, loop=self._loop, **self._kwargs)
try:
return RequestHandler(self, loop=self._loop, **self._kwargs)
except TypeError:
# Failsafe creation: remove all custom handler_args
kwargs = {
k: v
for k, v in self._kwargs.items()
if k in ["debug", "access_log_class"]
}
return RequestHandler(self, loop=self._loop, **kwargs)