Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass queued requests to thread pool on server shutdown #2122

Merged
merged 4 commits into from Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -28,6 +28,7 @@
* Rescue and log exceptions in hooks defined by users (on_worker_boot, after_worker_fork etc) (#1551)
* Read directly from the socket in #read_and_drop to avoid raising further SSL errors (#2198)
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
19 changes: 18 additions & 1 deletion lib/puma/client.rb
Expand Up @@ -246,7 +246,12 @@ def eagerly_finish
def finish(timeout)
return true if @ready
until try_to_finish
unless IO.select([@to_io], nil, nil, timeout)
can_read = begin
IO.select([@to_io], nil, nil, timeout)
rescue ThreadPool::ForceShutdown
nil
end
unless can_read
write_error(408) if in_data_phase
raise ConnectionError
end
Expand All @@ -273,6 +278,18 @@ def peerip
@peerip ||= @io.peeraddr.last
end

# Returns true if the persistent connection can be closed immediately
# without waiting for the configured idle/shutdown timeout.
def can_close?
# Allow connection to close if it's received at least one full request
# and hasn't received any data for a future request.
#
# From RFC 2616 section 8.1.4:
# Servers SHOULD always respond to at least one request per connection,
# if at all possible.
@requests_served > 0 && @parsed_bytes == 0
end

private

def setup_body
Expand Down
7 changes: 6 additions & 1 deletion lib/puma/reactor.rb
Expand Up @@ -189,7 +189,12 @@ def run_internal
if submon.value == @ready
false
else
submon.value.close
if submon.value.can_close?
submon.value.close
else
# Pass remaining open client connections to the thread pool.
@app_pool << submon.value
end
begin
selector.deregister submon.value
rescue IOError
Expand Down
10 changes: 5 additions & 5 deletions lib/puma/server.rb
Expand Up @@ -185,8 +185,6 @@ def run(background=true)

@status = :run

queue_requests = @queue_requests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes a change from cc4ad17, since we need to change behavior based on the updated value of the @queue_requests variable after the reactor is shutdown. I assume this was a performance optimization, but I don't know if it actually makes any measurable difference.


@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
::Puma::IOBuffer) do |client, buffer|
Expand All @@ -197,7 +195,7 @@ def run(background=true)
process_now = false

begin
if queue_requests
if @queue_requests
process_now = client.eagerly_finish
else
client.finish(@first_data_timeout)
Expand Down Expand Up @@ -230,7 +228,7 @@ def run(background=true)

@thread_pool.clean_thread_locals = @options[:clean_thread_locals]

if queue_requests
if @queue_requests
@reactor = Reactor.new self, @thread_pool
@reactor.run_in_thread
end
Expand Down Expand Up @@ -314,11 +312,12 @@ def handle_servers

@events.fire :state, @status

graceful_shutdown if @status == :stop || @status == :restart
if queue_requests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this needs to be @queue_requests given your change. But I'd also rather you not change this to an ivar read and leave it as a local var.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were actually two separate queue_requests local vars- one within Server#run, and one within Server#handle_servers.

The local var in #run had to be removed in order for this PR to work at all- see my comment at #2122 (review). However, this one in #handle_servers actually doesn't need to be changed as well, so I left it as-is to minimize the diff.

That said, since this local var in #handle_servers is only ever read on server shutdown and probably has no performance impact, I would lean towards removing it for consistency/simplicity if that's your preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This explanation works for me.

@queue_requests = false
@reactor.clear!
@reactor.shutdown
end
graceful_shutdown if @status == :stop || @status == :restart
rescue Exception => e
STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
Expand Down Expand Up @@ -388,6 +387,7 @@ def process_client(client, buffer)
end

unless client.reset(check_for_more_data)
return unless @queue_requests
close_socket = false
client.set_timeout @persistent_timeout
@reactor.add client
Expand Down
63 changes: 63 additions & 0 deletions test/test_puma_server.rb
Expand Up @@ -883,6 +883,69 @@ def assert_does_not_allow_http_injection(app, opts = {})
end
end

# Perform a server shutdown while requests are pending (one in app-server response, one still sending client request).
def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options)
@server = Puma::Server.new @app, @events, options
server_run app: ->(_) {
sleep app_delay
[204, {}, []]
}

s1 = send_http "GET / HTTP/1.1\r\n\r\n"
s2 = send_http post ?
"POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
"GET / HTTP/1.1\r\n"
sleep 0.1

@server.stop
sleep request_delay

s2 << "\r\n"

assert_match /204/, s1.gets

assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response'
s2_result = begin
s2.gets
rescue Errno::ECONNABORTED, Errno::ECONNRESET
# Some platforms raise errors instead of returning a response/EOF when a TCP connection is aborted.
post ? '408' : nil
end

if response
assert_match response, s2_result
else
assert_nil s2_result
end
end

# Shutdown should allow pending requests to complete.
def test_shutdown_requests
shutdown_requests response: /204/
shutdown_requests response: /204/, queue_requests: false
end

# Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body).
def test_shutdown_data_timeout
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false
shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true
end

# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
def test_force_shutdown
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 1, post: true
end

# App-responses still pending during `force_shutdown_after` should return 503
# (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown_app
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1, queue_requests: false
end

def test_http11_connection_header_queue
server_run app: ->(_) { [200, {}, [""]] }

Expand Down