diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index d66fe4824..7327a3188 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -44,11 +44,11 @@ class TConn(object): - def __init__(self, cfg, listener, sock, addr): + def __init__(self, cfg, sock, client, server): self.cfg = cfg - self.listener = listener self.sock = sock - self.addr = addr + self.client = client + self.server = server self.timeout = None self.parser = None @@ -127,11 +127,11 @@ def enqueue_req(self, conn): fs = self.tpool.submit(self.handle, conn) self._wrap_future(fs, conn) - def accept(self, listener): + def accept(self, server, listener): try: - client, addr = listener.accept() + sock, client = listener.accept() # initialize the connection object - conn = TConn(self.cfg, listener, client, addr) + conn = TConn(self.cfg, sock, client, server) self.nr_conns += 1 # enqueue the job self.enqueue_req(conn) @@ -192,11 +192,13 @@ def is_parent_alive(self): def run(self): # init listeners, add them to the event loop - for s in self.sockets: - s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, self.accept) - - timeout = self.cfg.timeout or 0.5 + for sock in self.sockets: + sock.setblocking(False) + # a race condition during graceful shutdown may make the listener + # name unavailable in the request handler so capture it once here + server = sock.getsockname() + acceptor = partial(self.accept, server) + self.poller.register(sock, selectors.EVENT_READ, acceptor) while self.alive: # notify the arbiter we are alive @@ -205,33 +207,37 @@ def run(self): # can we accept more connections? if self.nr_conns < self.worker_connections: # wait for an event - events = self.poller.select(0.02) + events = self.poller.select(1.0) for key, mask in events: callback = key.data callback(key.fileobj) + # check (but do not wait) for finished requests + result = futures.wait(self.futures, timeout=0, + return_when=futures.FIRST_COMPLETED) + else: + # wait for a request to finish + result = futures.wait(self.futures, timeout=1.0, + return_when=futures.FIRST_COMPLETED) + + # clean up finished requests + for fut in result.done: + self.futures.remove(fut) + if not self.is_parent_alive(): break # hanle keepalive timeouts self.murder_keepalived() - # if the number of connections is < to the max we can handle at - # the same time there is no need to wait for one - if len(self.futures) < self.cfg.threads: - continue - - result = futures.wait(self.futures, timeout=timeout, - return_when=futures.FIRST_COMPLETED) - - if not result.done: - break - else: - [self.futures.remove(f) for f in result.done] - self.tpool.shutdown(False) self.poller.close() + for s in self.sockets: + s.close() + + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) + def finish_request(self, fs): if fs.cancelled(): fs.conn.close() @@ -285,7 +291,7 @@ def handle(self, conn): conn.sock.close() else: self.log.debug("Error processing SSL request.") - self.handle_error(req, conn.sock, conn.addr, e) + self.handle_error(req, conn.sock, conn.client, e) except EnvironmentError as e: if e.errno not in (errno.EPIPE, errno.ECONNRESET): @@ -296,7 +302,7 @@ def handle(self, conn): else: self.log.debug("Ignoring connection epipe") except Exception as e: - self.handle_error(req, conn.sock, conn.addr, e) + self.handle_error(req, conn.sock, conn.client, e) return (False, conn) @@ -306,8 +312,8 @@ def handle_request(self, req, conn): try: self.cfg.pre_request(self, req) request_start = datetime.now() - resp, environ = wsgi.create(req, conn.sock, conn.addr, - conn.listener.getsockname(), self.cfg) + resp, environ = wsgi.create(req, conn.sock, conn.client, + conn.server, self.cfg) environ["wsgi.multithread"] = True self.nr += 1 if self.alive and self.nr >= self.max_requests: