forked from benoitc/gunicorn
/
async.py
146 lines (134 loc) · 5.45 KB
/
async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# -*- coding: utf-8 -*-
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from datetime import datetime
import errno
import socket
import ssl
import sys
import gunicorn.http as http
import gunicorn.http.wsgi as wsgi
import gunicorn.util as util
import gunicorn.workers.base as base
from gunicorn import six
# Sentinel compared against the WSGI application's return value (see
# AsyncWorker.is_already_handled): when they match, handle_request() returns
# False without writing a response itself.
ALREADY_HANDLED = object()
class AsyncWorker(base.Worker):
    """Worker base class implementing per-connection request handling.

    Subclasses must provide :meth:`timeout_ctx` (it raises
    ``NotImplementedError`` here) and may override
    :meth:`is_already_handled`.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base worker and cache ``cfg.worker_connections``."""
        super(AsyncWorker, self).__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections

    def timeout_ctx(self):
        """Return the context manager used around keep-alive request reads.

        ``handle()`` enters the returned object with a ``with`` statement
        while it waits for the next request on a keep-alive connection.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def is_already_handled(self, respiter):
        """Return True if ``respiter`` is the ``ALREADY_HANDLED`` sentinel.

        The sentinel is a bare ``object()``, so identity is the correct test;
        ``==`` would dispatch to ``respiter.__eq__``, which is arbitrary
        application code and may raise or return a non-boolean.
        """
        # some workers will need to overload this function to raise a
        # StopIteration
        return respiter is ALREADY_HANDLED

    def handle(self, listener, client, addr):
        """Parse and serve the request(s) arriving on ``client``.

        When ``cfg.keepalive`` is falsy exactly one request is read and
        passed to ``handle_request()``.  Otherwise requests are read in a
        loop, each read guarded by ``timeout_ctx()``, until the parser yields
        a falsy request or an exception ends the loop.  ``client`` is always
        closed through ``util.close()`` before returning.

        Args:
            listener: listening socket; only ``getsockname()`` is used.
            client: accepted client socket.
            addr: client address, forwarded to ``handle_request()`` and
                ``handle_error()``.
        """
        req = None
        try:
            parser = http.RequestParser(self.cfg, client)
            try:
                listener_name = listener.getsockname()
                if not self.cfg.keepalive:
                    req = six.next(parser)
                    self.handle_request(listener_name, req, client, addr)
                else:
                    # keepalive loop
                    proxy_protocol_info = {}
                    while True:
                        # Reset first so a failed read never reports the
                        # previous request to handle_error().
                        req = None
                        with self.timeout_ctx():
                            req = six.next(parser)
                        if not req:
                            break
                        # Remember proxy protocol info once a request carries
                        # it and copy it onto later requests that lack it.
                        if req.proxy_protocol_info:
                            proxy_protocol_info = req.proxy_protocol_info
                        else:
                            req.proxy_protocol_info = proxy_protocol_info
                        self.handle_request(listener_name, req, client, addr)
            except http.errors.NoMoreData as e:
                self.log.debug("Ignored premature client disconnection", exc_info=e)
            except StopIteration as e:
                self.log.debug("Closing connection", exc_info=e)
            except ssl.SSLError:
                # pass to next try-except level
                raise
            except EnvironmentError:
                # pass to next try-except level
                raise
            except Exception as e:
                self.handle_error(req, client, addr, e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.handle_error(req, client, addr, e)
        except EnvironmentError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET):
                self.log.exception("Socket error processing request", exc_info=e)
            elif e.errno == errno.ECONNRESET:
                self.log.debug("Ignoring connection reset")
            else:
                self.log.debug("Ignoring EPIPE")
        except Exception as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener_name, req, sock, addr):
        """Run the WSGI application for ``req`` and write its response.

        The ``cfg.pre_request`` hook runs first and the ``cfg.post_request``
        hook always runs last; a failure in the latter is logged, not raised.

        Args:
            listener_name: value of ``listener.getsockname()``.
            req: parsed request object.
            sock: client socket the response is written to.
            addr: client address.

        Returns:
            False when ``is_already_handled()`` accepts the application's
            return value, True otherwise.

        Raises:
            StopIteration: when ``resp.should_close()`` is true after the
                response, or when an error occurred after the headers were
                already sent (the socket is shut down first).
            EnvironmentError: propagated unchanged so ``handle()`` can
                decide whether to ignore it.
            Exception: any other error raised before headers were sent.
        """
        request_start = datetime.now()
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            resp, environ = wsgi.create(req, sock, addr,
                                        listener_name, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.alive and self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                resp.force_close()
                self.alive = False

            if not self.cfg.keepalive:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            if self.is_already_handled(respiter):
                return False
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                # Close the application iterable even if writing failed.
                if hasattr(respiter, "close"):
                    respiter.close()
            if resp.should_close():
                # Signals handle() to stop its keep-alive loop.
                raise StopIteration()
        except StopIteration:
            raise
        except EnvironmentError:
            # If the original exception was a socket.error we delegate
            # handling it to the caller (where handle() might ignore it)
            raise
        except Exception as error:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request", exc_info=error)
                try:
                    sock.shutdown(socket.SHUT_RDWR)
                    sock.close()
                except EnvironmentError:
                    # Best effort: the peer may already have gone away.
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception as error:
                self.log.exception("Exception in post_request hook", exc_info=error)
        return True