Skip to content

Commit

Permalink
Added a configuration flag for enable request task handler cancelling…
Browse files Browse the repository at this point in the history
… when client connection closing. (#7056)

<!-- Thank you for your contribution! -->

## Related to #6719 #6727. Added a configuration flag for enable request
task handler cancelling when client connection closing.

After changes in version 3.8.3, there is no longer any way to enable
this behaviour. In our services, we want to handle protocol-level
errors, for example for canceling the execution of a heavy query in the
DBMS if the user's connection is broken.

Now I created this PR in order to discuss my solution, of course if I
did everything well I will add tests changelog, etc.

<!-- Please give a short brief about these changes. -->

## I guess this breakdown can be solved using the configuration flag
that is passed to the Server instance.

Of course `AppRunner` and `SiteRunner` can pass this through `**kwargs`
too.

<!-- Outline any notable behaviour for the end users. -->

## Related issue number #6719

<!-- Are there any issues opened that will be resolved by merging this
change? -->

## Checklist

- [ ] I think the code is well written
- [ ] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] If you provide code modification, please add yourself to
`CONTRIBUTORS.txt`
  * The format is &lt;Name&gt; &lt;Surname&gt;.
  * Please keep alphabetical order, the file is sorted by names.
- [ ] Add a new news fragment into the `CHANGES` folder
  * name it `<issue_id>.<type>` for example (588.bugfix)
* if you don't have an `issue_id` change it to the pr id after creating
the pr
  * ensure type is one of the following:
    * `.feature`: Signifying a new feature.
    * `.bugfix`: Signifying a bug fix.
    * `.doc`: Signifying a documentation improvement.
    * `.removal`: Signifying a deprecation or removal of public API.
* `.misc`: A ticket has been closed, but it is not of interest to users.
* Make sure to use full sentences with correct case and punctuation, for
example: "Fix issue with non-ascii contents in doctest text files."

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Sam Bull <aa6bs0@sambull.org>
  • Loading branch information
3 people committed Dec 11, 2022
1 parent 463fdf4 commit 38b9ec5
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES/7056.feature
@@ -0,0 +1 @@
Added ``handler_cancellation`` parameter to cancel web handler on client disconnection. -- by :user:`mosquito`
4 changes: 4 additions & 0 deletions aiohttp/web.py
Expand Up @@ -307,6 +307,7 @@ async def _run_app(
handle_signals: bool = True,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
handler_cancellation: bool = False,
) -> None:
# An internal function to actually do all dirty job for application running
if asyncio.iscoroutine(app):
Expand All @@ -321,6 +322,7 @@ async def _run_app(
access_log_format=access_log_format,
access_log=access_log,
keepalive_timeout=keepalive_timeout,
handler_cancellation=handler_cancellation,
)

