Skip to content

Commit

Permalink
ThreadPool concurrency refactoring (#2220)
Browse files Browse the repository at this point in the history
- Wait for threads to enter waiting loop on ThreadPool startup
- Simplify #spawn_thread inner threadpool loop
- Refactor TestThreadPool to make tests faster and more stable

Co-authored-by: Nate Berkopec <nate.berkopec@gmail.com>
  • Loading branch information
wjordan and nateberkopec committed Apr 14, 2020
1 parent f47d6d1 commit b16d8cb
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 147 deletions.
2 changes: 2 additions & 0 deletions History.md
Expand Up @@ -29,13 +29,15 @@
* Read directly from the socket in #read_and_drop to avoid raising further SSL errors (#2198)
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)
* Fixed a few minor concurrency bugs in ThreadPool that may have affected non-GVL Rubies (#2220)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
* Simplify `Configuration.random_token` and remove insecure fallback (#2102)
* Simplify `Runner#start_control` URL parsing (#2111)
* Removed the IOBuffer extension and replaced with Ruby (#1980)
* Update `Rack::Handler::Puma.run` to use `**options` (#2189)
* ThreadPool concurrency refactoring (#2220)
* JSON parse cluster worker stats instead of regex (#2124)

## 4.3.3 and 3.12.4 / 2020-02-28
Expand Down
51 changes: 24 additions & 27 deletions lib/puma/thread_pool.rb
Expand Up @@ -54,7 +54,10 @@ def initialize(min, max, *extra, &block)
@reaper = nil

@mutex.synchronize do
@min.times { spawn_thread }
@min.times do
spawn_thread
@not_full.wait(@mutex)
end
end

@clean_thread_locals = false
Expand All @@ -72,7 +75,7 @@ def self.clean_thread_locals
# How many objects have yet to be processed by the pool?
#
def backlog
@mutex.synchronize { @todo.size }
with_mutex { @todo.size }
end

def pool_capacity
Expand All @@ -99,20 +102,13 @@ def spawn_thread
while true
work = nil

continue = true

mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end

if @shutdown
continue = false
break
@spawned -= 1
@workers.delete th
Thread.exit
end

@waiting += 1
Expand All @@ -121,11 +117,9 @@ def spawn_thread
@waiting -= 1
end

work = todo.shift if continue
work = todo.shift
end

break unless continue

if @clean_thread_locals
ThreadPool.clean_thread_locals
end
Expand All @@ -136,11 +130,6 @@ def spawn_thread
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end

mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end

@workers << th
Expand All @@ -150,9 +139,15 @@ def spawn_thread

private :spawn_thread

def with_mutex(&block)
@mutex.owned? ?
yield :
@mutex.synchronize(&block)
end

# Add +work+ to the todo list for a Thread to pickup and process.
def <<(work)
@mutex.synchronize do
with_mutex do
if @shutdown
raise "Unable to add work while shutting down"
end
Expand Down Expand Up @@ -197,7 +192,7 @@ def <<(work)
# Returns the current number of busy threads, or +nil+ if shutting down.
#
def wait_until_not_full
@mutex.synchronize do
with_mutex do
while true
return if @shutdown

Expand All @@ -213,13 +208,14 @@ def wait_until_not_full
end
end

# If too many threads are in the pool, tell one to finish go ahead
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
#
def trim(force=false)
@mutex.synchronize do
if (force or @waiting > 0) and @spawned - @trim_requested > @min
with_mutex do
free = @waiting - @todo.size
if (force or free > 0) and @spawned - @trim_requested > @min
@trim_requested += 1
@not_empty.signal
end
Expand All @@ -229,7 +225,7 @@ def trim(force=false)
# If there are dead threads in the pool make them go away while decreasing
# spawned counter so that new healthy threads could be created again.
def reap
@mutex.synchronize do
with_mutex do
dead_workers = @workers.reject(&:alive?)

dead_workers.each do |worker|
Expand Down Expand Up @@ -283,8 +279,9 @@ def auto_reap!(timeout=5)
# Tell all threads in the pool to exit and wait for them to finish.
#
def shutdown(timeout=-1)
threads = @mutex.synchronize do
threads = with_mutex do
@shutdown = true
@trim_requested = @spawned
@not_empty.broadcast
@not_full.broadcast

Expand Down

0 comments on commit b16d8cb

Please sign in to comment.