Skip to content

Commit

Permalink
[thread] close sockets at graceful shutdown
Browse files Browse the repository at this point in the history
The run loop has to change slightly to support graceful shutdown.
There is no way to interrupt a call to `futures.wait` so instead
the pattern, used by the async workers, is to sleep for only one
second at the most. The poll is extended to a one second timeout
to match.

Since threads are preemptively scheduled, it's possible that the
listener is closed when the request is actually handled. For this
reason it is necessary to slightly refactor the TConn class to store
the listening socket name. The name is checked once at the start of
the worker run loop.

Ref #922
  • Loading branch information
tilgovi committed Mar 21, 2016
1 parent 5b32dde commit f2418a9
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions gunicorn/workers/gthread.py
Expand Up @@ -44,11 +44,11 @@

class TConn(object):

def __init__(self, cfg, listener, sock, addr):
def __init__(self, cfg, sock, client, server):
self.cfg = cfg
self.listener = listener
self.sock = sock
self.addr = addr
self.client = client
self.server = server

self.timeout = None
self.parser = None
Expand Down Expand Up @@ -127,11 +127,11 @@ def enqueue_req(self, conn):
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)

def accept(self, listener):
def accept(self, server, listener):
try:
client, addr = listener.accept()
sock, client = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, listener, client, addr)
conn = TConn(self.cfg, sock, client, server)
self.nr_conns += 1
# enqueue the job
self.enqueue_req(conn)
Expand Down Expand Up @@ -192,11 +192,13 @@ def is_parent_alive(self):

def run(self):
# init listeners, add them to the event loop
for s in self.sockets:
s.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, self.accept)

timeout = self.cfg.timeout or 0.5
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)

while self.alive:
# notify the arbiter we are alive
Expand All @@ -205,33 +207,37 @@ def run(self):
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(0.02)
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)

# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)

# clean up finished requests
for fut in result.done:
self.futures.remove(fut)

if not self.is_parent_alive():
break

# hanle keepalive timeouts
self.murder_keepalived()

# if the number of connections is < to the max we can handle at
# the same time there is no need to wait for one
if len(self.futures) < self.cfg.threads:
continue

result = futures.wait(self.futures, timeout=timeout,
return_when=futures.FIRST_COMPLETED)

if not result.done:
break
else:
[self.futures.remove(f) for f in result.done]

self.tpool.shutdown(False)
self.poller.close()

for s in self.sockets:
s.close()

futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

def finish_request(self, fs):
if fs.cancelled():
fs.conn.close()
Expand Down Expand Up @@ -285,7 +291,7 @@ def handle(self, conn):
conn.sock.close()
else:
self.log.debug("Error processing SSL request.")
self.handle_error(req, conn.sock, conn.addr, e)
self.handle_error(req, conn.sock, conn.client, e)

except EnvironmentError as e:
if e.errno not in (errno.EPIPE, errno.ECONNRESET):
Expand All @@ -296,7 +302,7 @@ def handle(self, conn):
else:
self.log.debug("Ignoring connection epipe")
except Exception as e:
self.handle_error(req, conn.sock, conn.addr, e)
self.handle_error(req, conn.sock, conn.client, e)

return (False, conn)

Expand All @@ -306,8 +312,8 @@ def handle_request(self, req, conn):
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
resp, environ = wsgi.create(req, conn.sock, conn.addr,
conn.listener.getsockname(), self.cfg)
resp, environ = wsgi.create(req, conn.sock, conn.client,
conn.server, self.cfg)
environ["wsgi.multithread"] = True
self.nr += 1
if self.alive and self.nr >= self.max_requests:
Expand Down

0 comments on commit f2418a9

Please sign in to comment.