forked from sanic-org/sanic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
asgi.py
236 lines (206 loc) · 8.14 KB
/
asgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Optional
from urllib.parse import quote
from sanic.compat import Header
from sanic.exceptions import ServerError
from sanic.helpers import _default
from sanic.http import Stage
from sanic.log import logger
from sanic.models.asgi import ASGIReceive, ASGIScope, ASGISend, MockTransport
from sanic.request import Request
from sanic.response import BaseHTTPResponse
from sanic.server import ConnInfo
from sanic.server.websockets.connection import WebSocketConnection
if TYPE_CHECKING: # no cov
from sanic import Sanic
class Lifespan:
    """
    Bridge between the ASGI lifespan protocol and Sanic's server events.

    An instance is awaited with the ASGI ``receive``/``send`` callables and
    answers the ``lifespan.startup`` and ``lifespan.shutdown`` messages by
    running the application's server listeners.
    """

    def __init__(self, asgi_app: ASGIApp) -> None:
        """
        Store the owning ASGI app and, when verbosity is enabled, log a hint
        for listeners whose timing differs in ASGI mode.

        :param asgi_app: adapter whose ``sanic_app`` owns the listeners
        """
        self.asgi_app = asgi_app

        if self.asgi_app.sanic_app.state.verbosity > 0:
            registered = self.asgi_app.sanic_app.signal_router.name_index

            if "server.init.before" in registered:
                logger.debug(
                    'You have set a listener for "before_server_start" '
                    "in ASGI mode. "
                    "It will be executed as early as possible, but not before "
                    "the ASGI server is started."
                )
            if "server.shutdown.after" in registered:
                logger.debug(
                    'You have set a listener for "after_server_stop" '
                    "in ASGI mode. "
                    "It will be executed as late as possible, but not after "
                    "the ASGI server is stopped."
                )

    async def startup(self) -> None:
        """
        Gather the listeners to fire on server start.
        Because we are using a third-party server and not Sanic server, we do
        not have access to fire anything BEFORE the server starts.
        Therefore, we fire before_server_start and after_server_start
        in sequence since the ASGI lifespan protocol only supports a single
        startup event.

        A warning is emitted afterwards when ``USE_UVLOOP`` was explicitly
        configured (i.e. it is no longer the ``_default`` sentinel).
        """
        app = self.asgi_app.sanic_app

        await app._startup()
        await app._server_event("init", "before")
        await app._server_event("init", "after")

        if app.config.USE_UVLOOP is not _default:
            # NOTE(review): the last two literals concatenate without a
            # separating space ("mode.This"). Left byte-identical because
            # external code may match the exact warning text.
            warnings.warn(
                "You have set the USE_UVLOOP configuration option, but Sanic "
                "cannot control the event loop when running in ASGI mode."
                "This option will be ignored."
            )

    async def shutdown(self) -> None:
        """
        Gather the listeners to fire on server stop.
        Because we are using a third-party server and not Sanic server, we do
        not have access to fire anything AFTER the server stops.
        Therefore, we fire before_server_stop and after_server_stop
        in sequence since the ASGI lifespan protocol only supports a single
        shutdown event.
        """
        app = self.asgi_app.sanic_app

        await app._server_event("shutdown", "before")
        await app._server_event("shutdown", "after")

    async def __call__(
        self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
    ) -> None:
        """
        Serve one lifespan conversation: at most one startup message followed
        by at most one shutdown message.

        A failure inside the listeners is logged and reported to the server
        with ``lifespan.startup.failed`` / ``lifespan.shutdown.failed``
        instead of escaping from this callable, so the server is told
        explicitly that the phase did not complete.

        :param scope: ASGI scope of the connection (not inspected here)
        :param receive: awaitable returning the next lifespan message
        :param send: awaitable used to acknowledge each phase
        """
        message = await receive()
        if message["type"] == "lifespan.startup":
            try:
                await self.startup()
            except Exception as e:
                logger.exception("ASGI lifespan startup failed")
                await send(
                    {"type": "lifespan.startup.failed", "message": str(e)}
                )
            else:
                await send({"type": "lifespan.startup.complete"})

        message = await receive()
        if message["type"] == "lifespan.shutdown":
            try:
                await self.shutdown()
            except Exception as e:
                logger.exception("ASGI lifespan shutdown failed")
                await send(
                    {"type": "lifespan.shutdown.failed", "message": str(e)}
                )
            else:
                await send({"type": "lifespan.shutdown.complete"})
