Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix out_of_band hook never executed when multiple worker threads (clo… #2180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -24,6 +24,7 @@
* Rescue IO::WaitReadable instead of EAGAIN for blocking read (#2121)
* Ensure `BUNDLE_GEMFILE` is unspecified in workers if unspecified in master when using `prune_bundler` (#2154)
* Rescue and log exceptions in hooks defined by users (on_worker_boot, after_worker_fork etc) (#1551)
* Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
30 changes: 26 additions & 4 deletions lib/puma/server.rb
Expand Up @@ -63,6 +63,9 @@ def initialize(app, events=Events.stdio, options={})

@status = :stop

@all_worker_threads_free = false
@all_worker_threads_free_mutex = Mutex.new

@min_threads = 0
@max_threads = 16
@auto_trim_time = 30
Expand Down Expand Up @@ -189,6 +192,7 @@ def run(background=true)

@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
Proc.new { signal_all_worker_threads_free },
GuiTeK marked this conversation as resolved.
Show resolved Hide resolved
::Puma::IOBuffer) do |client, buffer|

# Advertise this server into the thread
Expand Down Expand Up @@ -290,10 +294,6 @@ def handle_servers
end

pool << client
busy_threads = pool.wait_until_not_full
if busy_threads == 0
@options[:out_of_band].each(&:call) if @options[:out_of_band]
end
end
rescue SystemCallError
# nothing
Expand All @@ -310,6 +310,19 @@ def handle_servers
rescue Object => e
@events.unknown_error self, e, "Listen loop"
end

if @all_worker_threads_free_mutex.try_lock # Do not block the critical path if mutex isn't available
if @all_worker_threads_free
begin
@options[:out_of_band].each(&:call) if @options[:out_of_band]
rescue Exception => e
STDERR.puts "Exception handling OOB callbacks: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
end
@all_worker_threads_free = false
end
@all_worker_threads_free_mutex.unlock
end
end

@events.fire :state, @status
Expand Down Expand Up @@ -344,6 +357,8 @@ def handle_check
when RESTART_COMMAND
@status = :restart
return true
when NOTIFY_THREADS_FREE_COMMAND
@all_worker_threads_free = true
end

return false
Expand Down Expand Up @@ -945,5 +960,12 @@ def possible_header_injection?(header_value)
HTTP_INJECTION_REGEX =~ header_value.to_s
end
private :possible_header_injection?

def signal_all_worker_threads_free
@all_worker_threads_free_mutex.synchronize do
@all_worker_threads_free = true
end
end
private :signal_all_worker_threads_free
end
end
13 changes: 12 additions & 1 deletion lib/puma/thread_pool.rb
Expand Up @@ -29,7 +29,7 @@ class ForceShutdown < RuntimeError
# The block passed is the work that will be performed in each
# thread.
#
def initialize(min, max, *extra, &block)
def initialize(min, max, all_threads_free_cb, *extra, &block)
@not_empty = ConditionVariable.new
@not_full = ConditionVariable.new
@mutex = Mutex.new
Expand All @@ -41,6 +41,7 @@ def initialize(min, max, *extra, &block)

@min = Integer(min)
@max = Integer(max)
@all_threads_free_cb = all_threads_free_cb
@block = block
@extra = extra

Expand Down Expand Up @@ -116,6 +117,7 @@ def spawn_thread
end

@waiting += 1
signal_threadpool_empty
not_full.signal
not_empty.wait mutex
@waiting -= 1
Expand Down Expand Up @@ -213,6 +215,15 @@ def wait_until_not_full
end
end

# Call the callback +@all_threads_free_cb+ if all worker threads are free.
#
def signal_threadpool_empty
busy_threads = @spawned - @waiting
if busy_threads == 0
@all_threads_free_cb.call
end
end

# If too many threads are in the pool, tell one to finish go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
Expand Down
3 changes: 2 additions & 1 deletion test/test_thread_pool.rb
Expand Up @@ -10,9 +10,10 @@ def teardown

def new_pool(min, max, &block)
block = proc { } unless block
all_worker_threads_free_cb = proc { }
@work_mutex = Mutex.new
@work_done = ConditionVariable.new
@pool = Puma::ThreadPool.new(min, max, &block)
@pool = Puma::ThreadPool.new(min, max, all_worker_threads_free_cb, &block)
end

def pause
Expand Down