diff --git a/CHANGES/6316.misc b/CHANGES/6316.misc new file mode 100644 index 00000000000..fa8519e12d3 --- /dev/null +++ b/CHANGES/6316.misc @@ -0,0 +1 @@ +Configure timeout value ceil threshold at the application level. diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index bd78d659a48..903c87af870 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -158,6 +158,7 @@ Jakob Ackermann Jakub Wilk Jan Buchar Jashandeep Sohi +Jean-Baptiste Estival Jens Steinhauser Jeonghun Lee Jeongkyu Shin diff --git a/aiohttp/client.py b/aiohttp/client.py index d30aed073f5..12e455325ee 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -143,6 +143,7 @@ class ClientTimeout: connect: Optional[float] = None sock_read: Optional[float] = None sock_connect: Optional[float] = None + ceil_threshold: float = 5 # pool_queue_timeout: Optional[float] = None # dns_resolution_timeout: Optional[float] = None @@ -402,7 +403,9 @@ async def _request( real_timeout = timeout # timeout is cumulative for all request operations # (request, redirects, responses, data consuming) - tm = TimeoutHandle(self._loop, real_timeout.total) + tm = TimeoutHandle( + self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold + ) handle = tm.start() if read_bufsize is None: @@ -489,7 +492,10 @@ async def _request( # connection timeout try: - async with ceil_timeout(real_timeout.connect): + async with ceil_timeout( + real_timeout.connect, + ceil_threshold=real_timeout.ceil_threshold, + ): assert self._connector is not None conn = await self._connector.connect( req, traces=traces, timeout=real_timeout @@ -509,6 +515,7 @@ async def _request( auto_decompress=self._auto_decompress, read_timeout=real_timeout.sock_read, read_bufsize=read_bufsize, + timeout_ceil_threshold=self._connector._timeout_ceil_threshold, ) try: diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index c265521ba3b..47ed65d16d0 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -36,6 +36,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._read_timeout = None # type: Optional[float] self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle] + self._timeout_ceil_threshold = 5 # type: Optional[float] + self.closed = self._loop.create_future() # type: asyncio.Future[None] @property @@ -150,12 +152,15 @@ def set_response_params( auto_decompress: bool = True, read_timeout: Optional[float] = None, read_bufsize: int = 2 ** 16, + timeout_ceil_threshold: float = 5, ) -> None: self._skip_payload = skip_payload self._read_timeout = read_timeout self._reschedule_timeout() + self._timeout_ceil_threshold = timeout_ceil_threshold + self._parser = HttpResponseParser( self, self._loop, diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 17690f2a076..181ceaaea60 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -94,7 +94,12 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is not None: self._heartbeat_cb = call_later( - self._send_heartbeat, self._heartbeat, self._loop + self._send_heartbeat, + self._heartbeat, + self._loop, + timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold + if self._conn is not None + else 5, ) def _send_heartbeat(self) -> None: @@ -107,7 +112,12 @@ def _send_heartbeat(self) -> None: if self._pong_response_cb is not None: self._pong_response_cb.cancel() self._pong_response_cb = call_later( - self._pong_not_received, self._pong_heartbeat, self._loop + self._pong_not_received, + self._pong_heartbeat, + self._loop, + timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold + if self._conn is not None + else 5, ) def _pong_not_received(self) -> None: diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 2ebb4641529..39d08c3fde2 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -171,6 +171,8 @@ class BaseConnector: limit_per_host - Number of simultaneous connections to one host. enable_cleanup_closed - Enables clean-up closed ssl transports. Disabled by default. + timeout_ceil_threshold - Trigger ceiling of timeout values when + it's above timeout_ceil_threshold. loop - Optional event loop. """ @@ -188,6 +190,7 @@ def __init__( limit: int = 100, limit_per_host: int = 0, enable_cleanup_closed: bool = False, + timeout_ceil_threshold: float = 5, ) -> None: if force_close: @@ -199,6 +202,8 @@ def __init__( if keepalive_timeout is sentinel: keepalive_timeout = 15.0 + self._timeout_ceil_threshold = timeout_ceil_threshold + loop = asyncio.get_running_loop() self._closed = False @@ -326,7 +331,11 @@ def _cleanup(self) -> None: if self._conns: self._cleanup_handle = helpers.weakref_handle( - self, "_cleanup", timeout, self._loop + self, + "_cleanup", + timeout, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) def _drop_acquired_per_host( @@ -356,7 +365,11 @@ def _cleanup_closed(self) -> None: if not self._cleanup_closed_disabled: self._cleanup_closed_handle = helpers.weakref_handle( - self, "_cleanup_closed", self._cleanup_closed_period, self._loop + self, + "_cleanup_closed", + self._cleanup_closed_period, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) async def close(self) -> None: @@ -639,7 +652,11 @@ def _release( if self._cleanup_handle is None: self._cleanup_handle = helpers.weakref_handle( - self, "_cleanup", self._keepalive_timeout, self._loop + self, + "_cleanup", + self._keepalive_timeout, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) async def _create_connection( @@ -728,6 +745,7 @@ def __init__( limit: int = 100, limit_per_host: int = 0, enable_cleanup_closed: bool = False, + timeout_ceil_threshold: float = 5, ) -> None: super().__init__( keepalive_timeout=keepalive_timeout, @@ -735,6 +753,7 @@ def __init__( limit=limit, limit_per_host=limit_per_host, enable_cleanup_closed=enable_cleanup_closed, + timeout_ceil_threshold=timeout_ceil_threshold, ) if not isinstance(ssl, SSL_ALLOWED_TYPES): @@ -945,7 +964,9 @@ async def _wrap_create_connection( **kwargs: Any, ) -> Tuple[asyncio.Transport, ResponseHandler]: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa except cert_errors as exc: raise ClientConnectorCertificateError(req.connection_key, exc) from exc @@ -1009,7 +1030,9 @@ async def _start_tls_connection( sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req)) try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): try: tls_transport = await self._loop.start_tls( underlying_transport, @@ -1185,7 +1208,10 @@ async def _create_proxy_connection( # read_until_eof=True will ensure the connection isn't closed # once the response is received and processed allowing # START_TLS to work on the connection below. - protocol.set_response_params(read_until_eof=True) + protocol.set_response_params( + read_until_eof=True, + timeout_ceil_threshold=self._timeout_ceil_threshold, + ) resp = await proxy_resp.start(conn) except BaseException: proxy_resp.close() @@ -1263,7 +1289,9 @@ async def _create_connection( self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" ) -> ResponseHandler: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): _, proto = await self._loop.create_unix_connection( self._factory, self._path ) @@ -1319,7 +1347,9 @@ async def _create_connection( self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" ) -> ResponseHandler: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501 self._factory, self._path ) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 7969a098e98..b08e17c57bb 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -573,11 +573,15 @@ def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None: def weakref_handle( - ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop + ob: object, + name: str, + timeout: float, + loop: asyncio.AbstractEventLoop, + timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: if timeout is not None and timeout > 0: when = loop.time() + timeout - if timeout >= 5: + if timeout >= timeout_ceil_threshold: when = ceil(when) return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name)) @@ -585,11 +589,14 @@ def weakref_handle( def call_later( - cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop + cb: Callable[[], Any], + timeout: float, + loop: asyncio.AbstractEventLoop, + timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: if timeout is not None and timeout > 0: when = loop.time() + timeout - if timeout > 5: + if timeout > timeout_ceil_threshold: when = ceil(when) return loop.call_at(when, cb) return None @@ -599,10 +606,14 @@ class TimeoutHandle: """Timeout handle""" def __init__( - self, loop: asyncio.AbstractEventLoop, timeout: Optional[float] + self, + loop: asyncio.AbstractEventLoop, + timeout: Optional[float], + ceil_threshold: float = 5, ) -> None: self._timeout = timeout self._loop = loop + self._ceil_threshold = ceil_threshold self._callbacks = ( [] ) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]] @@ -619,7 +630,7 @@ def start(self) -> Optional[asyncio.Handle]: timeout = self._timeout if timeout is not None and timeout > 0: when = self._loop.time() + timeout - if timeout >= 5: + if timeout >= self._ceil_threshold: when = ceil(when) return self._loop.call_at(when, self.__call__) else: @@ -701,14 +712,16 @@ def timeout(self) -> None: self._cancelled = True -def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout: +def ceil_timeout( + delay: Optional[float], ceil_threshold: float = 5 +) -> async_timeout.Timeout: if delay is None or delay <= 0: return async_timeout.timeout(None) loop = asyncio.get_running_loop() now = loop.time() when = now + delay - if delay > 5: + if delay > ceil_threshold: when = ceil(when) return async_timeout.timeout_at(when) diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index 6a775cf38ee..bcb2e7acf96 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -144,6 +144,10 @@ class RequestHandler(BaseProtocol): max_headers -- Optional maximum header size + timeout_ceil_threshold -- Optional value to specify + threshold to ceil() timeout + values + """ KEEPALIVE_RESCHEDULE_DELAY = 1 @@ -173,6 +177,7 @@ class RequestHandler(BaseProtocol): "_close", "_force_close", "_current_request", + "_timeout_ceil_threshold", ) def __init__( @@ -192,6 +197,7 @@ def __init__( lingering_time: float = 10.0, read_bufsize: int = 2 ** 16, auto_decompress: bool = True, + timeout_ceil_threshold: float = 5, ): super().__init__(loop) @@ -228,6 +234,12 @@ def __init__( auto_decompress=auto_decompress, ) # type: Optional[HttpRequestParser] + self._timeout_ceil_threshold = 5 # type: float + try: + self._timeout_ceil_threshold = float(timeout_ceil_threshold) + except (TypeError, ValueError): + pass + self.logger = logger self.access_log = access_log if access_log: @@ -440,7 +452,8 @@ def _process_keepalive(self) -> None: # not all request handlers are done, # reschedule itself to next second self._keepalive_handle = self._loop.call_later( - self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive + self.KEEPALIVE_RESCHEDULE_DELAY, + self._process_keepalive, ) async def _handle_request( diff --git a/aiohttp/web_server.py b/aiohttp/web_server.py index 1c9fbf34ca8..6998fff34ae 100644 --- a/aiohttp/web_server.py +++ b/aiohttp/web_server.py @@ -65,4 +65,13 @@ async def shutdown(self, timeout: Optional[float] = None) -> None: self._connections.clear() def __call__(self) -> RequestHandler: - return RequestHandler(self, loop=self._loop, **self._kwargs) + try: + return RequestHandler(self, loop=self._loop, **self._kwargs) + except TypeError: + # Failsafe creation: remove all custom handler_args + kwargs = { + k: v + for k, v in self._kwargs.items() + if k in ["debug", "access_log_class"] + } + return RequestHandler(self, loop=self._loop, **kwargs) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 2303faa2c6a..d1158ee7fbb 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -128,7 +128,12 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is not None: assert self._loop is not None self._heartbeat_cb = call_later( - self._send_heartbeat, self._heartbeat, self._loop + self._send_heartbeat, + self._heartbeat, + self._loop, + timeout_ceil_threshold=self._req._protocol._timeout_ceil_threshold + if self._req is not None + else 5, ) def _send_heartbeat(self) -> None: @@ -142,7 +147,12 @@ def _send_heartbeat(self) -> None: if self._pong_response_cb is not None: self._pong_response_cb.cancel() self._pong_response_cb = call_later( - self._pong_not_received, self._pong_heartbeat, self._loop + self._pong_not_received, + self._pong_heartbeat, + self._loop, + timeout_ceil_threshold=self._req._protocol._timeout_ceil_threshold + if self._req is not None + else 5, ) def _pong_not_received(self) -> None: diff --git a/docs/client_quickstart.rst b/docs/client_quickstart.rst index 655a642e08e..c7fec9f1936 100644 --- a/docs/client_quickstart.rst +++ b/docs/client_quickstart.rst @@ -444,13 +444,17 @@ Supported :class:`ClientTimeout` fields are: The maximal number of seconds allowed for period between reading a new data portion from a peer. + ``ceil_threshold`` + + The threshold value to trigger ceiling of absolute timeout values. + All fields are floats, ``None`` or ``0`` disables a particular timeout check, see the :class:`ClientTimeout` reference for defaults and additional details. Thus the default timeout is:: aiohttp.ClientTimeout(total=5*60, connect=None, - sock_connect=None, sock_read=None) + sock_connect=None, sock_read=None, ceil_threshold=5) .. note:: @@ -467,4 +471,5 @@ Thus the default timeout is:: timeout expiration. Smaller timeouts are not rounded to help testing; in the real life network - timeouts usually greater than tens of seconds. + timeouts usually greater than tens of seconds. However, the default threshold + value of 5 seconds can be configured using the ``ceil_threshold`` parameter. diff --git a/docs/web_advanced.rst b/docs/web_advanced.rst index 244983e0c09..d60ed88156d 100644 --- a/docs/web_advanced.rst +++ b/docs/web_advanced.rst @@ -845,6 +845,25 @@ Signal handler may look like:: Both :func:`run_app` and :meth:`AppRunner.cleanup` call shutdown signal handlers. +.. _aiohttp-web-ceil-absolute-timeout: + +Ceil of absolute timeout value +------------------------------ + +*aiohttp* **ceils** internal timeout values if the value is equal or +greater than 5 seconds. The timeout expires at the next integer second +greater than ``current_time + timeout``. + +More details about ceiling absolute timeout values is available here +:ref:`aiohttp-client-timeouts`. + +The default threshold can be configured at :class:`aiohttp.web.Application` +level using the ``handler_args`` parameter. + +.. code-block:: python3 + + app = web.Application(handler_args={"timeout_ceil_threshold": 1}) + .. _aiohttp-web-background-tasks: Background tasks diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 6c56e5d3aae..9cfebc7b3a6 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -4,7 +4,8 @@ import datetime import gc import platform -from math import isclose, modf +import weakref +from math import ceil, isclose, modf from unittest import mock from urllib.request import getproxies_environment @@ -352,6 +353,18 @@ def test_when_timeout_smaller_second(loop) -> None: assert isclose(when - timer, 0, abs_tol=0.001) +def test_when_timeout_smaller_second_with_low_threshold(loop) -> None: + timeout = 0.1 + timer = loop.time() + timeout + + handle = helpers.TimeoutHandle(loop, timeout, 0.01) + when = handle.start()._when + handle.close() + + assert isinstance(when, int) + assert when == ceil(timer) + + def test_timeout_handle_cb_exc(loop) -> None: handle = helpers.TimeoutHandle(loop, 10.2) cb = mock.Mock() @@ -389,6 +402,16 @@ async def test_weakref_handle(loop) -> None: assert cb.test.called +async def test_weakref_handle_with_small_threshold(loop) -> None: + cb = mock.Mock() + loop = mock.Mock() + loop.time.return_value = 10 + helpers.weakref_handle(cb, "test", 0.1, loop, 0.01) + loop.call_at.assert_called_with( + 11, helpers._weakref_handle, (weakref.ref(cb), "test") + ) + + async def test_weakref_handle_weak(loop) -> None: cb = mock.Mock() helpers.weakref_handle(cb, "test", 0.01, loop) @@ -408,6 +431,14 @@ def test_ceil_call_later() -> None: loop.call_at.assert_called_with(21.0, cb) +def test_ceil_call_later_with_small_threshold() -> None: + cb = mock.Mock() + loop = mock.Mock() + loop.time.return_value = 10.1 + helpers.call_later(cb, 4.5, loop, 1) + loop.call_at.assert_called_with(15, cb) + + def test_ceil_call_later_no_timeout() -> None: cb = mock.Mock() loop = mock.Mock() @@ -433,6 +464,12 @@ async def test_ceil_timeout_small(loop) -> None: assert frac != 0 +async def test_ceil_timeout_small_with_overriden_threshold(loop) -> None: + async with helpers.ceil_timeout(1.5, ceil_threshold=1) as cm: + frac, integer = modf(cm.deadline) + assert frac == 0 + + # -------------------------------- ContentDisposition ------------------- diff --git a/tests/test_web_runner.py b/tests/test_web_runner.py index 48ec3944337..82965432890 100644 --- a/tests/test_web_runner.py +++ b/tests/test_web_runner.py @@ -113,6 +113,36 @@ def test_app_handler_args() -> None: assert runner._kwargs == {"access_log_class": web.AccessLogger, "test": True} +async def test_app_handler_args_failure() -> None: + app = web.Application(handler_args={"unknown_parameter": 5}) + runner = web.AppRunner(app) + await runner.setup() + assert runner._server + rh = runner._server() + assert rh._timeout_ceil_threshold == 5 + await runner.cleanup() + assert app + + +@pytest.mark.parametrize( + ("value", "expected"), + ( + (2, 2), + (None, 5), + ("2", 2), + ), +) +async def test_app_handler_args_ceil_threshold(value: Any, expected: Any) -> None: + app = web.Application(handler_args={"timeout_ceil_threshold": value}) + runner = web.AppRunner(app) + await runner.setup() + assert runner._server + rh = runner._server() + assert rh._timeout_ceil_threshold == expected + await runner.cleanup() + assert app + + async def test_app_make_handler_access_log_class_bad_type1() -> None: class Logger: pass