diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d03d935f..6469e3d43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Current +concurrent-ruby: + +* (#853) Introduce ThreadPoolExecutor without a Queue + ## Release v1.1.6, edge v0.6.0 (10 Feb 2020) concurrent-ruby: diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 681fe21fb..b665f6c56 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -16,6 +16,9 @@ module Concurrent # Default maximum number of seconds a thread in the pool may remain idle # before being reclaimed. + # @!macro thread_pool_executor_constant_default_synchronous + # Default value of the :synchronous option. + # @!macro thread_pool_executor_attr_reader_max_length # The maximum number of threads that may be created in the pool. # @return [Integer] The maximum number of threads that may be created in the pool. @@ -40,6 +43,10 @@ module Concurrent # The number of seconds that a thread may be idle before being reclaimed. # @return [Integer] The number of seconds that a thread may be idle before being reclaimed. + # @!macro thread_pool_executor_attr_reader_synchronous + # Whether or not a value of 0 for :max_queue option means the queue must perform direct hand-off or rather unbounded queue. + # @return [true, false] + # @!macro thread_pool_executor_attr_reader_max_queue # The maximum number of tasks that may be waiting in the work queue at any one time. # When the queue size reaches `max_queue` subsequent tasks will be rejected in diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index b8cb5f1d5..e67066385 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -21,12 +21,18 @@ class JavaThreadPoolExecutor < JavaExecutorService # @!macro thread_pool_executor_constant_default_thread_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 + # @!macro thread_pool_executor_constant_default_synchronous + DEFAULT_SYNCHRONOUS = false + # @!macro thread_pool_executor_attr_reader_max_length attr_reader :max_length # @!macro thread_pool_executor_attr_reader_max_queue attr_reader :max_queue + # @!macro thread_pool_executor_attr_reader_synchronous + attr_reader :synchronous + # @!macro thread_pool_executor_method_initialize def initialize(opts = {}) super(opts) @@ -94,8 +100,10 @@ def ns_initialize(opts) max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i + @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS) @fallback_policy = opts.fetch(:fallback_policy, :abort) + raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0 raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE @@ -103,7 +111,11 @@ def ns_initialize(opts) raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy) if @max_queue == 0 - queue = java.util.concurrent.LinkedBlockingQueue.new + if @synchronous + queue = java.util.concurrent.SynchronousQueue.new + else + queue = java.util.concurrent.LinkedBlockingQueue.new + end else queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue) end diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 0044fb40a..dc20d765b 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -23,6 +23,9 @@ class RubyThreadPoolExecutor < RubyExecutorService # @!macro thread_pool_executor_constant_default_thread_timeout DEFAULT_THREAD_IDLETIMEOUT = 60 + # @!macro thread_pool_executor_constant_default_synchronous + DEFAULT_SYNCHRONOUS = false + # @!macro thread_pool_executor_attr_reader_max_length attr_reader :max_length @@ -35,6 +38,9 @@ class RubyThreadPoolExecutor < RubyExecutorService # @!macro thread_pool_executor_attr_reader_max_queue attr_reader :max_queue + # @!macro thread_pool_executor_attr_reader_synchronous + attr_reader :synchronous + # @!macro thread_pool_executor_method_initialize def initialize(opts = {}) super(opts) @@ -114,9 +120,11 @@ def ns_initialize(opts) @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i + @synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS) @fallback_policy = opts.fetch(:fallback_policy, :abort) - raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) + raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0 + raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE @@ -201,6 +209,8 @@ def ns_assign_worker(*args, &task) # # @!visibility private def ns_enqueue(*args, &task) + return false if @synchronous + if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] true diff --git a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb index 72e1bae85..253d46a9d 100644 --- a/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb @@ -73,7 +73,8 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new # tasks that are received when the queue size has reached # `max_queue` or the executor has shut down - # + # @option opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) whether or not a value of 0 + # for :max_queue means the queue must perform direct hand-off rather than unbounded. # @raise [ArgumentError] if `:max_threads` is less than one # @raise [ArgumentError] if `:min_threads` is less than zero # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index 3ae32c8d3..74f7ed216 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -137,6 +137,55 @@ end end + context '#synchronous' do + + subject do + described_class.new( + min_threads: 1, + max_threads: 2, + max_queue: 0, + synchronous: true, + fallback_policy: :abort + ) + end + + it 'cannot be set unless `max_queue` is zero' do + expect { + described_class.new( + min_threads: 2, + max_threads: 5, + max_queue: 1, + fallback_policy: :discard, + synchronous: true + ) + }.to raise_error(ArgumentError) + end + + it 'executes fallback policy once max_threads has been reached' do + latch = Concurrent::CountDownLatch.new(1) + (subject.max_length).times do + subject.post { + latch.wait + } + end + + expect(subject.queue_length).to eq 0 + + # verify nothing happening + 20.times { + expect { + subject.post { + sleep + } + }.to raise_error(Concurrent::RejectedExecutionError) + } + + # release + latch.count_down + end + + end + context '#queue_length', :truffle_bug => true do # only actually fails for RubyThreadPoolExecutor let!(:expected_max){ 10 }