Skip to content

Commit

Permalink
Fix ThreadPool#shutdown timeout accuracy
Browse files Browse the repository at this point in the history
  • Loading branch information
wjordan committed Apr 10, 2020
1 parent 19b2a21 commit d5bca80
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 21 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -29,6 +29,7 @@
* Read directly from the socket in #read_and_drop to avoid raising further SSL errors (#2198)
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)
* Fix ThreadPool#shutdown timeout accuracy

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
2 changes: 1 addition & 1 deletion lib/puma/dsl.rb
Expand Up @@ -243,7 +243,7 @@ def force_shutdown_after(val=:forever)
when :immediately
0
else
Integer(val)
Float(val)
end

@options[:force_shutdown_after] = i
Expand Down
31 changes: 16 additions & 15 deletions lib/puma/thread_pool.rb
Expand Up @@ -281,8 +281,11 @@ def auto_reap!(timeout=5)
end

# Tell all threads in the pool to exit and wait for them to finish.
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
# Next, wait an extra +grace+ seconds then force-kill remaining threads.
# Finally, wait +kill_grace+ seconds for remaining threads to exit.
#
def shutdown(timeout=-1)
def shutdown(timeout=-1, grace: SHUTDOWN_GRACE_TIME, kill_grace: 1)
threads = @mutex.synchronize do
@shutdown = true
@not_empty.broadcast
Expand All @@ -298,27 +301,25 @@ def shutdown(timeout=-1)
# Wait for threads to finish without force shutdown.
threads.each(&:join)
else
# Wait for threads to finish after n attempts (+timeout+).
# If threads are still running, it will forcefully kill them.
timeout.times do
threads.delete_if do |t|
t.join 1
end

if threads.empty?
break
else
sleep 1
join = ->(timeout) do
start = Time.now.to_f
threads.reject! do |t|
t.join timeout + start - Time.now.to_f
end
end

# Wait for threads to finish after +timeout+ seconds.
join.call(timeout)

# If threads are still running, raise ForceShutdown.
threads.each do |t|
t.raise ForceShutdown
end
join.call(grace)

threads.each do |t|
t.join SHUTDOWN_GRACE_TIME
end
# If threads are _still_ running, forcefully kill them.
threads.each(&:kill)
join.call(kill_grace)
end

@spawned = 0
Expand Down
10 changes: 5 additions & 5 deletions test/test_puma_server.rb
Expand Up @@ -934,16 +934,16 @@ def test_shutdown_data_timeout

# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
def test_force_shutdown
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 1, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 1, post: true
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3, queue_requests: false
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 3, post: true
end

# App-responses still pending during `force_shutdown_after` should return 503
# (uncaught Puma::ThreadPool::ForceShutdown exception).
def test_force_shutdown_app
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 1, queue_requests: false
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3, queue_requests: false
end

def test_http11_connection_header_queue
Expand Down
32 changes: 32 additions & 0 deletions test/test_thread_pool.rb
Expand Up @@ -266,4 +266,36 @@ def test_force_shutdown_immediately
assert rescued
end
end

def test_shutdown_with_grace
timeout = 0.01
grace = 0.01

finish = false
rescued = []
pool = new_pool(2, 2) do
begin
@work_mutex.synchronize do
@work_done.signal
end
Thread.pass until finish
rescue Puma::ThreadPool::ForceShutdown
rescued << Thread.current
sleep
end
end

pool << 1
pool << 2

@work_mutex.synchronize do
@work_done.wait(@work_mutex, 5)
start = Time.now.to_f
pool.shutdown(timeout, grace: grace)
finish = true
assert_equal 0, pool.spawned
assert_equal 2, rescued.length
refute rescued.any?(&:alive?)
end
end
end

0 comments on commit d5bca80

Please sign in to comment.