await runner.setup()
Expand Down Expand Up @@ -481,6 +483,7 @@ def run_app(
handle_signals: bool = True,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
handler_cancellation: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""Run an app locally"""
Expand Down Expand Up @@ -513,6 +516,7 @@ def run_app(
handle_signals=handle_signals,
reuse_address=reuse_address,
reuse_port=reuse_port,
handler_cancellation=handler_cancellation,
)
)

Expand Down
6 changes: 6 additions & 0 deletions aiohttp/web_protocol.py
Expand Up @@ -313,6 +313,9 @@ def connection_lost(self, exc: Optional[BaseException]) -> None:

super().connection_lost(exc)

# Grab value before setting _manager to None.
handler_cancellation = self._manager.handler_cancellation

self._manager = None
self._force_close = True
self._request_factory = None
Expand All @@ -330,6 +333,9 @@ def connection_lost(self, exc: Optional[BaseException]) -> None:
if self._waiter is not None:
self._waiter.cancel()

if handler_cancellation and self._task_handler is not None:
self._task_handler.cancel()

self._task_handler = None

if self._payload_parser is not None:
Expand Down
2 changes: 2 additions & 0 deletions aiohttp/web_server.py
Expand Up @@ -19,6 +19,7 @@ def __init__(
*,
request_factory: Optional[_RequestFactory] = None,
debug: Optional[bool] = None,
handler_cancellation: bool = False,
**kwargs: Any,
) -> None:
if debug is not None:
Expand All @@ -33,6 +34,7 @@ def __init__(
self.requests_count = 0
self.request_handler = handler
self.request_factory = request_factory or self._make_request
self.handler_cancellation = handler_cancellation

@property
def connections(self) -> List[RequestHandler]:
Expand Down
11 changes: 10 additions & 1 deletion docs/web_reference.rst
Expand Up @@ -2809,7 +2809,8 @@ Utilities
access_log=aiohttp.log.access_logger, \
handle_signals=True, \
reuse_address=None, \
reuse_port=None)
reuse_port=None, \
handler_cancellation=False)

A high-level function for running an application, serving it until
keyboard interrupt and performing a
Expand Down Expand Up @@ -2905,6 +2906,9 @@ Utilities
this flag when being created. This option is not
supported on Windows.

:param bool handler_cancellation: cancels the web handler task if the client
drops the connection.

.. versionadded:: 3.0

Support *access_log_class* parameter.
Expand All @@ -2915,6 +2919,11 @@ Utilities

Accept a coroutine as *app* parameter.

.. versionadded:: 3.9

Support handler_cancellation parameter (this was the default behaviour
in aiohttp <3.7).

Constants
---------

Expand Down
78 changes: 78 additions & 0 deletions tests/test_web_server.py
@@ -1,5 +1,6 @@
# type: ignore
import asyncio
from contextlib import suppress
from typing import Any
from unittest import mock

Expand Down Expand Up @@ -207,3 +208,80 @@ async def handler(request):
)

logger.exception.assert_called_with("Error handling request", exc_info=exc)


async def test_handler_cancellation(aiohttp_unused_port) -> None:
event = asyncio.Event()
port = aiohttp_unused_port()

async def on_request(_: web.Request) -> web.Response:
nonlocal event
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
event.set()
raise
else:
raise web.HTTPInternalServerError()

app = web.Application()
app.router.add_route("GET", "/", on_request)

runner = web.AppRunner(app, handler_cancellation=True)
await runner.setup()

site = web.TCPSite(runner, host="localhost", port=port)

await site.start()

try:
assert runner.server.handler_cancellation, "Flag was not propagated"

async with client.ClientSession(
timeout=client.ClientTimeout(total=0.1)
) as sess:
with pytest.raises(asyncio.TimeoutError):
await sess.get(f"http://localhost:{port}/")

with suppress(asyncio.TimeoutError):
await asyncio.wait_for(event.wait(), timeout=1)
assert event.is_set(), "Request handler hasn't been cancelled"
finally:
await asyncio.gather(runner.shutdown(), site.stop())


async def test_no_handler_cancellation(aiohttp_unused_port) -> None:
timeout_event = asyncio.Event()
done_event = asyncio.Event()
port = aiohttp_unused_port()

async def on_request(_: web.Request) -> web.Response:
nonlocal done_event, timeout_event
await asyncio.wait_for(timeout_event.wait(), timeout=5)
done_event.set()
return web.Response()

app = web.Application()
app.router.add_route("GET", "/", on_request)

runner = web.AppRunner(app)
await runner.setup()

site = web.TCPSite(runner, host="localhost", port=port)

await site.start()

try:
async with client.ClientSession(
timeout=client.ClientTimeout(total=0.1)
) as sess:
with pytest.raises(asyncio.TimeoutError):
await sess.get(f"http://localhost:{port}/")
await asyncio.sleep(0.1)
timeout_event.set()

with suppress(asyncio.TimeoutError):
await asyncio.wait_for(done_event.wait(), timeout=1)
assert done_event.is_set()
finally:
await asyncio.gather(runner.shutdown(), site.stop())

0 comments on commit 38b9ec5

Please sign in to comment.