diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ed55254f..c93e48f51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## Current +concurrent-ruby: + +* Concurrent.disable_at_exit_handlers! is no longer needed and was deprecated. +* AbstractExecutorService#auto_terminate= was deprecated and has no effect. + Set :auto_terminate option instead when executor is initialized. + ## Release v1.1.6.pre1, edge v0.6.0.pre1 (26 Jan 2020) concurrent-ruby: diff --git a/lib/concurrent-ruby-edge/concurrent/channel.rb b/lib/concurrent-ruby-edge/concurrent/channel.rb index 93e139b5e..36f0c1e55 100644 --- a/lib/concurrent-ruby-edge/concurrent/channel.rb +++ b/lib/concurrent-ruby-edge/concurrent/channel.rb @@ -15,7 +15,7 @@ class Channel include Enumerable # NOTE: Move to global IO pool once stable - GOROUTINES = Concurrent::CachedThreadPool.new(auto_terminate: true) + GOROUTINES = Concurrent::CachedThreadPool.new private_constant :GOROUTINES BUFFER_TYPES = { diff --git a/lib/concurrent-ruby/concurrent/configuration.rb b/lib/concurrent-ruby/concurrent/configuration.rb index 69c9847a5..a00dc8440 100644 --- a/lib/concurrent-ruby/concurrent/configuration.rb +++ b/lib/concurrent-ruby/concurrent/configuration.rb @@ -3,13 +3,14 @@ require 'concurrent/errors' require 'concurrent/atomic/atomic_reference' require 'concurrent/concern/logging' +require 'concurrent/concern/deprecation' require 'concurrent/executor/immediate_executor' require 'concurrent/executor/cached_thread_pool' -require 'concurrent/utility/at_exit' require 'concurrent/utility/processor_counter' module Concurrent extend Concern::Logging + extend Concern::Deprecation autoload :Options, 'concurrent/options' autoload :TimerSet, 'concurrent/executor/timer_set' @@ -97,15 +98,15 @@ def self.global_logger=(value) end # @!visibility private - GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) } + GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor } private_constant :GLOBAL_FAST_EXECUTOR # @!visibility private - GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) } + GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor } private_constant :GLOBAL_IO_EXECUTOR # @!visibility private - GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) } + GLOBAL_TIMER_SET = Delay.new { TimerSet.new } private_constant :GLOBAL_TIMER_SET # @!visibility private @@ -125,9 +126,10 @@ def self.global_logger=(value) # @note This method should *never* be called # from within a gem. It should *only* be used from within the main # application and even then it should be used only when necessary. + # @deprecated Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841. # def self.disable_at_exit_handlers! - AT_EXIT.enabled = false + deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841." end # Global thread pool optimized for short, fast *operations*. diff --git a/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb b/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb index b768d1c93..6d0b0474d 100644 --- a/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb +++ b/lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb @@ -1,7 +1,7 @@ require 'concurrent/errors' +require 'concurrent/concern/deprecation' require 'concurrent/executor/executor_service' require 'concurrent/synchronization' -require 'concurrent/utility/at_exit' module Concurrent @@ -9,6 +9,7 @@ module Concurrent # @!visibility private class AbstractExecutorService < Synchronization::LockableObject include ExecutorService + include Concern::Deprecation # The set of possible fallback policies that may be set at thread pool creation. FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze @@ -22,8 +23,9 @@ class AbstractExecutorService < Synchronization::LockableObject def initialize(opts = {}, &block) super(&nil) synchronize do - ns_initialize(opts, &block) + @auto_terminate = opts.fetch(:auto_terminate, true) @name = opts.fetch(:name) if opts.key?(:name) + ns_initialize(opts, &block) end end @@ -63,12 +65,12 @@ def shutdown? # @!macro executor_service_method_auto_terminate_question def auto_terminate? - synchronize { ns_auto_terminate? } + synchronize { @auto_terminate } end # @!macro executor_service_method_auto_terminate_setter def auto_terminate=(value) - synchronize { self.ns_auto_terminate = value } + deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized." end private @@ -119,25 +121,8 @@ def ns_kill_execution end def ns_auto_terminate? - !!@auto_terminate + @auto_terminate end - def ns_auto_terminate=(value) - case value - when true - AT_EXIT.add(self) { terminate_at_exit } - @auto_terminate = true - when false - AT_EXIT.delete(self) - @auto_terminate = false - else - raise ArgumentError - end - end - - def terminate_at_exit - kill # TODO be gentle first - wait_for_termination(10) - end end end diff --git a/lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb index 5c2d2088b..de50ed179 100644 --- a/lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb @@ -51,10 +51,9 @@ def initialize(opts = {}) def ns_initialize(opts) super(opts) if Concurrent.on_jruby? - self.auto_terminate = opts.fetch(:auto_terminate, true) @max_queue = 0 @executor = java.util.concurrent.Executors.newCachedThreadPool( - DaemonThreadFactory.new(self.auto_terminate?)) + DaemonThreadFactory.new(ns_auto_terminate?)) @executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new) @executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS) end diff --git a/lib/concurrent-ruby/concurrent/executor/executor_service.rb b/lib/concurrent-ruby/concurrent/executor/executor_service.rb index 0fcbeeeb2..7e344919e 100644 --- a/lib/concurrent-ruby/concurrent/executor/executor_service.rb +++ b/lib/concurrent-ruby/concurrent/executor/executor_service.rb @@ -111,10 +111,10 @@ module Concurrent # @!macro executor_service_method_auto_terminate_setter # - # Set the auto-terminate behavior for this executor. # + # Set the auto-terminate behavior for this executor. + # @deprecated Has no effect # @param [Boolean] value The new auto-terminate value to set for this executor. - # # @return [Boolean] `true` when auto-termination is enabled else `false`. ################################################################### diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 197bf61c1..681fe21fb 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -121,9 +121,7 @@ module Concurrent # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at # any one time. When the queue size reaches `max_queue` and no new threads can be created, # subsequent tasks will be rejected in accordance with the configured `fallback_policy`. - # * `auto_terminate`: When true (default) an `at_exit` handler will be registered which - # will stop the thread pool when the application exits. See below for more information - # on shutting down thread pools. + # * `auto_terminate`: When true (default), the threads started will be marked as daemon. # * `fallback_policy`: The policy defining how rejected tasks are handled. # # Three fallback policies are supported: @@ -148,16 +146,12 @@ module Concurrent # # On some runtime platforms (most notably the JVM) the application will not # exit until all thread pools have been shutdown. To prevent applications from - # "hanging" on exit all thread pools include an `at_exit` handler that will - # stop the thread pool when the application exits. This handler uses a brute - # force method to stop the pool and makes no guarantees regarding resources being - # used by any tasks still running. Registration of this `at_exit` handler can be - # prevented by setting the thread pool's constructor `:auto_terminate` option to - # `false` when the thread pool is created. All thread pools support this option. + # "hanging" on exit, all threads can be marked as daemon according to the + # `:auto_terminate` option. # # ```ruby - # pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered - # pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration + # pool1 = Concurrent::FixedThreadPool.new(5) # threads will be marked as daemon + # pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon # ``` # # @note Failure to properly shutdown a thread pool can lead to unpredictable results. @@ -166,7 +160,7 @@ module Concurrent # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface - # @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit + # @see https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setDaemon-boolean- diff --git a/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb b/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb index 238b5f83a..9c0f3100c 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb @@ -38,7 +38,6 @@ def wait_for_termination(timeout = nil) def shutdown synchronize do - self.ns_auto_terminate = false @executor.shutdown nil end @@ -46,7 +45,6 @@ def shutdown def kill synchronize do - self.ns_auto_terminate = false @executor.shutdownNow nil end @@ -83,5 +81,23 @@ def run end private_constant :Job end + + class DaemonThreadFactory + # hide include from YARD + send :include, java.util.concurrent.ThreadFactory + + def initialize(daemonize = true) + @daemonize = daemonize + end + + def newThread(runnable) + thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable) + thread.setDaemon(@daemonize) + return thread + end + end + + private_constant :DaemonThreadFactory + end end diff --git a/lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb index 1cf59b065..7aa24f2d7 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb @@ -17,12 +17,13 @@ def initialize(opts = {}) end private - + def ns_initialize(opts) - @executor = java.util.concurrent.Executors.newSingleThreadExecutor + @executor = java.util.concurrent.Executors.newSingleThreadExecutor( + DaemonThreadFactory.new(ns_auto_terminate?) + ) @fallback_policy = opts.fetch(:fallback_policy, :discard) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy) - self.auto_terminate = opts.fetch(:auto_terminate, true) end end end diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index 985b9f78a..b8cb5f1d5 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -108,35 +108,17 @@ def ns_initialize(opts) queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue) end - self.auto_terminate = opts.fetch(:auto_terminate, true) - @executor = java.util.concurrent.ThreadPoolExecutor.new( min_length, max_length, idletime, java.util.concurrent.TimeUnit::SECONDS, queue, - DaemonThreadFactory.new(self.auto_terminate?), + DaemonThreadFactory.new(ns_auto_terminate?), FALLBACK_POLICY_CLASSES[@fallback_policy].new) end end - class DaemonThreadFactory - # hide include from YARD - send :include, java.util.concurrent.ThreadFactory - - def initialize(daemonize = true) - @daemonize = daemonize - end - - def newThread(runnable) - thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable) - thread.setDaemon(@daemonize) - return thread - end - end - - private_constant :DaemonThreadFactory end end diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb b/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb index 7b2ee7377..06658d376 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb @@ -27,7 +27,6 @@ def post(*args, &task) def shutdown synchronize do break unless running? - self.ns_auto_terminate = false stop_event.set ns_shutdown_execution end @@ -37,7 +36,6 @@ def shutdown def kill synchronize do break if shutdown? - self.ns_auto_terminate = false stop_event.set ns_kill_execution stopped_event.set diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb index 305a49e62..916337d4b 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb @@ -15,7 +15,6 @@ def initialize(opts = {}) max_queue: 0, idletime: DEFAULT_THREAD_IDLETIMEOUT, fallback_policy: opts.fetch(:fallback_policy, :discard), - auto_terminate: opts.fetch(:auto_terminate, true) ) end end diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index d9d44d5bb..0044fb40a 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -122,8 +122,6 @@ def ns_initialize(opts) raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length - self.auto_terminate = opts.fetch(:auto_terminate, true) - @pool = [] # all workers @ready = [] # used as a stash (most idle worker is at the start) @queue = [] # used as queue diff --git a/lib/concurrent-ruby/concurrent/executor/timer_set.rb b/lib/concurrent-ruby/concurrent/executor/timer_set.rb index 364910152..0dfaf1288 100644 --- a/lib/concurrent-ruby/concurrent/executor/timer_set.rb +++ b/lib/concurrent-ruby/concurrent/executor/timer_set.rb @@ -77,7 +77,6 @@ def ns_initialize(opts) @timer_executor = SingleThreadExecutor.new @condition = Event.new @ruby_pid = $$ # detects if Ruby has forked - self.auto_terminate = opts.fetch(:auto_terminate, true) end # Post the task to the internal queue. diff --git a/lib/concurrent-ruby/concurrent/utility/at_exit.rb b/lib/concurrent-ruby/concurrent/utility/at_exit.rb deleted file mode 100644 index 006264385..000000000 --- a/lib/concurrent-ruby/concurrent/utility/at_exit.rb +++ /dev/null @@ -1,96 +0,0 @@ -require 'logger' -require 'concurrent/synchronization' - -module Concurrent - - # Provides ability to add and remove handlers to be run at `Kernel#at_exit`, order is undefined. - # Each handler is executed at most once. - # - # @!visibility private - class AtExitImplementation < Synchronization::LockableObject - include Logger::Severity - - def initialize(*args) - super() - synchronize { ns_initialize(*args) } - end - - # Add a handler to be run at `Kernel#at_exit` - # @param [Object] handler_id optionally provide an id, if already present, handler is replaced - # @yield the handler - # @return id of the handler - def add(handler_id = nil, &handler) - id = handler_id || handler.object_id - synchronize { @handlers[id] = handler } - id - end - - # Delete a handler by handler_id - # @return [true, false] - def delete(handler_id) - !!synchronize { @handlers.delete handler_id } - end - - # Is handler with handler_id rpesent? - # @return [true, false] - def handler?(handler_id) - synchronize { @handlers.key? handler_id } - end - - # @return copy of the handlers - def handlers - synchronize { @handlers }.clone - end - - # install `Kernel#at_exit` callback to execute added handlers - def install - synchronize do - @installed ||= begin - at_exit { runner } - true - end - self - end - end - - # Will it run during `Kernel#at_exit` - def enabled? - synchronize { @enabled } - end - - # Configure if it runs during `Kernel#at_exit` - def enabled=(value) - synchronize { @enabled = value } - end - - # run the handlers manually - # @return ids of the handlers - def run - handlers, _ = synchronize { handlers, @handlers = @handlers, {} } - handlers.each do |_, handler| - begin - handler.call - rescue => error - Concurrent.global_logger.call(ERROR, error) - end - end - handlers.keys - end - - private - - def ns_initialize(enabled = true) - @handlers = {} - @enabled = enabled - end - - def runner - run if synchronize { @enabled } - end - end - - private_constant :AtExitImplementation - - # @!visibility private - AT_EXIT = AtExitImplementation.new.install -end diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 57f1aa960..1662c874c 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -19,13 +19,6 @@ module Concurrent expect(Concurrent.global_io_executor).to respond_to(:post) end - specify 'Concurrent::AtExit.run acts on all executors with auto_terminate: true' do - # The 'at_least(:once)' clauses account for global config reset - expect(Concurrent.global_fast_executor).to receive(:kill).at_least(:once).with(no_args) - expect(Concurrent.global_io_executor).to receive(:kill).at_least(:once).with(no_args) - expect(Concurrent.global_timer_set).to receive(:kill).at_least(:once).with(no_args) - Concurrent::AT_EXIT.run - end end end end diff --git a/spec/concurrent/executor/cached_thread_pool_spec.rb b/spec/concurrent/executor/cached_thread_pool_spec.rb index 17f5f921c..40e352627 100644 --- a/spec/concurrent/executor/cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/cached_thread_pool_spec.rb @@ -201,38 +201,16 @@ module Concurrent end end - context 'auto terminate' do - - # https://github.com/ruby-concurrency/concurrent-ruby/issues/817 - # https://github.com/ruby-concurrency/concurrent-ruby/issues/839 - it 'does not stop shutdown ' do - Timeout.timeout(10) do - begin - test_file = File.join File.dirname(__FILE__), 'pool_quits.rb' - pid = spawn RbConfig.ruby, test_file - Process.waitpid pid - expect($?.success?).to eq true - rescue Timeout::Error => e - Process.kill :KILL, pid - raise e - end - end - end - - end - context 'stress', notravis: true do configurations = [ { min_threads: 2, max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, - auto_terminate: false, idletime: 0.1, # 1 minute max_queue: 0, # unlimited fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue gc_interval: 0.1 }, { min_threads: 2, max_threads: 4, - auto_terminate: false, idletime: 0.1, # 1 minute max_queue: 0, # unlimited fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue diff --git a/spec/concurrent/executor/pool_quits.rb b/spec/concurrent/executor/executor_quits.rb similarity index 52% rename from spec/concurrent/executor/pool_quits.rb rename to spec/concurrent/executor/executor_quits.rb index 1791e1cff..d8c1c2cd5 100644 --- a/spec/concurrent/executor/pool_quits.rb +++ b/spec/concurrent/executor/executor_quits.rb @@ -4,11 +4,11 @@ require 'concurrent-ruby' -# the test relies on replicating that Minitest messed up the AtExit handling -Concurrent.disable_at_exit_handlers! -pool = Concurrent::CachedThreadPool.new -pool.post do - sleep # sleep indefinitely +executors = [Concurrent::CachedThreadPool.new, Concurrent::SingleThreadExecutor.new, Concurrent::FixedThreadPool.new(1)] +executors.each do |executor| + executor.post do + sleep # sleep indefinitely + end end # the process main thread should quit out which should kill the daemon CachedThreadPool diff --git a/spec/concurrent/executor/executor_service_shared.rb b/spec/concurrent/executor/executor_service_shared.rb index 32e7eb6f7..8661caffe 100644 --- a/spec/concurrent/executor/executor_service_shared.rb +++ b/spec/concurrent/executor/executor_service_shared.rb @@ -33,6 +33,28 @@ end end + context 'auto terminate' do + + # https://github.com/ruby-concurrency/concurrent-ruby/issues/817 + # https://github.com/ruby-concurrency/concurrent-ruby/issues/839 + it 'does not stop shutdown ' do + Timeout.timeout(10) do + begin + test_file = File.join File.dirname(__FILE__), 'executor_quits.rb' + pid = spawn RbConfig.ruby, test_file + Process.waitpid pid + expect($?.success?).to eq true + rescue Errno::ECHILD + # child already gone + rescue Timeout::Error => e + Process.kill :KILL, pid + raise e + end + end + end + + end + context '#running?' do it 'returns true when the thread pool is running' do