Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AtExit code #841

Merged
merged 11 commits into from Feb 7, 2020
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
## Current

concurrent-ruby:

* Concurrent.disable_at_exit_handlers! is no longer needed and was deprecated.
* AbstractExecutorService#auto_terminate= was deprecated and has no effect.
Set :auto_terminate option instead when executor is initialized.

## Release v1.1.6.pre1, edge v0.6.0.pre1 (26 Jan 2020)

concurrent-ruby:
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent-ruby-edge/concurrent/channel.rb
Expand Up @@ -15,7 +15,7 @@ class Channel
include Enumerable

# NOTE: Move to global IO pool once stable
GOROUTINES = Concurrent::CachedThreadPool.new(auto_terminate: true)
GOROUTINES = Concurrent::CachedThreadPool.new
private_constant :GOROUTINES

BUFFER_TYPES = {
Expand Down
12 changes: 7 additions & 5 deletions lib/concurrent-ruby/concurrent/configuration.rb
Expand Up @@ -3,13 +3,14 @@
require 'concurrent/errors'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/concern/logging'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/immediate_executor'
require 'concurrent/executor/cached_thread_pool'
require 'concurrent/utility/at_exit'
require 'concurrent/utility/processor_counter'

module Concurrent
extend Concern::Logging
extend Concern::Deprecation

autoload :Options, 'concurrent/options'
autoload :TimerSet, 'concurrent/executor/timer_set'
Expand Down Expand Up @@ -97,15 +98,15 @@ def self.global_logger=(value)
end

# @!visibility private
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) }
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor }
private_constant :GLOBAL_FAST_EXECUTOR

# @!visibility private
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) }
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor }
private_constant :GLOBAL_IO_EXECUTOR

# @!visibility private
GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) }
GLOBAL_TIMER_SET = Delay.new { TimerSet.new }
private_constant :GLOBAL_TIMER_SET

# @!visibility private
Expand All @@ -125,9 +126,10 @@ def self.global_logger=(value)
# @note This method should *never* be called
# from within a gem. It should *only* be used from within the main
# application and even then it should be used only when necessary.
# @deprecated Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
#
def self.disable_at_exit_handlers!
AT_EXIT.enabled = false
deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end

# Global thread pool optimized for short, fast *operations*.
Expand Down
@@ -1,14 +1,15 @@
require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'
require 'concurrent/utility/at_exit'

module Concurrent

# @!macro abstract_executor_service_public_api
# @!visibility private
class AbstractExecutorService < Synchronization::LockableObject
include ExecutorService
include Concern::Deprecation

# The set of possible fallback policies that may be set at thread pool creation.
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
Expand All @@ -22,8 +23,9 @@ class AbstractExecutorService < Synchronization::LockableObject
def initialize(opts = {}, &block)
super(&nil)
synchronize do
ns_initialize(opts, &block)
@auto_terminate = opts.fetch(:auto_terminate, true)
@name = opts.fetch(:name) if opts.key?(:name)
ns_initialize(opts, &block)
end
end

Expand Down Expand Up @@ -63,12 +65,12 @@ def shutdown?

# @!macro executor_service_method_auto_terminate_question
def auto_terminate?
synchronize { ns_auto_terminate? }
synchronize { @auto_terminate }
end

# @!macro executor_service_method_auto_terminate_setter
def auto_terminate=(value)
synchronize { self.ns_auto_terminate = value }
deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
end

private
Expand Down Expand Up @@ -119,25 +121,8 @@ def ns_kill_execution
end

def ns_auto_terminate?
!!@auto_terminate
@auto_terminate
end

def ns_auto_terminate=(value)
case value
when true
AT_EXIT.add(self) { terminate_at_exit }
@auto_terminate = true
when false
AT_EXIT.delete(self)
@auto_terminate = false
else
raise ArgumentError
end
end

def terminate_at_exit
kill # TODO be gentle first
wait_for_termination(10)
end
end
end
Expand Up @@ -51,10 +51,9 @@ def initialize(opts = {})
def ns_initialize(opts)
super(opts)
if Concurrent.on_jruby?
self.auto_terminate = opts.fetch(:auto_terminate, true)
@max_queue = 0
@executor = java.util.concurrent.Executors.newCachedThreadPool(
DaemonThreadFactory.new(self.auto_terminate?))
DaemonThreadFactory.new(ns_auto_terminate?))
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent-ruby/concurrent/executor/executor_service.rb
Expand Up @@ -111,10 +111,10 @@ module Concurrent

# @!macro executor_service_method_auto_terminate_setter
#
# Set the auto-terminate behavior for this executor.
#
# Set the auto-terminate behavior for this executor.
# @deprecated Has no effect
# @param [Boolean] value The new auto-terminate value to set for this executor.
#
# @return [Boolean] `true` when auto-termination is enabled else `false`.

