From 8d610275f6ca0c6094e75e93a3529ea61732fd57 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Thu, 9 Apr 2020 16:59:31 -0700 Subject: [PATCH] ThreadPool concurrency refactoring - Wait for threads to enter waiting loop on ThreadPool startup - Simplify #spawn_thread inner threadpool loop - Refactor TestThreadPool to make tests faster and more stable --- lib/puma/thread_pool.rb | 51 +++++----- test/test_thread_pool.rb | 214 +++++++++++++++++---------------------- 2 files changed, 118 insertions(+), 147 deletions(-) diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 85ba68c96e..c0d9c5432e 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -54,7 +54,10 @@ def initialize(min, max, *extra, &block) @reaper = nil @mutex.synchronize do - @min.times { spawn_thread } + @min.times do + spawn_thread + @not_full.wait(@mutex) + end end @clean_thread_locals = false @@ -72,7 +75,7 @@ def self.clean_thread_locals # How many objects have yet to be processed by the pool? # def backlog - @mutex.synchronize { @todo.size } + with_mutex { @todo.size } end def pool_capacity @@ -99,20 +102,13 @@ def spawn_thread while true work = nil - continue = true - mutex.synchronize do while todo.empty? if @trim_requested > 0 @trim_requested -= 1 - continue = false - not_full.signal - break - end - - if @shutdown - continue = false - break + @spawned -= 1 + @workers.delete th + Thread.exit end @waiting += 1 @@ -121,11 +117,9 @@ def spawn_thread @waiting -= 1 end - work = todo.shift if continue + work = todo.shift end - break unless continue - if @clean_thread_locals ThreadPool.clean_thread_locals end @@ -136,11 +130,6 @@ def spawn_thread STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})" end end - - mutex.synchronize do - @spawned -= 1 - @workers.delete th - end end @workers << th @@ -150,9 +139,15 @@ def spawn_thread private :spawn_thread + def with_mutex(&block) + @mutex.owned? ? + yield : + @mutex.synchronize(&block) + end + # Add +work+ to the todo list for a Thread to pickup and process. def <<(work) - @mutex.synchronize do + with_mutex do if @shutdown raise "Unable to add work while shutting down" end @@ -197,7 +192,7 @@ def <<(work) # Returns the current number of busy threads, or +nil+ if shutting down. # def wait_until_not_full - @mutex.synchronize do + with_mutex do while true return if @shutdown @@ -213,13 +208,14 @@ def wait_until_not_full end end - # If too many threads are in the pool, tell one to finish go ahead + # If there are any free threads in the pool, tell one to go ahead # and exit. If +force+ is true, then a trim request is requested # even if all threads are being utilized. # def trim(force=false) - @mutex.synchronize do - if (force or @waiting > 0) and @spawned - @trim_requested > @min + with_mutex do + free = @waiting - @todo.size + if (force or free > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end @@ -229,7 +225,7 @@ def trim(force=false) # If there are dead threads in the pool make them go away while decreasing # spawned counter so that new healthy threads could be created again. def reap - @mutex.synchronize do + with_mutex do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| @@ -283,8 +279,9 @@ def auto_reap!(timeout=5) # Tell all threads in the pool to exit and wait for them to finish. # def shutdown(timeout=-1) - threads = @mutex.synchronize do + threads = with_mutex do @shutdown = true + @trim_requested = @spawned @not_empty.broadcast @not_full.broadcast diff --git a/test/test_thread_pool.rb b/test/test_thread_pool.rb index dab202cb9a..d9ea83f7f9 100644 --- a/test/test_thread_pool.rb +++ b/test/test_thread_pool.rb @@ -10,35 +10,56 @@ def teardown def new_pool(min, max, &block) block = proc { } unless block - @work_mutex = Mutex.new - @work_done = ConditionVariable.new @pool = Puma::ThreadPool.new(min, max, &block) end - def pause - sleep 1 + def mutex_pool(min, max, &block) + block = proc { } unless block + @pool = MutexPool.new(min, max, &block) + end + + # Wraps ThreadPool work in mutex for better concurrency control. + class MutexPool < Puma::ThreadPool + # Wait until the added work is completed before returning. + # Array argument is treated as a batch of work items to be added. + # Block will run after work is added but before it is executed on a worker thread. + def <<(work, &block) + work = [work] unless work.is_a?(Array) + with_mutex do + work.each {|arg| super arg} + yield if block_given? + @not_full.wait(@mutex) + end + end + + def signal + @not_full.signal + end + + # If +wait+ is true, wait until the trim request is completed before returning. + def trim(force=false, wait: true) + super(force) + Thread.pass until @trim_requested == 0 if wait + end end def test_append_spawns saw = [] - thread_name = nil - - pool = new_pool(0, 1) do |work| - @work_mutex.synchronize do - saw << work - thread_name = Thread.current.name if Thread.current.respond_to?(:name) - @work_done.signal - end + pool = mutex_pool(0, 1) do |work| + saw << work end pool << 1 + assert_equal 1, pool.spawned + assert_equal [1], saw + end - @work_mutex.synchronize do - @work_done.wait(@work_mutex, 5) - assert_equal 1, pool.spawned - assert_equal [1], saw - assert_equal('puma threadpool 001', thread_name) if Thread.current.respond_to?(:name) - end + def test_thread_name + skip 'Thread.name not supported' unless Thread.current.respond_to?(:name) + thread_name = nil + pool = mutex_pool(0, 1) {thread_name = Thread.current.name} + pool << 1 + assert_equal('puma threadpool 001', thread_name) end def test_converts_pool_sizes @@ -64,110 +85,68 @@ def test_append_queues_on_max end def test_trim - skip_on :jruby, :truffleruby # Undiagnose thread race. TODO fix - pool = new_pool(0, 1) do |work| - @work_mutex.synchronize do - @work_done.signal - end - end + pool = mutex_pool(0, 1) pool << 1 - @work_mutex.synchronize do - @work_done.wait(@work_mutex, 5) - assert_equal 1, pool.spawned - end + assert_equal 1, pool.spawned pool.trim - # wait/join required here for MRI, JRuby races the access here - worker = pool.instance_variable_get(:@workers).first - worker.join if worker - assert_equal 0, pool.spawned end def test_trim_leaves_min - skip_on :jruby, :truffleruby # Undiagnose thread race. TODO fix - pool = new_pool(1, 2) do |work| - @work_mutex.synchronize do - @work_done.signal - end - end + pool = mutex_pool(1, 2) - pool << 1 - pool << 2 + pool << [1, 2] - @work_mutex.synchronize do - @work_done.wait(@work_mutex, 5) - assert_equal 2, pool.spawned - end + assert_equal 2, pool.spawned pool.trim - pause assert_equal 1, pool.spawned - pool.trim - pause assert_equal 1, pool.spawned end def test_force_trim_doesnt_overtrim - finish = false - pool = new_pool(1, 2) { Thread.pass until finish } - - pool << 1 - pool << 2 - - assert_equal 2, pool.spawned - pool.trim true - pool.trim true + pool = mutex_pool(1, 2) - finish = true - - pause + pool.<< [1, 2] do + assert_equal 2, pool.spawned + pool.trim true, wait: false + pool.trim true, wait: false + end assert_equal 1, pool.spawned end def test_trim_is_ignored_if_no_waiting_threads - finish = false - pool = new_pool(1, 2) { Thread.pass until finish } + pool = mutex_pool(1, 2) - pool << 1 - pool << 2 + pool.<< [1, 2] do + assert_equal 2, pool.spawned + pool.trim + pool.trim + end assert_equal 2, pool.spawned - pool.trim - pool.trim - assert_equal 0, pool.trim_requested - - finish = true - - pause end def test_autotrim - finish = false - pool = new_pool(1, 2) { Thread.pass until finish } - - pool << 1 - pool << 2 - - assert_equal 2, pool.spawned - - finish = true + pool = mutex_pool(1, 2) - pause + timeout = 0 + pool.auto_trim! timeout - assert_equal 2, pool.spawned - - pool.auto_trim! 1 - - sleep 1 + pool.<< [1, 2] do + assert_equal 2, pool.spawned + end - pause + start = Time.now + Thread.pass until pool.spawned == 1 || + Time.now - start > 1 assert_equal 1, pool.spawned end @@ -175,23 +154,15 @@ def test_autotrim def test_cleanliness values = [] n = 100 - mutex = Mutex.new - finished = false - - pool = new_pool(1,1) { - mutex.synchronize { values.push Thread.current[:foo] } + pool = mutex_pool(1,1) { + values.push Thread.current[:foo] Thread.current[:foo] = :hai - Thread.pass until finished } pool.clean_thread_locals = true - n.times { pool << 1 } - - finished = true - - pause + pool << [1] * n assert_equal n, values.length @@ -199,14 +170,16 @@ def test_cleanliness end def test_reap_only_dead_threads - pool = new_pool(2,2) { Thread.current.kill } + pool = mutex_pool(2,2) do + th = Thread.current + Thread.new {th.join; pool.signal} + th.kill + end assert_equal 2, pool.spawned pool << 1 - pause - assert_equal 2, pool.spawned pool.reap @@ -215,8 +188,6 @@ def test_reap_only_dead_threads pool << 2 - pause - assert_equal 1, pool.spawned pool.reap @@ -225,45 +196,48 @@ def test_reap_only_dead_threads end def test_auto_reap_dead_threads - pool = new_pool(2,2) { Thread.current.kill } - - pool.auto_reap! 0.1 + pool = mutex_pool(2,2) do + th = Thread.current + Thread.new {th.join; pool.signal} + th.kill + end - pause + timeout = 0 + pool.auto_reap! timeout assert_equal 2, pool.spawned pool << 1 pool << 2 - pause + start = Time.now + Thread.pass until pool.spawned == 0 || + Time.now - start > 1 assert_equal 0, pool.spawned end def test_force_shutdown_immediately - finish = false rescued = false - pool = new_pool(0, 1) do |work| + pool = mutex_pool(0, 1) do begin - @work_mutex.synchronize do - @work_done.signal - end - Thread.pass until finish + pool.signal + sleep rescue Puma::ThreadPool::ForceShutdown rescued = true end end pool << 1 + pool.shutdown(0) - @work_mutex.synchronize do - @work_done.wait(@work_mutex, 5) - pool.shutdown(0) - finish = true - assert_equal 0, pool.spawned - assert rescued - end + assert_equal 0, pool.spawned + assert rescued + end + + def test_waiting_on_startup + pool = new_pool(1, 2) + assert_equal 1, pool.waiting end end