Skip to content

Commit

Permalink
anyio integration (#1157)
Browse files Browse the repository at this point in the history
* First whack at anyio integration

* Fix formatting

* Remove debug messages

* mypy fixes

* Update README.md

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Fix install_requires typo

* move_on_after blocks if deadline is too small

* Linter fixes

* Improve WSGI structured concurrency

* Tests use anyio

* Checkin progress on testclient

* Prep for anyio 3

* Remove debug backend option

* Use anyio 3.0.0rc1

* Remove old style executor from GraphQLApp

* Fix extra import

* Don't cancel task scope early

* Wait for wsgi sender to finish before exiting

* Use memory object streams in websocket tests

* Test on asyncio, asyncio+uvloop, and trio

* Formatting fixes

* run_until_first_complete doesn't need a return

* Fix middleware app call

* Simplify middleware exceptions

* Use anyio for websocket test

* Set STARLETTE_TESTCLIENT_ASYNC_BACKEND in tests

* Pass async backend to portal

* Formatting fixes

* Bump anyio

* Cleanup portals and add TestClient.async_backend

* Use anyio.run_async_from_thread to send from worker thread

* Use websocket_connect as context manager

* Document changes in TestClient

* Formatting fix

* Fix websocket raises coverage

* Update to anyio 3.0.0rc3 and replace aiofiles

* Apply suggestions from code review

Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>

* Bump to require anyio 3.0.0 final

* Remove mention of aiofiles in README.md

* Pin jinja2 to releases before 3 due to DeprecationWarnings

* Add task_group as application attribute

* Remove run_until_first_complete

* Undo jinja pin

* Refactor anyio.sleep into an event

* Use one less task in test_websocket_concurrency_pattern

* Apply review suggestions

* Rename argument

* fix start_task_soon type

* fix BaseHTTPMiddleware when used without Starlette

* Testclient receive() is a non-trapping function if the response is already complete

This allows for a zero deadline when waiting for a disconnect message

* Use variable annotation for async_backend

* Update docs regarding dependency on anyio

* Use CancelScope instead of move_on_after in request.is_disconnected

* Cancel task group after returning middleware response

Add test for #1022

* Add link to anyio backend options in testclient docs

* Add types-dataclasses

* Re-implement starlette.concurrency.run_until_first_complete and add a test

* Fix type on handler callable

* Apply review comments to clarify run_until_first_complete scope

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
  • Loading branch information
4 people committed Jun 18, 2021
1 parent 15761fb commit 42592d6
Show file tree
Hide file tree
Showing 28 changed files with 335 additions and 258 deletions.
9 changes: 4 additions & 5 deletions README.md
Expand Up @@ -22,7 +22,7 @@
# Starlette

Starlette is a lightweight [ASGI](https://asgi.readthedocs.io/en/latest/) framework/toolkit,
which is ideal for building high performance asyncio services.
which is ideal for building high performance async services.

It is production-ready, and gives you the following:

Expand All @@ -36,7 +36,8 @@ It is production-ready, and gives you the following:
* Session and Cookie support.
* 100% test coverage.
* 100% type annotated codebase.
* Zero hard dependencies.
* Few hard dependencies.
* Compatible with `asyncio` and `trio` backends.

## Requirements

Expand Down Expand Up @@ -84,10 +85,9 @@ For a more complete example, see [encode/starlette-example](https://github.com/e

## Dependencies

Starlette does not have any hard dependencies, but the following are optional:
Starlette only requires `anyio`, and the following are optional:

* [`requests`][requests] - Required if you want to use the `TestClient`.
* [`aiofiles`][aiofiles] - Required if you want to use `FileResponse` or `StaticFiles`.
* [`jinja2`][jinja2] - Required if you want to use `Jinja2Templates`.
* [`python-multipart`][python-multipart] - Required if you want to support form parsing, with `request.form()`.
* [`itsdangerous`][itsdangerous] - Required for `SessionMiddleware` support.
Expand Down Expand Up @@ -167,7 +167,6 @@ gunicorn -k uvicorn.workers.UvicornH11Worker ...
<p align="center"><i>Starlette is <a href="https://github.com/encode/starlette/blob/master/LICENSE.md">BSD licensed</a> code. Designed & built in Brighton, England.</i></p>

[requests]: http://docs.python-requests.org/en/master/
[aiofiles]: https://github.com/Tinche/aiofiles
[jinja2]: http://jinja.pocoo.org/
[python-multipart]: https://andrew-d.github.io/python-multipart/
[graphene]: https://graphene-python.org/
Expand Down
6 changes: 2 additions & 4 deletions docs/index.md
Expand Up @@ -32,7 +32,7 @@ It is production-ready, and gives you the following:
* Session and Cookie support.
* 100% test coverage.
* 100% type annotated codebase.
* Zero hard dependencies.
* Few hard dependencies.

## Requirements

Expand Down Expand Up @@ -79,10 +79,9 @@ For a more complete example, [see here](https://github.com/encode/starlette-exam

## Dependencies

Starlette does not have any hard dependencies, but the following are optional:
Starlette only requires `anyio`, and the following dependencies are optional:

* [`requests`][requests] - Required if you want to use the `TestClient`.
* [`aiofiles`][aiofiles] - Required if you want to use `FileResponse` or `StaticFiles`.
* [`jinja2`][jinja2] - Required if you want to use `Jinja2Templates`.
* [`python-multipart`][python-multipart] - Required if you want to support form parsing, with `request.form()`.
* [`itsdangerous`][itsdangerous] - Required for `SessionMiddleware` support.
Expand Down Expand Up @@ -161,7 +160,6 @@ gunicorn -k uvicorn.workers.UvicornH11Worker ...
<p align="center"><i>Starlette is <a href="https://github.com/encode/starlette/blob/master/LICENSE.md">BSD licensed</a> code. Designed & built in Brighton, England.</i></p>

[requests]: http://docs.python-requests.org/en/master/
[aiofiles]: https://github.com/Tinche/aiofiles
[jinja2]: http://jinja.pocoo.org/
[python-multipart]: https://andrew-d.github.io/python-multipart/
[graphene]: https://graphene-python.org/
Expand Down
18 changes: 18 additions & 0 deletions docs/testclient.md
Expand Up @@ -31,6 +31,22 @@ application. Occasionally you might want to test the content of 500 error
responses, rather than allowing client to raise the server exception. In this
case you should use `client = TestClient(app, raise_server_exceptions=False)`.

### Selecting the Async backend

`TestClient.async_backend` is a dictionary which allows you to set the options
for the backend used to run tests. These options are passed to
`anyio.start_blocking_portal()`. See the [anyio documentation](https://anyio.readthedocs.io/en/stable/basics.html#backend-options)
for more information about backend options. By default, `asyncio` is used.

To run `Trio`, set `async_backend["backend"] = "trio"`, for example:

```python
def test_app()
client = TestClient(app)
client.async_backend["backend"] = "trio"
...
```

### Testing WebSocket sessions

You can also test websocket sessions with the test client.
Expand Down Expand Up @@ -72,6 +88,8 @@ always raised by the test client.

May raise `starlette.websockets.WebSocketDisconnect` if the application does not accept the websocket connection.

`websocket_connect()` must be used as a context manager (in a `with` block).

#### Sending data

* `.send_text(data)` - Send the given text to the application.
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Expand Up @@ -18,9 +18,10 @@ types-requests
types-contextvars
types-aiofiles
types-PyYAML
types-dataclasses
pytest
pytest-cov
pytest-asyncio
trio

# Documentation
mkdocs
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -37,9 +37,9 @@ def get_long_description():
packages=find_packages(exclude=["tests*"]),
package_data={"starlette": ["py.typed"]},
include_package_data=True,
install_requires=["anyio>=3.0.0,<4"],
extras_require={
"full": [
"aiofiles",
"graphene",
"itsdangerous",
"jinja2",
Expand Down
27 changes: 13 additions & 14 deletions starlette/concurrency.py
@@ -1,43 +1,42 @@
import asyncio
import functools
import sys
import typing
from typing import Any, AsyncGenerator, Iterator

import anyio

try:
import contextvars # Python 3.7+ only or via contextvars backport.
except ImportError: # pragma: no cover
contextvars = None # type: ignore

if sys.version_info >= (3, 7): # pragma: no cover
from asyncio import create_task
else: # pragma: no cover
from asyncio import ensure_future as create_task

T = typing.TypeVar("T")


async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
tasks = [create_task(handler(**kwargs)) for handler, kwargs in args]
(done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
[task.cancel() for task in pending]
[task.result() for task in done]
async with anyio.create_task_group() as task_group:

async def run(func: typing.Callable[[], typing.Coroutine]) -> None:
await func()
task_group.cancel_scope.cancel()

for func, kwargs in args:
task_group.start_soon(run, functools.partial(func, **kwargs))


async def run_in_threadpool(
func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
loop = asyncio.get_event_loop()
if contextvars is not None: # pragma: no cover
# Ensure we run in the same context
child = functools.partial(func, *args, **kwargs)
context = contextvars.copy_context()
func = context.run
args = (child,)
elif kwargs: # pragma: no cover
# loop.run_in_executor doesn't accept 'kwargs', so bind them in here
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(None, func, *args)
return await anyio.to_thread.run_sync(func, *args)


class _StopIteration(Exception):
Expand All @@ -57,6 +56,6 @@ def _next(iterator: Iterator) -> Any:
async def iterate_in_threadpool(iterator: Iterator) -> AsyncGenerator:
while True:
try:
yield await run_in_threadpool(_next, iterator)
yield await anyio.to_thread.run_sync(_next, iterator)
except _StopIteration:
break
21 changes: 5 additions & 16 deletions starlette/graphql.py
Expand Up @@ -31,29 +31,18 @@ class GraphQLApp:
def __init__(
self,
schema: "graphene.Schema",
executor: typing.Any = None,
executor_class: type = None,
graphiql: bool = True,
) -> None:
self.schema = schema
self.graphiql = graphiql
if executor is None:
# New style in 0.10.0. Use 'executor_class'.
# See issue https://github.com/encode/starlette/issues/242
self.executor = executor
self.executor_class = executor_class
self.is_async = executor_class is not None and issubclass(
executor_class, AsyncioExecutor
)
else:
# Old style. Use 'executor'.
# We should remove this in the next median/major version bump.
self.executor = executor
self.executor_class = None
self.is_async = isinstance(executor, AsyncioExecutor)
self.executor_class = executor_class
self.is_async = executor_class is not None and issubclass(
executor_class, AsyncioExecutor
)

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.executor is None and self.executor_class is not None:
if self.executor_class is not None:
self.executor = self.executor_class()

request = Request(scope, receive=receive)
Expand Down
69 changes: 32 additions & 37 deletions starlette/middleware/base.py
@@ -1,9 +1,10 @@
import asyncio
import typing

import anyio

from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from starlette.types import ASGIApp, Receive, Scope, Send

RequestResponseEndpoint = typing.Callable[[Request], typing.Awaitable[Response]]
DispatchFunction = typing.Callable[
Expand All @@ -21,45 +22,39 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await self.app(scope, receive, send)
return

request = Request(scope, receive=receive)
response = await self.dispatch_func(request, self.call_next)
await response(scope, receive, send)
async def call_next(request: Request) -> Response:
send_stream, recv_stream = anyio.create_memory_object_stream()

async def call_next(self, request: Request) -> Response:
loop = asyncio.get_event_loop()
queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()
async def coro() -> None:
async with send_stream:
await self.app(scope, request.receive, send_stream.send)

scope = request.scope
receive = request.receive
send = queue.put
task_group.start_soon(coro)

async def coro() -> None:
try:
await self.app(scope, receive, send)
finally:
await queue.put(None)

task = loop.create_task(coro())
message = await queue.get()
if message is None:
task.result()
raise RuntimeError("No response returned.")
assert message["type"] == "http.response.start"

async def body_stream() -> typing.AsyncGenerator[bytes, None]:
while True:
message = await queue.get()
if message is None:
break
assert message["type"] == "http.response.body"
yield message.get("body", b"")
task.result()

response = StreamingResponse(
status_code=message["status"], content=body_stream()
)
response.raw_headers = message["headers"]
return response
message = await recv_stream.receive()
except anyio.EndOfStream:
raise RuntimeError("No response returned.")

assert message["type"] == "http.response.start"

async def body_stream() -> typing.AsyncGenerator[bytes, None]:
async with recv_stream:
async for message in recv_stream:
assert message["type"] == "http.response.body"
yield message.get("body", b"")

response = StreamingResponse(
status_code=message["status"], content=body_stream()
)
response.raw_headers = message["headers"]
return response

async with anyio.create_task_group() as task_group:
request = Request(scope, receive=receive)
response = await self.dispatch_func(request, call_next)
await response(scope, receive, send)
task_group.cancel_scope.cancel()

async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
Expand Down

0 comments on commit 42592d6

Please sign in to comment.