From c92d11e6974161520a0cbeafdc60bf6d15191961 Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Fri, 14 Feb 2020 12:36:54 -0800 Subject: [PATCH 1/7] Introduce ThreadPoolExecutor without a Queue This adds the ability for a ThreadPoolExecutor to have a queue depth of 0. This is useful if you'd like to perform the rejection handler if no available threads are availabile. This is analogous to Java's use of SynchronousQueue in ThreadPoolExecutor. See https://stackoverflow.com/a/10186825/143733 for more details --- .../executor/java_thread_pool_executor.rb | 14 +++++- .../executor/ruby_thread_pool_executor.rb | 18 +++++-- .../executor/thread_pool_executor.rb | 3 +- .../executor/thread_pool_executor_shared.rb | 49 +++++++++++++++++++ 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index b8cb5f1d5..e67066385 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -21,12 +21,18 @@ class JavaThreadPoolExecutor < JavaExecutorService # @!macro thread_pool_executor_constant_default_thread_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 + # @!macro thread_pool_executor_constant_default_synchronous + DEFAULT_SYNCHRONOUS = false + # @!macro thread_pool_executor_attr_reader_max_length attr_reader :max_length # @!macro thread_pool_executor_attr_reader_max_queue attr_reader :max_queue + # @!macro thread_pool_executor_attr_reader_synchronous + attr_reader :synchronous + # @!macro thread_pool_executor_method_initialize def initialize(opts = {}) super(opts) @@ -94,8 +100,10 @@ def ns_initialize(opts) max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i + @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS) @fallback_policy = opts.fetch(:fallback_policy, :abort) + raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0 raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE @@ -103,7 +111,11 @@ def ns_initialize(opts) raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy) if @max_queue == 0 - queue = java.util.concurrent.LinkedBlockingQueue.new + if @synchronous + queue = java.util.concurrent.SynchronousQueue.new + else + queue = java.util.concurrent.LinkedBlockingQueue.new + end else queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue) end diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 0044fb40a..e2cd301e1 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -3,6 +3,7 @@ require 'concurrent/concern/logging' require 'concurrent/executor/ruby_executor_service' require 'concurrent/utility/monotonic_time' +require 'concurrent/mvar' module Concurrent @@ -23,6 +24,9 @@ class RubyThreadPoolExecutor < RubyExecutorService # @!macro thread_pool_executor_constant_default_thread_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 + # @!macro thread_pool_executor_constant_default_synchronous + DEFAULT_SYNCHRONOUS = false + # @!macro thread_pool_executor_attr_reader_max_length attr_reader :max_length @@ -35,6 +39,9 @@ class RubyThreadPoolExecutor < RubyExecutorService # @!macro thread_pool_executor_attr_reader_max_queue attr_reader :max_queue + # @!macro thread_pool_executor_attr_reader_synchronous + attr_reader :synchronous + # @!macro thread_pool_executor_method_initialize def initialize(opts = {}) super(opts) @@ -114,9 +121,11 @@ def ns_initialize(opts) @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i + @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS) @fallback_policy = opts.fetch(:fallback_policy, :abort) - raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) + raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0 + raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE @@ -145,8 +154,11 @@ def ns_limited_queue? def ns_execute(*args, &task) ns_reset_if_forked - if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) + assigned_worker = ns_assign_worker(*args, &task) + if assigned_worker @scheduled_task_count += 1 + elsif !@synchronous + ns_enqueue(*args, &task) else handle_fallback(*args, &task) end @@ -233,7 +245,7 @@ def ns_add_busy_worker # # @!visibility private def ns_ready_worker(worker, success = true) - task_and_args = @queue.shift + task_and_args = @queue.shift if task_and_args worker << task_and_args else diff --git a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb index 72e1bae85..4a911d3fa 100644 --- a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb @@ -73,7 +73,8 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new # tasks that are received when the queue size has reached # `max_queue` or the executor has shut down - # + # @options opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0 + # for :max_queue means the queue must perform direct hand-off rather than unbounded. # @raise [ArgumentError] if `:max_threads` is less than one # @raise [ArgumentError] if `:min_threads` is less than zero # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index 3ae32c8d3..74f7ed216 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -137,6 +137,55 @@ end end + context '#synchronous' do + + subject do + described_class.new( + min_threads: 1, + max_threads: 2, + max_queue: 0, + synchronous: true, + fallback_policy: :abort + ) + end + + it 'cannot be set unless `max_queue` is zero' do + expect { + described_class.new( + min_threads: 2, + max_threads: 5, + max_queue: 1, + fallback_policy: :discard, + synchronous: true + ) + }.to raise_error(ArgumentError) + end + + it 'executes fallback policy once max_threads has been reached' do + latch = Concurrent::CountDownLatch.new(1) + (subject.max_length).times do + subject.post { + latch.wait + } + end + + expect(subject.queue_length).to eq 0 + + # verify nothing happening + 20.times { + expect { + subject.post { + sleep + } + }.to raise_error(Concurrent::RejectedExecutionError) + } + + # release + latch.count_down + end + + end + context '#queue_length', :truffle_bug => true do # only actually fails for RubyThreadPoolExecutor let!(:expected_max){ 10 } From 658ef56620e61b907d273e5233be5149591aa93c Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Fri, 14 Feb 2020 12:50:56 -0800 Subject: [PATCH 2/7] Simply make queue always false when @synchronous --- .../concurrent/executor/ruby_thread_pool_executor.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index e2cd301e1..3f103f95c 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -154,11 +154,8 @@ def ns_limited_queue? def ns_execute(*args, &task) ns_reset_if_forked - assigned_worker = ns_assign_worker(*args, &task) - if assigned_worker + if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) @scheduled_task_count += 1 - elsif !@synchronous - ns_enqueue(*args, &task) else handle_fallback(*args, &task) end @@ -213,6 +210,10 @@ def ns_assign_worker(*args, &task) # # @!visibility private def ns_enqueue(*args, &task) + if @synchronous + return false + end + if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] true From bf824c9c9121783a157465216faafd7a2e20938a Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Fri, 14 Feb 2020 12:52:16 -0800 Subject: [PATCH 3/7] remove whitespace --- .../concurrent/executor/ruby_thread_pool_executor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 3f103f95c..9caf6cf4f 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -246,7 +246,7 @@ def ns_add_busy_worker # # @!visibility private def ns_ready_worker(worker, success = true) - task_and_args = @queue.shift + task_and_args = @queue.shift if task_and_args worker << task_and_args else From 7fa7e82ac953786a16bb5a1db040600bb009614a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 1 Mar 2020 10:30:21 +0100 Subject: [PATCH 4/7] Fix documentation --- .../concurrent/executor/fixed_thread_pool.rb | 7 +++++++ .../concurrent/executor/thread_pool_executor.rb | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 681fe21fb..b665f6c56 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -16,6 +16,9 @@ module Concurrent # Default maximum number of seconds a thread in the pool may remain idle # before being reclaimed. + # @!macro thread_pool_executor_constant_default_synchronous + # Default value of the :synchronous option. + # @!macro thread_pool_executor_attr_reader_max_length # The maximum number of threads that may be created in the pool. # @return [Integer] The maximum number of threads that may be created in the pool. @@ -40,6 +43,10 @@ module Concurrent # The number of seconds that a thread may be idle before being reclaimed. # @return [Integer] The number of seconds that a thread may be idle before being reclaimed. + # @!macro thread_pool_executor_attr_reader_synchronous + # Whether or not a value of 0 for :max_queue option means the queue must perform direct hand-off or rather unbounded queue. + # @return [true, false] + # @!macro thread_pool_executor_attr_reader_max_queue # The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in diff --git a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb index 4a911d3fa..253d46a9d 100644 --- a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb @@ -73,7 +73,7 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new # tasks that are received when the queue size has reached # `max_queue` or the executor has shut down - # @options opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0 + # @option opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0 # for :max_queue means the queue must perform direct hand-off rather than unbounded. # @raise [ArgumentError] if `:max_threads` is less than one # @raise [ArgumentError] if `:min_threads` is less than zero From 578285b9da4b4b88b9219b7ffb284b2eb57ca9ee Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Sun, 1 Mar 2020 16:27:25 -0800 Subject: [PATCH 5/7] remove 'concurrent/mvar' Co-Authored-By: Chalupa Petr --- .../concurrent/executor/ruby_thread_pool_executor.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 9caf6cf4f..e08ad7848 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -3,7 +3,6 @@ require 'concurrent/concern/logging' require 'concurrent/executor/ruby_executor_service' require 'concurrent/utility/monotonic_time' -require 'concurrent/mvar' module Concurrent From f11b6ec96dcf8680c3cfd83371ba02b744a56a0b Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Sun, 1 Mar 2020 16:27:42 -0800 Subject: [PATCH 6/7] make if condition more ruby-idiomatic Co-Authored-By: Chalupa Petr --- .../concurrent/executor/ruby_thread_pool_executor.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index e08ad7848..dc20d765b 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -209,9 +209,7 @@ def ns_assign_worker(*args, &task) # # @!visibility private def ns_enqueue(*args, &task) - if @synchronous - return false - end + return false if @synchronous if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] From 16f15a63f8fd11e5038910b4be2fc36c5d37f5d1 Mon Sep 17 00:00:00 2001 From: Farid Zakaria Date: Sun, 1 Mar 2020 16:32:06 -0800 Subject: [PATCH 7/7] Added changelog description --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d03d935f..6469e3d43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Current +concurrent-ruby: + +* (#853) Introduce ThreadPoolExecutor without a Queue + ## Release v1.1.6, edge v0.6.0 (10 Feb 2020) concurrent-ruby: