From f2418a95e0df21ae53fc8dba96995e18b2efa961 Mon Sep 17 00:00:00 2001 From: Randall Leeds Date: Sun, 20 Mar 2016 17:34:55 -0700 Subject: [PATCH] [thread] close sockets at graceful shutdown The run loop has to change slightly to support graceful shutdown. There is no way to interrupt a call to `futures.wait` so instead the pattern, used by the async workers, is to sleep for only one second at the most. The poll is extended to a one second timeout to match. Since threads are preemptively scheduled, it's possible that the listener is closed when the request is actually handled. For this reason it is necessary to slightly refactor the TConn class to store the listening socket name. The name is checked once at the start of the worker run loop. Ref #922 --- gunicorn/workers/gthread.py | 64 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index d66fe4824..7327a3188 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -44,11 +44,11 @@ class TConn(object): - def __init__(self, cfg, listener, sock, addr): + def __init__(self, cfg, sock, client, server): self.cfg = cfg - self.listener = listener self.sock = sock - self.addr = addr + self.client = client + self.server = server self.timeout = None self.parser = None @@ -127,11 +127,11 @@ def enqueue_req(self, conn): fs = self.tpool.submit(self.handle, conn) self._wrap_future(fs, conn) - def accept(self, listener): + def accept(self, server, listener): try: - client, addr = listener.accept() + sock, client = listener.accept() # initialize the connection object - conn = TConn(self.cfg, listener, client, addr) + conn = TConn(self.cfg, sock, client, server) self.nr_conns += 1 # enqueue the job self.enqueue_req(conn) @@ -192,11 +192,13 @@ def is_parent_alive(self): def run(self): # init listeners, add them to the event loop - for s in self.sockets: - s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, self.accept) - - timeout = self.cfg.timeout or 0.5 + for sock in self.sockets: + sock.setblocking(False) + # a race condition during graceful shutdown may make the listener + # name unavailable in the request handler so capture it once here + server = sock.getsockname() + acceptor = partial(self.accept, server) + self.poller.register(sock, selectors.EVENT_READ, acceptor) while self.alive: # notify the arbiter we are alive @@ -205,33 +207,37 @@ def run(self): # can we accept more connections? if self.nr_conns < self.worker_connections: # wait for an event - events = self.poller.select(0.02) + events = self.poller.select(1.0) for key, mask in events: callback = key.data callback(key.fileobj) + # check (but do not wait) for finished requests + result = futures.wait(self.futures, timeout=0, + return_when=futures.FIRST_COMPLETED) + else: + # wait for a request to finish + result = futures.wait(self.futures, timeout=1.0, + return_when=futures.FIRST_COMPLETED) + + # clean up finished requests + for fut in result.done: + self.futures.remove(fut) + if not self.is_parent_alive(): break # hanle keepalive timeouts self.murder_keepalived() - # if the number of connections is < to the max we can handle at - # the same time there is no need to wait for one - if len(self.futures) < self.cfg.threads: - continue - - result = futures.wait(self.futures, timeout=timeout, - return_when=futures.FIRST_COMPLETED) - - if not result.done: - break - else: - [self.futures.remove(f) for f in result.done] - self.tpool.shutdown(False) self.poller.close() + for s in self.sockets: + s.close() + + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) + def finish_request(self, fs): if fs.cancelled(): fs.conn.close() @@ -285,7 +291,7 @@ def handle(self, conn): conn.sock.close() else: self.log.debug("Error processing SSL request.") - self.handle_error(req, conn.sock, conn.addr, e) + self.handle_error(req, conn.sock, conn.client, e) except EnvironmentError as e: if e.errno not in (errno.EPIPE, errno.ECONNRESET): @@ -296,7 +302,7 @@ def handle(self, conn): else: self.log.debug("Ignoring connection epipe") except Exception as e: - self.handle_error(req, conn.sock, conn.addr, e) + self.handle_error(req, conn.sock, conn.client, e) return (False, conn) @@ -306,8 +312,8 @@ def handle_request(self, req, conn): try: self.cfg.pre_request(self, req) request_start = datetime.now() - resp, environ = wsgi.create(req, conn.sock, conn.addr, - conn.listener.getsockname(), self.cfg) + resp, environ = wsgi.create(req, conn.sock, conn.client, + conn.server, self.cfg) environ["wsgi.multithread"] = True self.nr += 1 if self.alive and self.nr >= self.max_requests: