Skip to content

Commit

Permalink
Pass queued requests to thread pool on server shutdown (#2122)
Browse files Browse the repository at this point in the history
* Pass queued requests to thread pool on server shutdown

* handle ThreadPool::ForceShutdown when waiting for client request

* Add extra shutdown-request test cases
Cover various scenario combinations related to timeout settings,
queue_requests configuration, and post/get request type.

* Rename #idle? to #can_close?
Update method documentation based on feedback.
  • Loading branch information
wjordan committed Apr 10, 2020
1 parent 7080f3e commit 19b2a21
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 7 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -28,6 +28,7 @@
* Rescue and log exceptions in hooks defined by users (on_worker_boot, after_worker_fork etc) (#1551)
* Read directly from the socket in #read_and_drop to avoid raising further SSL errors (#2198)
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
19 changes: 18 additions & 1 deletion lib/puma/client.rb
Expand Up @@ -246,7 +246,12 @@ def eagerly_finish
def finish(timeout)
return true if @ready
until try_to_finish
unless IO.select([@to_io], nil, nil, timeout)
can_read = begin
IO.select([@to_io], nil, nil, timeout)
rescue ThreadPool::ForceShutdown
nil
end
unless can_read
write_error(408) if in_data_phase
raise ConnectionError
end
Expand All @@ -273,6 +278,18 @@ def peerip
@peerip ||= @io.peeraddr.last
end

# Returns true if the persistent connection can be closed immediately
# without waiting for the configured idle/shutdown timeout.
def can_close?
# Allow connection to close if it's received at least one full request
# and hasn't received any data for a future request.
#
# From RFC 2616 section 8.1.4:
# Servers SHOULD always respond to at least one request per connection,
# if at all possible.
@requests_served > 0 && @parsed_bytes == 0
end

private

def setup_body
Expand Down
7 changes: 6 additions & 1 deletion lib/puma/reactor.rb
Expand Up @@ -189,7 +189,12 @@ def run_internal
if submon.value == @ready
false
else
submon.value.close
if submon.value.can_close?
submon.value.close
else
# Pass remaining open client connections to the thread pool.
@app_pool << submon.value
end
begin
selector.deregister submon.value
rescue IOError
Expand Down
10 changes: 5 additions & 5 deletions lib/puma/server.rb
Expand Up @@ -185,8 +185,6 @@ def run(background=true)

@status = :run

queue_requests = @queue_requests

@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
::Puma::IOBuffer) do |client, buffer|
Expand All @@ -197,7 +195,7 @@ def run(background=true)
process_now = false

begin
if queue_requests
if @queue_requests
process_now = client.eagerly_finish
else
client.finish(@first_data_timeout)
Expand Down Expand Up @@ -230,7 +228,7 @@ def run(background=true)

@thread_pool.clean_thread_locals = @options[:clean_thread_locals]

if queue_requests
if @queue_requests
@reactor = Reactor.new self, @thread_pool
@reactor.run_in_thread
end
Expand Down Expand Up @@ -314,11 +312,12 @@ def handle_servers

@events.fire :state, @status

graceful_shutdown if @status == :stop || @status == :restart
if queue_requests
@queue_requests = false
@reactor.clear!
@reactor.shutdown
end
graceful_shutdown if @status == :stop || @status == :restart
rescue Exception => e
STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
Expand Down Expand Up @@ -388,6 +387,7 @@ def process_client(client, buffer)
end

unless client.reset(check_for_more_data)
return unless @queue_requests
close_socket = false
client.set_timeout @persistent_timeout
@reactor.add client
Expand Down
63 changes: 63 additions & 0 deletions test/test_puma_server.rb
Expand Up @@ -883,6 +883,69 @@ def assert_does_not_allow_http_injection(app, opts = {})
end
end

# Perform a server shutdown while requests are pending (one in app-server response, one still sending client request).
def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options)
@server = Puma::Server.new @app, @events, options
server_run app: ->(_) {
sleep app_delay
[204, {}, []]
}

s1 = send_http "GET / HTTP/1.1\r\n\r\n"
s2 = send_http post ?
"POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET / HTTP/1.1\r\n"
sleep 0.1

@server.stop
sleep request_delay

s2 << "\r\n"

assert_match /204/, s1.gets

assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response'
s2_result = begin
s2.gets
rescue Errno::ECONNABORTED, Errno::ECONNRESET
# Some platforms raise errors instead of returning a response/EOF when a TCP connection is aborted.
post ? '408' : nil
end

if response
assert_match response, s2_result
else
assert_nil s2_result
end
end

# Shutdown should allow pending requests to complete.
def test_shutdown_requests
shutdown_requests response: /204/
shutdown_requests response: /204/, queue_requests: false
end

# Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body).
def test_shutdown_data_timeout
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false
shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true
end

# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
def test_force_shutdown
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 1, post: true
end

# App-responses still pending during `force_shutdown_after` should return 503
# (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown_app
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1, queue_requests: false
end

def test_http11_connection_header_queue
server_run app: ->(_) { [200, {}, [""]] }

Expand Down

0 comments on commit 19b2a21

Please sign in to comment.