[thread] close sockets at graceful shutdown #1230

Merged (1 commit, Mar 25, 2016)
gunicorn/workers/gthread.py: 64 changes (35 additions, 29 deletions)
@@ -44,11 +44,11 @@

 class TConn(object):

-    def __init__(self, cfg, listener, sock, addr):
+    def __init__(self, cfg, sock, client, server):
         self.cfg = cfg
-        self.listener = listener
         self.sock = sock
-        self.addr = addr
+        self.client = client
+        self.server = server

         self.timeout = None
         self.parser = None
@@ -127,11 +127,11 @@ def enqueue_req(self, conn):
         fs = self.tpool.submit(self.handle, conn)
         self._wrap_future(fs, conn)

-    def accept(self, listener):
+    def accept(self, server, listener):
         try:
-            client, addr = listener.accept()
+            sock, client = listener.accept()
             # initialize the connection object
-            conn = TConn(self.cfg, listener, client, addr)
+            conn = TConn(self.cfg, sock, client, server)
             self.nr_conns += 1
             # enqueue the job
             self.enqueue_req(conn)
@@ -192,11 +192,13 @@ def is_parent_alive(self):

     def run(self):
         # init listeners, add them to the event loop
-        for s in self.sockets:
-            s.setblocking(False)
-            self.poller.register(s, selectors.EVENT_READ, self.accept)
-
-        timeout = self.cfg.timeout or 0.5
+        for sock in self.sockets:
+            sock.setblocking(False)
+            # a race condition during graceful shutdown may make the listener
+            # name unavailable in the request handler so capture it once here
+            server = sock.getsockname()
+            acceptor = partial(self.accept, server)
+            self.poller.register(sock, selectors.EVENT_READ, acceptor)

         while self.alive:
             # notify the arbiter we are alive
@@ -205,33 +207,37 @@ def run(self):
             # can we accept more connections?
             if self.nr_conns < self.worker_connections:
                 # wait for an event
-                events = self.poller.select(0.02)
+                events = self.poller.select(1.0)

Owner:
We may want to return faster in the loop there. Why increase the wait time?

Collaborator Author:
Why faster? We sleep for 1.0 in gevent and eventlet. The granularity here only matters for closing keep-alive connections and responding to graceful shutdown signals. The poll returns as soon as a connection is ready for accept or ready to read a keep-alive request, and the actual requests are handled in other threads. This main thread should sleep as long as it can.

Owner:
The reason to be faster there is that this select is done to check whether a socket has entered a state where we can accept. If we wait too long we can queue more accepts than we want and fail to balance the connections correctly between threads and workers. So we should be fast there imo.

Collaborator Author:
It will return as soon as the first socket is ready. There is no danger such as you describe.

Owner:
hrm actually probably. go ahead then :)
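
A minimal standalone sketch of the behavior the author describes (not part of the PR; the throwaway listener below is only for illustration): selectors treats the select() timeout as an upper bound on idle sleep and returns as soon as a registered socket becomes ready, so raising the value from 0.02 to 1.0 does not delay accepts.

import selectors
import socket

sel = selectors.DefaultSelector()

# throwaway listener, standing in for one of gunicorn's listening sockets
listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen()
listener.setblocking(False)
sel.register(listener, selectors.EVENT_READ)

# connect to ourselves so the listener is immediately readable
client = socket.create_connection(listener.getsockname())

# despite the 1.0 second timeout, select() returns right away because a
# connection is already pending; the timeout only matters when nothing is ready
events = sel.select(1.0)
for key, _mask in events:
    sock, addr = key.fileobj.accept()
    sock.close()

client.close()
sel.close()
listener.close()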

                 for key, mask in events:
                     callback = key.data
                     callback(key.fileobj)

+                # check (but do not wait) for finished requests
+                result = futures.wait(self.futures, timeout=0,
+                    return_when=futures.FIRST_COMPLETED)
+            else:
+                # wait for a request to finish
+                result = futures.wait(self.futures, timeout=1.0,
+                    return_when=futures.FIRST_COMPLETED)
+
+            # clean up finished requests
+            for fut in result.done:
+                self.futures.remove(fut)

Owner:
result.done may be None there, we should probably do the following:

if result.done is not None:
    [self.futures.remove(f) for f in result.done]

Thoughts?

Collaborator Author:
The docs say it returns "a named 2-tuple of sets" and that seems to be how it's implemented.
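
A quick check of that claim (a sketch, not from the PR): concurrent.futures.wait() returns a named 2-tuple whose done and not_done members are sets, so result.done may be empty but is never None and is always safe to iterate.

from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=1) as pool:
    fs = [pool.submit(pow, 2, 10)]
    result = futures.wait(fs, timeout=1.0, return_when=futures.FIRST_COMPLETED)
    # both members are plain sets
    print(type(result.done), type(result.not_done))  # <class 'set'> <class 'set'>

# even with nothing to wait on, the members are empty sets rather than None
empty = futures.wait([], timeout=0)
print(empty.done, empty.not_done)  # set() set()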


             if not self.is_parent_alive():
                 break

             # hanle keepalive timeouts
             self.murder_keepalived()

-            # if the number of connections is < to the max we can handle at
-            # the same time there is no need to wait for one
-            if len(self.futures) < self.cfg.threads:
-                continue
-
-            result = futures.wait(self.futures, timeout=timeout,
-                    return_when=futures.FIRST_COMPLETED)
-
-            if not result.done:
-                break
-            else:
-                [self.futures.remove(f) for f in result.done]

         self.tpool.shutdown(False)
         self.poller.close()

+        for s in self.sockets:
+            s.close()
+
         futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

     def finish_request(self, fs):
         if fs.cancelled():
             fs.conn.close()
@@ -285,7 +291,7 @@ def handle(self, conn):
                 conn.sock.close()
             else:
                 self.log.debug("Error processing SSL request.")
-                self.handle_error(req, conn.sock, conn.addr, e)
+                self.handle_error(req, conn.sock, conn.client, e)

         except EnvironmentError as e:
             if e.errno not in (errno.EPIPE, errno.ECONNRESET):
@@ -296,7 +302,7 @@ def handle(self, conn):
             else:
                 self.log.debug("Ignoring connection epipe")
         except Exception as e:
-            self.handle_error(req, conn.sock, conn.addr, e)
+            self.handle_error(req, conn.sock, conn.client, e)

         return (False, conn)

@@ -306,8 +312,8 @@ def handle_request(self, req, conn):
         try:
             self.cfg.pre_request(self, req)
             request_start = datetime.now()
-            resp, environ = wsgi.create(req, conn.sock, conn.addr,
-                    conn.listener.getsockname(), self.cfg)
+            resp, environ = wsgi.create(req, conn.sock, conn.client,
+                    conn.server, self.cfg)
             environ["wsgi.multithread"] = True
             self.nr += 1
             if self.alive and self.nr >= self.max_requests:
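
For context on why conn.server is captured up front (a standalone sketch with a throwaway socket, not gunicorn code): once a listening socket has been closed, as the worker now does at graceful shutdown, getsockname() can no longer be called on it, so the old per-request conn.listener.getsockname() call could fail while in-flight requests were still draining.

import socket

listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen()

server = listener.getsockname()   # capture once, while the socket is still open
listener.close()                  # what graceful shutdown now does to listeners

try:
    listener.getsockname()        # the old code needed this per request
except OSError as exc:
    print("getsockname() after close fails:", exc)

print("captured address is still usable:", server)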