###################################################################
Expand Down
18 changes: 6 additions & 12 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Expand Up @@ -121,9 +121,7 @@ module Concurrent
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
# any one time. When the queue size reaches `max_queue` and no new threads can be created,
# subsequent tasks will be rejected in accordance with the configured `fallback_policy`.
# * `auto_terminate`: When true (default) an `at_exit` handler will be registered which
# will stop the thread pool when the application exits. See below for more information
# on shutting down thread pools.
# * `auto_terminate`: When true (default), the threads started will be marked as daemon.
# * `fallback_policy`: The policy defining how rejected tasks are handled.
#
# Three fallback policies are supported:
Expand All @@ -148,16 +146,12 @@ module Concurrent
#
# On some runtime platforms (most notably the JVM) the application will not
# exit until all thread pools have been shutdown. To prevent applications from
# "hanging" on exit all thread pools include an `at_exit` handler that will
# stop the thread pool when the application exits. This handler uses a brute
# force method to stop the pool and makes no guarantees regarding resources being
# used by any tasks still running. Registration of this `at_exit` handler can be
# prevented by setting the thread pool's constructor `:auto_terminate` option to
# `false` when the thread pool is created. All thread pools support this option.
# "hanging" on exit, all threads can be marked as daemon according to the
# `:auto_terminate` option.
#
# ```ruby
# pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration
# pool1 = Concurrent::FixedThreadPool.new(5) # threads will be marked as daemon
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon
# ```
#
# @note Failure to properly shutdown a thread pool can lead to unpredictable results.
Expand All @@ -166,7 +160,7 @@ module Concurrent
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface
# @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit
# @see https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setDaemon-boolean-



Expand Down
20 changes: 18 additions & 2 deletions lib/concurrent-ruby/concurrent/executor/java_executor_service.rb
Expand Up @@ -38,15 +38,13 @@ def wait_for_termination(timeout = nil)

def shutdown
synchronize do
self.ns_auto_terminate = false
@executor.shutdown
nil
end
end

def kill
synchronize do
self.ns_auto_terminate = false
@executor.shutdownNow
nil
end
Expand Down Expand Up @@ -83,5 +81,23 @@ def run
end
private_constant :Job
end

class DaemonThreadFactory
# hide include from YARD
send :include, java.util.concurrent.ThreadFactory

def initialize(daemonize = true)
@daemonize = daemonize
end

def newThread(runnable)
thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable)
thread.setDaemon(@daemonize)
return thread
end
end

private_constant :DaemonThreadFactory
fzakaria marked this conversation as resolved.
Show resolved Hide resolved

end
end
Expand Up @@ -17,12 +17,13 @@ def initialize(opts = {})
end

private

def ns_initialize(opts)
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
@executor = java.util.concurrent.Executors.newSingleThreadExecutor(
fzakaria marked this conversation as resolved.
Show resolved Hide resolved
DaemonThreadFactory.new(ns_auto_terminate?)
)
@fallback_policy = opts.fetch(:fallback_policy, :discard)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)
self.auto_terminate = opts.fetch(:auto_terminate, true)
end
end
end
Expand Down
Expand Up @@ -116,27 +116,11 @@ def ns_initialize(opts)
idletime,
java.util.concurrent.TimeUnit::SECONDS,
queue,
DaemonThreadFactory.new(self.auto_terminate?),
DaemonThreadFactory.new(ns_auto_terminate?),
FALLBACK_POLICY_CLASSES[@fallback_policy].new)

end
end

class DaemonThreadFactory
# hide include from YARD
send :include, java.util.concurrent.ThreadFactory

def initialize(daemonize = true)
@daemonize = daemonize
end

def newThread(runnable)
thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable)
thread.setDaemon(@daemonize)
return thread
end
end

private_constant :DaemonThreadFactory
end
end
Expand Up @@ -27,7 +27,6 @@ def post(*args, &task)
def shutdown
synchronize do
break unless running?
self.ns_auto_terminate = false
stop_event.set
ns_shutdown_execution
end
Expand All @@ -37,7 +36,6 @@ def shutdown
def kill
synchronize do
break if shutdown?
self.ns_auto_terminate = false
stop_event.set
ns_kill_execution
stopped_event.set
Expand Down
Expand Up @@ -15,7 +15,6 @@ def initialize(opts = {})
max_queue: 0,
idletime: DEFAULT_THREAD_IDLETIMEOUT,
fallback_policy: opts.fetch(:fallback_policy, :discard),
auto_terminate: opts.fetch(:auto_terminate, true)
)
end
end
Expand Down
Expand Up @@ -122,8 +122,6 @@ def ns_initialize(opts)
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length

self.auto_terminate = opts.fetch(:auto_terminate, true)

@pool = [] # all workers
@ready = [] # used as a stash (most idle worker is at the start)
@queue = [] # used as queue
Expand Down
1 change: 0 additions & 1 deletion lib/concurrent-ruby/concurrent/executor/timer_set.rb
Expand Up @@ -77,7 +77,6 @@ def ns_initialize(opts)
@timer_executor = SingleThreadExecutor.new
@condition = Event.new
@ruby_pid = $$ # detects if Ruby has forked
self.auto_terminate = opts.fetch(:auto_terminate, true)
end

# Post the task to the internal queue.
Expand Down