Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadPool concurrency refactoring #2220

Merged
merged 2 commits into from Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -36,6 +36,7 @@
* Simplify `Runner#start_control` URL parsing (#2111)
* Removed the IOBuffer extension and replaced with Ruby (#1980)
* Update `Rack::Handler::Puma.run` to use `**options` (#2189)
* ThreadPool concurrency refactoring (#2220)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the trim bugfix could also be mentioned


## 4.3.3 and 3.12.4 / 2020-02-28

Expand Down
51 changes: 24 additions & 27 deletions lib/puma/thread_pool.rb
Expand Up @@ -54,7 +54,10 @@ def initialize(min, max, *extra, &block)
@reaper = nil

@mutex.synchronize do
@min.times { spawn_thread }
@min.times do
spawn_thread
@not_full.wait(@mutex)
end
end

@clean_thread_locals = false
Expand All @@ -72,7 +75,7 @@ def self.clean_thread_locals
# How many objects have yet to be processed by the pool?
#
def backlog
@mutex.synchronize { @todo.size }
with_mutex { @todo.size }
end

def pool_capacity
Expand All @@ -99,20 +102,13 @@ def spawn_thread
while true
work = nil

continue = true

mutex.synchronize do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever change and much more readable

while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end

if @shutdown
continue = false
break
@spawned -= 1
@workers.delete th
Thread.exit
end

@waiting += 1
Expand All @@ -121,11 +117,9 @@ def spawn_thread
@waiting -= 1
end

work = todo.shift if continue
work = todo.shift
end

break unless continue

if @clean_thread_locals
ThreadPool.clean_thread_locals
end
Expand All @@ -136,11 +130,6 @@ def spawn_thread
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end

mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end

@workers << th
Expand All @@ -150,9 +139,15 @@ def spawn_thread

private :spawn_thread

def with_mutex(&block)
@mutex.owned? ?
yield :
@mutex.synchronize(&block)
end

# Add +work+ to the todo list for a Thread to pickup and process.
def <<(work)
@mutex.synchronize do
with_mutex do
if @shutdown
raise "Unable to add work while shutting down"
end
Expand Down Expand Up @@ -197,7 +192,7 @@ def <<(work)
# Returns the current number of busy threads, or +nil+ if shutting down.
#
def wait_until_not_full
@mutex.synchronize do
with_mutex do
while true
return if @shutdown

Expand All @@ -213,13 +208,14 @@ def wait_until_not_full
end
end

# If too many threads are in the pool, tell one to finish go ahead
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
#
def trim(force=false)
@mutex.synchronize do
if (force or @waiting > 0) and @spawned - @trim_requested > @min
with_mutex do
free = @waiting - @todo.size
if (force or free > 0) and @spawned - @trim_requested > @min
@trim_requested += 1
@not_empty.signal
end
Expand All @@ -229,7 +225,7 @@ def trim(force=false)
# If there are dead threads in the pool make them go away while decreasing
# spawned counter so that new healthy threads could be created again.
def reap
@mutex.synchronize do
with_mutex do
dead_workers = @workers.reject(&:alive?)

dead_workers.each do |worker|
Expand Down Expand Up @@ -283,8 +279,9 @@ def auto_reap!(timeout=5)
# Tell all threads in the pool to exit and wait for them to finish.
#
def shutdown(timeout=-1)
threads = @mutex.synchronize do
threads = with_mutex do
@shutdown = true
@trim_requested = @spawned
@not_empty.broadcast
@not_full.broadcast

Expand Down