-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
http_protocol.py
243 lines (219 loc) · 7.83 KB
/
http_protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from sanic.touchup.meta import TouchUpMeta
if TYPE_CHECKING: # no cov
from sanic.app import Sanic
import sys
from asyncio import CancelledError
from time import monotonic as current_time
from sanic.exceptions import RequestTimeout, ServiceUnavailable
from sanic.http import Http, Stage
from sanic.log import error_logger, logger
from sanic.models.server_types import ConnInfo
from sanic.request import Request
from sanic.server.protocols.base_protocol import SanicProtocol
class HttpProtocol(SanicProtocol, metaclass=TouchUpMeta):
    """
    Implements the HTTP 1.1 protocol on top of the Sanic server transport.

    One instance handles one connection: ``connection_made`` spawns
    ``connection_task``, which drives ``Http.http1()`` until the connection
    ends, while ``check_timeouts`` re-schedules itself on the loop to enforce
    the keep-alive, request and response timeouts.
    """

    # Methods listed here are handed to the TouchUp machinery (via the
    # TouchUpMeta metaclass), so their bodies are kept structurally as-is.
    __touchup__ = (
        "send",
        "connection_task",
    )
    __slots__ = (
        # request params
        "request",
        # request config
        "request_handler",
        "request_timeout",
        "response_timeout",
        "keep_alive_timeout",
        "request_max_size",
        "request_class",
        "error_handler",
        # enable or disable access log purpose
        "access_log",
        # connection management
        "state",
        "url",
        "_handler_task",
        "_http",
        "_exception",
        "recv_buffer",
    )

    def __init__(
        self,
        *,
        loop,
        app: Sanic,
        signal=None,
        connections=None,
        state=None,
        unix=None,
        **kwargs,
    ):
        """
        Set up per-connection settings from the application config.

        :param loop: event loop, forwarded to ``SanicProtocol``
        :param app: the Sanic application; its ``config`` supplies the
            timeouts, size limit and access-log flag copied onto this
            instance
        :param signal: forwarded to ``SanicProtocol``
        :param connections: forwarded to ``SanicProtocol``; this instance
            adds itself to it in ``connection_made``
        :param state: optional mapping that holds ``"requests_count"``;
            a fresh dict is created only when ``None`` is passed
        :param unix: forwarded to ``SanicProtocol``
        :param kwargs: accepted for call compatibility and ignored
        """
        super().__init__(
            loop=loop,
            app=app,
            signal=signal,
            connections=connections,
            unix=unix,
        )
        self.url = None
        self.request: Optional[Request] = None
        self.access_log = self.app.config.ACCESS_LOG
        self.request_handler = self.app.handle_request
        self.error_handler = self.app.error_handler
        self.request_timeout = self.app.config.REQUEST_TIMEOUT
        self.response_timeout = self.app.config.RESPONSE_TIMEOUT
        self.keep_alive_timeout = self.app.config.KEEP_ALIVE_TIMEOUT
        self.request_max_size = self.app.config.REQUEST_MAX_SIZE
        self.request_class = self.app.request_class or Request
        # Compare against None rather than truthiness: an empty dict passed
        # by the caller must be kept (and populated below) so that the
        # counter is visible through the caller's own reference.
        self.state = state if state is not None else {}
        if "requests_count" not in self.state:
            self.state["requests_count"] = 0
        self._exception = None
        # "_http" is a slot, so it has no value until assigned. It is only
        # populated by _setup_connection(), yet close_if_idle() and the
        # cleanup in connection_task() compare it against None; start it
        # out as None so those reads cannot raise AttributeError.
        self._http = None

    def _setup_connection(self):
        """
        Create the ``Http`` handler for this connection, stamp the activity
        time and start the self-rescheduling timeout checks.
        """
        self._http = Http(self)
        self._time = current_time()
        self.check_timeouts()

    async def connection_task(self):  # no cov
        """
        Run a HTTP connection.

        Timeouts and some additional error handling occur here, while most of
        everything else happens in class Http or in code called from there.
        """
        try:
            self._setup_connection()
            await self.app.dispatch(
                "http.lifecycle.begin",
                inline=True,
                context={"conn_info": self.conn_info},
            )
            await self._http.http1()
        except CancelledError:
            # Cancellation is how check_timeouts() ends the connection.
            pass
        except Exception:
            error_logger.exception("protocol.connection_task uncaught")
        finally:
            if (
                self.app.debug
                and self._http
                and self.transport
                and not self._http.upgrade_websocket
            ):
                ip = self.transport.get_extra_info("peername")
                error_logger.error(
                    "Connection lost before response written"
                    f" @ {ip} {self._http.request}"
                )
            self._http = None
            self._task = None
            try:
                self.close()
            except BaseException:
                error_logger.exception("Closing failed")
            finally:
                await self.app.dispatch(
                    "http.lifecycle.complete",
                    inline=True,
                    context={"conn_info": self.conn_info},
                )
                # Important to keep this Ellipsis here for the TouchUp module
                ...

    def check_timeouts(self):
        """
        Runs itself periodically to enforce any expired timeouts.

        When a timeout for the current ``Http`` stage has expired, the
        connection task is cancelled (after storing ``RequestTimeout`` or
        ``ServiceUnavailable`` on the ``Http`` object for request and
        response timeouts). Otherwise the check is scheduled again. Any
        exception raised here is logged rather than propagated.
        """
        try:
            if not self._task:
                # Connection already finished; stop rescheduling.
                return
            duration = current_time() - self._time
            stage = self._http.stage
            if stage is Stage.IDLE and duration > self.keep_alive_timeout:
                logger.debug("KeepAlive Timeout. Closing connection.")
            elif stage is Stage.REQUEST and duration > self.request_timeout:
                logger.debug("Request Timeout. Closing connection.")
                self._http.exception = RequestTimeout("Request Timeout")
            elif stage is Stage.HANDLER and self._http.upgrade_websocket:
                # Returning without rescheduling ends the periodic checks
                # for this connection.
                logger.debug("Handling websocket. Timeouts disabled.")
                return
            elif (
                stage in (Stage.HANDLER, Stage.RESPONSE, Stage.FAILED)
                and duration > self.response_timeout
            ):
                logger.debug("Response Timeout. Closing connection.")
                self._http.exception = ServiceUnavailable("Response Timeout")
            else:
                # Nothing expired: check again after half of the shortest
                # timeout, but never sooner than 0.1 seconds.
                interval = (
                    min(
                        self.keep_alive_timeout,
                        self.request_timeout,
                        self.response_timeout,
                    )
                    / 2
                )
                self.loop.call_later(max(0.1, interval), self.check_timeouts)
                return
            # Only reached when one of the timeouts above has expired.
            cancel_msg_args = ()
            if sys.version_info >= (3, 9):
                # Task.cancel() accepts a message argument from 3.9 onwards.
                cancel_msg_args = ("Cancel connection task with a timeout",)
            self._task.cancel(*cancel_msg_args)
        except Exception:
            error_logger.exception("protocol.check_timeouts")

    async def send(self, data):  # no cov
        """
        Writes HTTP data with backpressure control.

        Waits on ``self._can_write`` before writing, raises
        ``CancelledError`` if the transport is already closing, and
        refreshes the activity time after the write.
        """
        await self._can_write.wait()
        if self.transport.is_closing():
            raise CancelledError
        await self.app.dispatch(
            "http.lifecycle.send",
            inline=True,
            context={"data": data},
        )
        self.transport.write(data)
        self._time = current_time()

    def close_if_idle(self) -> bool:
        """
        Close the connection if a request is not being sent or received

        :return: boolean - True if closed, false if staying open
        """
        if self._http is None or self._http.stage is Stage.IDLE:
            self.close()
            return True
        return False

    # -------------------------------------------- #
    # Only asyncio.Protocol callbacks below this
    # -------------------------------------------- #

    def connection_made(self, transport):
        """
        HTTP-protocol-specific new connection handler

        Registers this protocol in ``self.connections``, stores the
        transport, schedules ``connection_task`` on the loop and prepares
        the receive buffer and ``ConnInfo``. Any exception is logged rather
        than propagated.
        """
        try:
            # TODO: Benchmark to find suitable write buffer limits
            transport.set_write_buffer_limits(low=16384, high=65536)
            self.connections.add(self)
            self.transport = transport
            self._task = self.loop.create_task(self.connection_task())
            self.recv_buffer = bytearray()
            self.conn_info = ConnInfo(self.transport, unix=self._unix)
        except Exception:
            error_logger.exception("protocol.connect_made")

    def data_received(self, data: bytes):
        """
        Append incoming bytes to ``recv_buffer`` and wake up the reader.

        Empty data closes the connection. Reading from the transport is
        paused once the buffer reaches ``REQUEST_BUFFER_SIZE``. Any
        exception is logged rather than propagated.
        """
        try:
            self._time = current_time()
            if not data:
                return self.close()
            self.recv_buffer += data

            if (
                len(self.recv_buffer) >= self.app.config.REQUEST_BUFFER_SIZE
                and self.transport
            ):
                self.transport.pause_reading()

            if self._data_received:
                self._data_received.set()
        except Exception:
            error_logger.exception("protocol.data_received")