Skip to content

Commit

Permalink
Better error handling during force shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
wjordan committed May 17, 2020
1 parent 91e57f4 commit a290f14
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 75 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -45,6 +45,7 @@
* Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233)
* Improvements to `out_of_band` hook (#2234)
* Prefer the rackup file specified by the CLI (#2225)
* Better error handling during force shutdown (#2271)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
16 changes: 7 additions & 9 deletions lib/puma/client.rb
Expand Up @@ -244,19 +244,17 @@ def eagerly_finish
end # IS_JRUBY

def finish(timeout)
return true if @ready
return if @ready
until try_to_finish
can_read = begin
IO.select([@to_io], nil, nil, timeout)
rescue ThreadPool::ForceShutdown
nil
end
unless can_read
write_error(408) if in_data_phase
unless IO.select([@to_io], nil, nil, timeout)
timeout_error
raise ConnectionError
end
end
true
end

def timeout_error
write_error(408) if in_data_phase
end

def write_error(status_code)
Expand Down
75 changes: 49 additions & 26 deletions lib/puma/server.rb
Expand Up @@ -88,6 +88,7 @@ def initialize(app, events=Events.stdio, options={})
@precheck_closing = true

@requests_count = 0
@queue_mutex = Mutex.new
end

attr_accessor :binder, :leak_stack_on_error, :early_hints
Expand Down Expand Up @@ -194,12 +195,18 @@ def run(background=true)
process_now = false

begin
if @queue_requests
process_now = client.eagerly_finish
else
client.finish(@first_data_timeout)
process_now = true
unless @queue_requests &&
!client.eagerly_finish &&
queue_client(client, @first_data_timeout)

@thread_pool.with_force_shutdown do
client.finish(@first_data_timeout)
end
process_now = process_client(client, buffer)
end
rescue ThreadPool::ForceShutdown
client.timeout_error
client.close
rescue MiniSSL::SSLError => e
ssl_socket = client.io
addr = ssl_socket.peeraddr.last
Expand All @@ -215,13 +222,6 @@ def run(background=true)
@events.parse_error self, client.env, e
rescue ConnectionError, EOFError
client.close
else
if process_now
process_client client, buffer
else
client.set_timeout @first_data_timeout
@reactor.add client
end
end

process_now
Expand Down Expand Up @@ -262,7 +262,6 @@ def handle_servers
check = @check
sockets = [check] + @binder.ios
pool = @thread_pool
queue_requests = @queue_requests

remote_addr_value = nil
remote_addr_header = nil
Expand Down Expand Up @@ -306,7 +305,7 @@ def handle_servers

@events.fire :state, @status

if queue_requests
if_queue_requests do
@queue_requests = false
@reactor.clear!
@reactor.shutdown
Expand Down Expand Up @@ -344,12 +343,33 @@ def handle_check
return false
end

# Runs the provided block if the Reactor queue is enabled.
# Returns block return value if queue is enabled, or nil if queue is not enabled.
def if_queue_requests
@queue_mutex.synchronize do
if @queue_requests
yield
end
end
end
private :if_queue_requests

# Queues the provided client
def queue_client(client, timeout)
if_queue_requests do
client.set_timeout timeout
@reactor.add client
end
end
private :queue_client

# Given a connection on +client+, handle the incoming requests.
#
# This method support HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
# Returns true if one or more complete requests were successfully handled.
def process_client(client, buffer)
begin

Expand All @@ -359,7 +379,7 @@ def process_client(client, buffer)
requests = 0

while true
case handle_request(client, buffer)
case handle_request(client, buffer).tap {requests += 1}
when false
return
when :async
Expand All @@ -370,8 +390,6 @@ def process_client(client, buffer)

ThreadPool.clean_thread_locals if clean_thread_locals

requests += 1

check_for_more_data = @status == :run

if requests >= MAX_FAST_INLINE
Expand All @@ -382,18 +400,19 @@ def process_client(client, buffer)
check_for_more_data = false
end

unless client.reset(check_for_more_data)
return unless @queue_requests
close_socket = false
client.set_timeout @persistent_timeout
@reactor.add client
next_request_ready = @thread_pool.with_force_shutdown do
client.reset(check_for_more_data)
end

unless next_request_ready
close_socket = false if queue_client(client, @persistent_timeout)
return
end
end
end

# The client disconnected while we were reading data
rescue ConnectionError
# The client disconnected or threadpool shut down while we were reading data
rescue ConnectionError, ThreadPool::ForceShutdown
# Swallow them. The ensure tries to close +client+ down

# SSL handshake error
Expand Down Expand Up @@ -435,6 +454,8 @@ def process_client(client, buffer)
rescue StandardError => e
@events.unknown_error self, e, "Client"
end

return requests.to_i > 0
end
end

Expand Down Expand Up @@ -571,7 +592,9 @@ def handle_request(req, lines)

begin
begin
status, headers, res_body = @app.call(env)
status, headers, res_body = @thread_pool.with_force_shutdown do
@app.call(env)
end

return :async if req.hijacked

Expand Down Expand Up @@ -869,7 +892,7 @@ def graceful_shutdown

if @thread_pool
if timeout = @options[:force_shutdown_after]
@thread_pool.shutdown timeout.to_i
@thread_pool.shutdown timeout.to_f
else
@thread_pool.shutdown
end
Expand Down
23 changes: 21 additions & 2 deletions lib/puma/thread_pool.rb
Expand Up @@ -62,6 +62,9 @@ def initialize(min, max, *extra, &block)
end

@clean_thread_locals = false

@force_shutdown = false
@shutdown_mutex = Mutex.new
end

attr_reader :spawned, :trim_requested, :waiting
Expand Down Expand Up @@ -315,6 +318,19 @@ def auto_reap!(timeout=5)
@reaper.start!
end

# Allows ThreadPool::ForceShutdown to be raised within the
# provided block if the thread is forced to shutdown during execution.
def with_force_shutdown
t = Thread.current
@shutdown_mutex.synchronize do
raise ForceShutdown if @force_shutdown
t[:with_force_shutdown] = true
end
yield
ensure
t[:with_force_shutdown] = false
end

# Tell all threads in the pool to exit and wait for them to finish.
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
# Next, wait an extra +grace+ seconds then force-kill remaining threads.
Expand Down Expand Up @@ -349,8 +365,11 @@ def shutdown(timeout=-1)
join.call(timeout)

# If threads are still running, raise ForceShutdown and wait to finish.
threads.each do |t|
t.raise ForceShutdown
@shutdown_mutex.synchronize do
@force_shutdown = true
threads.each do |t|
t.raise ForceShutdown if t[:with_force_shutdown]
end
end
join.call(SHUTDOWN_GRACE_TIME)

Expand Down
69 changes: 35 additions & 34 deletions test/test_puma_server.rb
Expand Up @@ -881,66 +881,67 @@ def assert_does_not_allow_http_injection(app, opts = {})
end

# Perform a server shutdown while requests are pending (one in app-server response, one still sending client request).
def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options)
def shutdown_requests(s1_complete: true, s1_response: nil, post: false, s2_response: nil, **options)
@server = Puma::Server.new @app, @events, options
server_run app: ->(_) {
sleep app_delay
mutex = Mutex.new
app_finished = ConditionVariable.new
server_run app: ->(env) {
path = env['REQUEST_PATH']
mutex.synchronize do
app_finished.signal
app_finished.wait(mutex) if path == '/s1'
end
[204, {}, []]
}

s1 = send_http "GET / HTTP/1.1\r\n\r\n"
s1 = nil
s2 = send_http post ?
"POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET / HTTP/1.1\r\n"
sleep 0.1

"POST /s2 HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET /s2 HTTP/1.1\r\n"
mutex.synchronize do
s1 = send_http "GET /s1 HTTP/1.1\r\n\r\n"
app_finished.wait(mutex)
app_finished.signal if s1_complete
end
@server.stop
sleep request_delay
Thread.pass until @server.instance_variable_get(:@thread_pool).instance_variable_get(:@shutdown)

s2 << "\r\n"
assert_match(s1_response, s1.gets) if s1_response

assert_match(/204/, s1.gets)
# Send s2 after shutdown begins
s2 << "\r\n" unless IO.select([s2], nil, nil, 0.1)

assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response'
assert IO.select([s2], nil, nil, 10), 'timeout waiting for response'
s2_result = begin
s2.gets
rescue Errno::ECONNABORTED, Errno::ECONNRESET
# Some platforms raise errors instead of returning a response/EOF when a TCP connection is aborted.
post ? '408' : nil
end

if response
assert_match response, s2_result
if s2_response
assert_match s2_response, s2_result
else
assert_nil s2_result
end
ensure
@server.stop(true)
end

# Shutdown should allow pending requests to complete.
# Shutdown should allow pending requests and app-responses to complete.
def test_shutdown_requests
shutdown_requests response: /204/
shutdown_requests response: /204/, queue_requests: false
end

# Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body).
def test_shutdown_data_timeout
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false
shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true
opts = {s1_response: /204/, s2_response: /204/}
shutdown_requests **opts
shutdown_requests **opts, queue_requests: false
end

# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
# App-responses still pending should return 503 (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 3, post: true
end

# App-responses still pending during `force_shutdown_after` should return 503
# (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown_app
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3, queue_requests: false
opts = {s1_complete: false, s1_response: /503/, s2_response: nil, force_shutdown_after: 0}
shutdown_requests **opts
shutdown_requests **opts, queue_requests: false
shutdown_requests **opts, post: true, s2_response: /408/
end

def test_http11_connection_header_queue
Expand Down
12 changes: 8 additions & 4 deletions test/test_thread_pool.rb
Expand Up @@ -222,8 +222,10 @@ def test_force_shutdown_immediately

pool = mutex_pool(0, 1) do
begin
pool.signal
sleep
pool.with_force_shutdown do
pool.signal
sleep
end
rescue Puma::ThreadPool::ForceShutdown
rescued = true
end
Expand All @@ -248,8 +250,10 @@ def test_shutdown_with_grace
rescued = []
pool = mutex_pool(2, 2) do
begin
pool.signal
sleep
pool.with_force_shutdown do
pool.signal
sleep
end
rescue Puma::ThreadPool::ForceShutdown
rescued << Thread.current
sleep
Expand Down

0 comments on commit a290f14

Please sign in to comment.