class ASGIApp:
    """
    Adapter that feeds a single ASGI connection through a Sanic application.

    Instances are built with :meth:`create`; the instance also serves as the
    ``stream`` object of the request it creates, providing ``read``,
    ``respond`` and ``send``.
    """

    sanic_app: Sanic
    request: Request
    transport: MockTransport
    lifespan: Lifespan
    ws: Optional[WebSocketConnection]
    stage: Stage
    response: Optional[BaseHTTPResponse]
    # True while more request body may still arrive; set by ``create`` for
    # http/websocket scopes and cleared by ``read``.
    request_body: bool

    def __init__(self) -> None:
        self.ws = None

    @staticmethod
    def _build_url_bytes(scope: ASGIScope) -> bytes:
        """Return ``root_path/quoted-path?query`` for *scope* as bytes."""
        raw_path = scope["path"]
        path = raw_path[1:] if raw_path.startswith("/") else raw_path
        url = "/".join([scope.get("root_path", ""), quote(path)])
        # The ASGI WebSocket scope allows ``query_string`` to be missing or
        # None; both are treated as an empty query rather than raising.
        query = scope.get("query_string") or b""
        return url.encode("latin-1") + b"?" + query

    @classmethod
    async def create(
        cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
    ) -> "ASGIApp":
        """
        Build an instance for one ASGI connection.

        For a ``lifespan`` scope the lifespan handler is awaited and no
        request is created. For ``http`` and ``websocket`` scopes a request
        object is created from the scope and the
        ``http.lifecycle.request`` signal is dispatched.

        :param sanic_app: the Sanic application serving the connection
        :param scope: ASGI connection scope
        :param receive: ASGI receive callable
        :param send: ASGI send callable
        :return: the configured instance
        :raises ServerError: if the scope type is not ``lifespan``, ``http``
            or ``websocket``
        """
        instance = cls()
        instance.sanic_app = sanic_app
        instance.transport = MockTransport(scope, receive, send)
        instance.transport.loop = sanic_app.loop
        instance.stage = Stage.IDLE
        instance.response = None
        setattr(instance.transport, "add_task", sanic_app.loop.create_task)

        # Header names and values arrive as bytes in the scope.
        headers = Header(
            [
                (key.decode("latin-1"), value.decode("latin-1"))
                for key, value in scope.get("headers", [])
            ]
        )
        instance.lifespan = Lifespan(instance)

        if scope["type"] == "lifespan":
            await instance.lifespan(scope, receive, send)
        else:
            url_bytes = cls._build_url_bytes(scope)

            if scope["type"] == "http":
                version = scope["http_version"]
                method = scope["method"]
            elif scope["type"] == "websocket":
                version = "1.1"
                method = "GET"

                instance.ws = instance.transport.create_websocket_connection(
                    send, receive
                )
            else:
                raise ServerError("Received unknown ASGI scope")

            request_class = sanic_app.request_class or Request
            instance.request = request_class(
                url_bytes,
                headers,
                version,
                method,
                instance.transport,
                sanic_app,
            )
            instance.request.stream = instance
            instance.request_body = True
            instance.request.conn_info = ConnInfo(instance.transport)

            await sanic_app.dispatch(
                "http.lifecycle.request",
                inline=True,
                context={"request": instance.request},
                fail_not_found=False,
            )

        return instance

    async def read(self) -> Optional[bytes]:
        """
        Read and stream the body in chunks from an incoming ASGI message.

        :return: the ``body`` of the next message, or ``None`` when the
            final message carries no body
        """
        if self.stage is Stage.IDLE:
            self.stage = Stage.REQUEST
        message = await self.transport.receive()
        body = message.get("body", b"")
        if not message.get("more_body", False):
            # Last message of the body: stop the ``__aiter__`` loop.
            self.request_body = False
            if not body:
                return None
        return body

    async def __aiter__(self):
        """Yield the non-empty body chunks until the body is exhausted."""
        while self.request_body:
            data = await self.read()
            if data:
                yield data

    def respond(self, response: BaseHTTPResponse):
        """
        Register *response* as the response to be sent on this stream.

        A previously registered response is detached from the stream.

        :param response: response to attach
        :return: the same response object
        :raises RuntimeError: if the stage is not ``Stage.HANDLER``; the
            stage is set to ``Stage.FAILED`` first
        """
        if self.stage is not Stage.HANDLER:
            self.stage = Stage.FAILED
            raise RuntimeError("Response already started")
        if self.response is not None:
            self.response.stream = None
        response.stream, self.response = self, response
        return response

    async def send(self, data, end_stream):
        """
        Send body data to the ASGI server.

        If a response is still pending, its ``http.response.start`` message
        is sent first and any ``body`` it carries is placed in front of
        *data*.

        :param data: body chunk; encoded first if it has an ``encode`` method
        :param end_stream: whether this is the final chunk
        """
        self.stage = Stage.IDLE if end_stream else Stage.RESPONSE
        if self.response:
            # Clear the pending response so the start message is sent once.
            response, self.response = self.response, None
            await self.transport.send(
                {
                    "type": "http.response.start",
                    "status": response.status,
                    "headers": response.processed_headers,
                }
            )
            response_body = getattr(response, "body", None)
            if response_body:
                data = response_body + data if data else response_body
        await self.transport.send(
            {
                "type": "http.response.body",
                "body": data.encode() if hasattr(data, "encode") else data,
                "more_body": not end_stream,
            }
        )

    _asgi_single_callable = True  # We conform to ASGI 3.0 single-callable

    async def __call__(self) -> None:
        """
        Handle the incoming request.

        Any exception raised while handling is passed to the application's
        ``handle_exception``.
        """
        try:
            self.stage = Stage.HANDLER
            await self.sanic_app.handle_request(self.request)
        except Exception as e:
            await self.sanic_app.handle_exception(self.request, e)