-
-
Notifications
You must be signed in to change notification settings - Fork 805
/
asgi.py
160 lines (126 loc) · 4.89 KB
/
asgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union
from urllib.parse import unquote
import httpcore
import sniffio
if TYPE_CHECKING: # pragma: no cover
import asyncio
import trio
Event = Union[asyncio.Event, trio.Event]
def create_event() -> "Event":
if sniffio.current_async_library() == "trio":
import trio
return trio.Event()
else:
import asyncio
return asyncio.Event()
class ASGITransport(httpcore.AsyncHTTPTransport):
"""
A custom AsyncTransport that handles sending requests directly to an ASGI app.
The simplest way to use this functionality is to use the `app` argument.
```
client = httpx.AsyncClient(app=app)
```
Alternatively, you can setup the transport instance explicitly.
This allows you to include any additional configuration arguments specific
to the ASGITransport class:
```
transport = httpx.ASGITransport(
app=app,
root_path="/submount",
client=("1.2.3.4", 123)
)
client = httpx.AsyncClient(transport=transport)
```
Arguments:
* `app` - The ASGI application.
* `raise_app_exceptions` - Boolean indicating if exceptions in the application
should be raised. Default to `True`. Can be set to `False` for use cases
such as testing the content of a client 500 response.
* `root_path` - The root path on which the ASGI application should be mounted.
* `client` - A two-tuple indicating the client IP and port of incoming requests.
```
"""
def __init__(
self,
app: Callable,
raise_app_exceptions: bool = True,
root_path: str = "",
client: Tuple[str, int] = ("127.0.0.1", 123),
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
self.root_path = root_path
self.client = client
async def arequest(
self,
method: bytes,
url: Tuple[bytes, bytes, Optional[int], bytes],
headers: List[Tuple[bytes, bytes]] = None,
stream: httpcore.AsyncByteStream = None,
ext: dict = None,
) -> Tuple[int, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream, dict]:
headers = [] if headers is None else headers
stream = httpcore.PlainByteStream(content=b"") if stream is None else stream
# ASGI scope.
scheme, host, port, full_path = url
path, _, query = full_path.partition(b"?")
scope = {
"type": "http",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"method": method.decode(),
"headers": headers,
"scheme": scheme.decode("ascii"),
"path": unquote(path.decode("ascii")),
"query_string": query,
"server": (host.decode("ascii"), port),
"client": self.client,
"root_path": self.root_path,
}
# Request.
request_body_chunks = stream.__aiter__()
request_complete = False
# Response.
status_code = None
response_headers = None
body_parts = []
response_started = False
response_complete = create_event()
# ASGI callables.
async def receive() -> dict:
nonlocal request_complete
if request_complete:
await response_complete.wait()
return {"type": "http.disconnect"}
try:
body = await request_body_chunks.__anext__()
except StopAsyncIteration:
request_complete = True
return {"type": "http.request", "body": b"", "more_body": False}
return {"type": "http.request", "body": body, "more_body": True}
async def send(message: dict) -> None:
nonlocal status_code, response_headers, response_started
if message["type"] == "http.response.start":
assert not response_started
status_code = message["status"]
response_headers = message.get("headers", [])
response_started = True
elif message["type"] == "http.response.body":
assert not response_complete.is_set()
body = message.get("body", b"")
more_body = message.get("more_body", False)
if body and method != b"HEAD":
body_parts.append(body)
if not more_body:
response_complete.set()
try:
await self.app(scope, receive, send)
except Exception:
if self.raise_app_exceptions or not response_complete.is_set():
raise
assert response_complete.is_set()
assert status_code is not None
assert response_headers is not None
stream = httpcore.PlainByteStream(content=b"".join(body_parts))
ext = {}
return (status_code, response_headers, stream, ext)