Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error handling during force shutdown #2271

Merged
merged 1 commit into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -5,6 +5,7 @@

* Bugfixes
* Prevent connections from entering Reactor after shutdown begins (#2377)
* Better error handling during force shutdown (#2271)

* Refactor
* Change Events#ssl_error signature from (error, peeraddr, peercert) to (error, ssl_socket) (#2375)
Expand Down
20 changes: 14 additions & 6 deletions lib/puma/server.rb
Expand Up @@ -232,7 +232,9 @@ def run(background=true)
if @queue_requests
process_now = client.eagerly_finish
else
client.finish(@first_data_timeout)
@thread_pool.with_force_shutdown do
client.finish(@first_data_timeout)
end
process_now = true
end
rescue MiniSSL::SSLError => e
Expand All @@ -244,7 +246,7 @@ def run(background=true)
client.close

@events.parse_error e, client
rescue ConnectionError, EOFError => e
rescue ConnectionError, EOFError, ThreadPool::ForceShutdown => e
client.close

@events.connection_error e, client
Expand Down Expand Up @@ -422,7 +424,11 @@ def process_client(client, buffer)
check_for_more_data = false
end

unless client.reset(check_for_more_data)
next_request_ready = @thread_pool.with_force_shutdown do
client.reset(check_for_more_data)
end

unless next_request_ready
@shutdown_mutex.synchronize do
return unless @queue_requests
close_socket = false
Expand All @@ -435,7 +441,7 @@ def process_client(client, buffer)
end

# The client disconnected while we were reading data
rescue ConnectionError
rescue ConnectionError, ThreadPool::ForceShutdown
# Swallow them. The ensure tries to close +client+ down

# SSL handshake error
Expand Down Expand Up @@ -638,7 +644,9 @@ def handle_request(req, lines)

begin
begin
status, headers, res_body = @app.call(env)
status, headers, res_body = @thread_pool.with_force_shutdown do
@app.call(env)
end

return :async if req.hijacked

Expand Down Expand Up @@ -936,7 +944,7 @@ def graceful_shutdown

if @thread_pool
if timeout = @options[:force_shutdown_after]
@thread_pool.shutdown timeout.to_i
@thread_pool.shutdown timeout.to_f
else
@thread_pool.shutdown
end
Expand Down
22 changes: 20 additions & 2 deletions lib/puma/thread_pool.rb
Expand Up @@ -62,6 +62,8 @@ def initialize(min, max, *extra, &block)
end

@clean_thread_locals = false
@force_shutdown = false
@shutdown_mutex = Mutex.new
end

attr_reader :spawned, :trim_requested, :waiting
Expand Down Expand Up @@ -322,6 +324,19 @@ def auto_reap!(timeout=5)
@reaper.start!
end

# Runs the given block as a region in which ThreadPool::ForceShutdown
# may be raised if the thread is forced to shut down during execution.
#
# While the block runs, the current thread carries the
# +:with_force_shutdown+ flag; the flag is cleared again when the block
# finishes, whether it returns normally or raises.
#
# Raises ForceShutdown right away, without yielding, when a forced
# shutdown has already been flagged via +@force_shutdown+.
#
# Returns the value of the block.
def with_force_shutdown
  worker = Thread.current
  begin
    # Check-and-mark happens under the mutex so that the flag cannot be
    # set after @force_shutdown has already been flipped.
    @shutdown_mutex.synchronize do
      raise ForceShutdown if @force_shutdown
      worker[:with_force_shutdown] = true
    end
    yield
  ensure
    # Always leave the interruptible region, even on error.
    worker[:with_force_shutdown] = false
  end
end

# Tell all threads in the pool to exit and wait for them to finish.
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
# Next, wait an extra +grace+ seconds then force-kill remaining threads.
Expand Down Expand Up @@ -356,8 +371,11 @@ def shutdown(timeout=-1)
join.call(timeout)

# If threads are still running, raise ForceShutdown and wait to finish.
threads.each do |t|
t.raise ForceShutdown
@shutdown_mutex.synchronize do
@force_shutdown = true
threads.each do |t|
t.raise ForceShutdown if t[:with_force_shutdown]
end
end
join.call(SHUTDOWN_GRACE_TIME)

Expand Down
67 changes: 33 additions & 34 deletions test/test_puma_server.rb
Expand Up @@ -949,66 +949,65 @@ def assert_does_not_allow_http_injection(app, opts = {})
end

# Perform a server shutdown while requests are pending (one in app-server response, one still sending client request).
def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options)
def shutdown_requests(s1_complete: true, s1_response: nil, post: false, s2_response: nil, **options)
@server = Puma::Server.new @app, @events, options
server_run app: ->(_) {
sleep app_delay
mutex = Mutex.new
app_finished = ConditionVariable.new
server_run app: ->(env) {
path = env['REQUEST_PATH']
mutex.synchronize do
app_finished.signal
app_finished.wait(mutex) if path == '/s1'
end
[204, {}, []]
}

s1 = send_http "GET / HTTP/1.1\r\n\r\n"
s1 = nil
s2 = send_http post ?
"POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET / HTTP/1.1\r\n"
sleep 0.1

"POST /s2 HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET /s2 HTTP/1.1\r\n"
mutex.synchronize do
s1 = send_http "GET /s1 HTTP/1.1\r\n\r\n"
app_finished.wait(mutex)
app_finished.signal if s1_complete
end
@server.stop
sleep request_delay
Thread.pass until @server.instance_variable_get(:@thread_pool).instance_variable_get(:@shutdown)

s2 << "\r\n"
assert_match(s1_response, s1.gets) if s1_response

assert_match(/204/, s1.gets)
# Send s2 after shutdown begins
s2 << "\r\n" unless IO.select([s2], nil, nil, 0.1)

assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response'
assert IO.select([s2], nil, nil, 10), 'timeout waiting for response'
s2_result = begin
s2.gets
rescue Errno::ECONNABORTED, Errno::ECONNRESET
# Some platforms raise errors instead of returning a response/EOF when a TCP connection is aborted.
post ? '408' : nil
end

if response
assert_match response, s2_result
if s2_response
assert_match s2_response, s2_result
else
assert_nil s2_result
end
end

# Shutdown should allow pending requests to complete.
# Shutdown should allow pending requests and app-responses to complete.
def test_shutdown_requests
shutdown_requests response: /204/
shutdown_requests response: /204/, queue_requests: false
end

# Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body).
def test_shutdown_data_timeout
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false
shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true
opts = {s1_response: /204/, s2_response: /204/}
shutdown_requests(**opts)
shutdown_requests(**opts, queue_requests: false)
end

# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
# App-responses still pending should return 503 (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 3, post: true
end

# App-responses still pending during `force_shutdown_after` should return 503
# (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown_app
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3, queue_requests: false
opts = {s1_complete: false, s1_response: /503/, s2_response: nil, force_shutdown_after: 0}
shutdown_requests(**opts)
shutdown_requests(**opts, queue_requests: false)
shutdown_requests(**opts, post: true, s2_response: /408/)
end

def test_http11_connection_header_queue
Expand Down
12 changes: 8 additions & 4 deletions test/test_thread_pool.rb
Expand Up @@ -222,8 +222,10 @@ def test_force_shutdown_immediately

pool = mutex_pool(0, 1) do
begin
pool.signal
sleep
pool.with_force_shutdown do
pool.signal
sleep
end
rescue Puma::ThreadPool::ForceShutdown
rescued = true
end
Expand All @@ -248,8 +250,10 @@ def test_shutdown_with_grace
rescued = []
pool = mutex_pool(2, 2) do
begin
pool.signal
sleep
pool.with_force_shutdown do
pool.signal
sleep
end
rescue Puma::ThreadPool::ForceShutdown
rescued << Thread.current
sleep
Expand Down