Skip to content

Commit

Permalink
Merge pull request #841 from fzakaria/remove-at-exit
Browse files Browse the repository at this point in the history
Remove AtExit code
  • Loading branch information
pitr-ch committed Feb 7, 2020
2 parents a4d7a75 + 2c13ac6 commit 8ccb214
Show file tree
Hide file tree
Showing 19 changed files with 80 additions and 204 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
## Current

concurrent-ruby:

* Concurrent.disable_at_exit_handlers! is no longer needed and was deprecated.
* AbstractExecutorService#auto_terminate= was deprecated and has no effect.
Set :auto_terminate option instead when executor is initialized.

## Release v1.1.6.pre1, edge v0.6.0.pre1 (26 Jan 2020)

concurrent-ruby:
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent-ruby-edge/concurrent/channel.rb
Expand Up @@ -15,7 +15,7 @@ class Channel
include Enumerable

# NOTE: Move to global IO pool once stable
GOROUTINES = Concurrent::CachedThreadPool.new(auto_terminate: true)
GOROUTINES = Concurrent::CachedThreadPool.new
private_constant :GOROUTINES

BUFFER_TYPES = {
Expand Down
12 changes: 7 additions & 5 deletions lib/concurrent-ruby/concurrent/configuration.rb
Expand Up @@ -3,13 +3,14 @@
require 'concurrent/errors'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/concern/logging'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/immediate_executor'
require 'concurrent/executor/cached_thread_pool'
require 'concurrent/utility/at_exit'
require 'concurrent/utility/processor_counter'

module Concurrent
extend Concern::Logging
extend Concern::Deprecation

autoload :Options, 'concurrent/options'
autoload :TimerSet, 'concurrent/executor/timer_set'
Expand Down Expand Up @@ -97,15 +98,15 @@ def self.global_logger=(value)
end

# @!visibility private
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) }
GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor }
private_constant :GLOBAL_FAST_EXECUTOR

# @!visibility private
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) }
GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor }
private_constant :GLOBAL_IO_EXECUTOR

# @!visibility private
GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) }
GLOBAL_TIMER_SET = Delay.new { TimerSet.new }
private_constant :GLOBAL_TIMER_SET

# @!visibility private
Expand All @@ -125,9 +126,10 @@ def self.global_logger=(value)
# @note This method should *never* be called
# from within a gem. It should *only* be used from within the main
# application and even then it should be used only when necessary.
# @deprecated Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
#
def self.disable_at_exit_handlers!
AT_EXIT.enabled = false
deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end

# Global thread pool optimized for short, fast *operations*.
Expand Down
@@ -1,14 +1,15 @@
require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'
require 'concurrent/utility/at_exit'

module Concurrent

# @!macro abstract_executor_service_public_api
# @!visibility private
class AbstractExecutorService < Synchronization::LockableObject
include ExecutorService
include Concern::Deprecation

# The set of possible fallback policies that may be set at thread pool creation.
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
Expand All @@ -22,8 +23,9 @@ class AbstractExecutorService < Synchronization::LockableObject
def initialize(opts = {}, &block)
super(&nil)
synchronize do
ns_initialize(opts, &block)
@auto_terminate = opts.fetch(:auto_terminate, true)
@name = opts.fetch(:name) if opts.key?(:name)
ns_initialize(opts, &block)
end
end

Expand Down Expand Up @@ -63,12 +65,12 @@ def shutdown?

# @!macro executor_service_method_auto_terminate_question
def auto_terminate?
synchronize { ns_auto_terminate? }
synchronize { @auto_terminate }
end

# @!macro executor_service_method_auto_terminate_setter
def auto_terminate=(value)
synchronize { self.ns_auto_terminate = value }
deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
end

private
Expand Down Expand Up @@ -119,25 +121,8 @@ def ns_kill_execution
end

def ns_auto_terminate?
!!@auto_terminate
@auto_terminate
end

def ns_auto_terminate=(value)
case value
when true
AT_EXIT.add(self) { terminate_at_exit }
@auto_terminate = true
when false
AT_EXIT.delete(self)
@auto_terminate = false
else
raise ArgumentError
end
end

def terminate_at_exit
kill # TODO be gentle first
wait_for_termination(10)
end
end
end
Expand Up @@ -51,10 +51,9 @@ def initialize(opts = {})
def ns_initialize(opts)
super(opts)
if Concurrent.on_jruby?
self.auto_terminate = opts.fetch(:auto_terminate, true)
@max_queue = 0
@executor = java.util.concurrent.Executors.newCachedThreadPool(
DaemonThreadFactory.new(self.auto_terminate?))
DaemonThreadFactory.new(ns_auto_terminate?))
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent-ruby/concurrent/executor/executor_service.rb
Expand Up @@ -111,10 +111,10 @@ module Concurrent

# @!macro executor_service_method_auto_terminate_setter
#
# Set the auto-terminate behavior for this executor.
#
# Set the auto-terminate behavior for this executor.
# @deprecated Has no effect
# @param [Boolean] value The new auto-terminate value to set for this executor.
#
# @return [Boolean] `true` when auto-termination is enabled else `false`.

###################################################################
Expand Down
18 changes: 6 additions & 12 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Expand Up @@ -121,9 +121,7 @@ module Concurrent
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
# any one time. When the queue size reaches `max_queue` and no new threads can be created,
# subsequent tasks will be rejected in accordance with the configured `fallback_policy`.
# * `auto_terminate`: When true (default) an `at_exit` handler will be registered which
# will stop the thread pool when the application exits. See below for more information
# on shutting down thread pools.
# * `auto_terminate`: When true (default), the threads started will be marked as daemon.
# * `fallback_policy`: The policy defining how rejected tasks are handled.
#
# Three fallback policies are supported:
Expand All @@ -148,16 +146,12 @@ module Concurrent
#
# On some runtime platforms (most notably the JVM) the application will not
# exit until all thread pools have been shutdown. To prevent applications from
# "hanging" on exit all thread pools include an `at_exit` handler that will
# stop the thread pool when the application exits. This handler uses a brute
# force method to stop the pool and makes no guarantees regarding resources being
# used by any tasks still running. Registration of this `at_exit` handler can be
# prevented by setting the thread pool's constructor `:auto_terminate` option to
# `false` when the thread pool is created. All thread pools support this option.
# "hanging" on exit, all threads can be marked as daemon according to the
# `:auto_terminate` option.
#
# ```ruby
# pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration
# pool1 = Concurrent::FixedThreadPool.new(5) # threads will be marked as daemon
# pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # mark threads as non-daemon
# ```
#
# @note Failure to properly shutdown a thread pool can lead to unpredictable results.
Expand All @@ -166,7 +160,7 @@ module Concurrent
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface
# @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit
# @see https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setDaemon-boolean-



Expand Down
20 changes: 18 additions & 2 deletions lib/concurrent-ruby/concurrent/executor/java_executor_service.rb
Expand Up @@ -38,15 +38,13 @@ def wait_for_termination(timeout = nil)

def shutdown
synchronize do
self.ns_auto_terminate = false
@executor.shutdown
nil
end
end

def kill
synchronize do
self.ns_auto_terminate = false
@executor.shutdownNow
nil
end
Expand Down Expand Up @@ -83,5 +81,23 @@ def run
end
private_constant :Job
end

class DaemonThreadFactory
# hide include from YARD
send :include, java.util.concurrent.ThreadFactory

def initialize(daemonize = true)
@daemonize = daemonize
end

def newThread(runnable)
thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable)
thread.setDaemon(@daemonize)
return thread
end
end

private_constant :DaemonThreadFactory

end
end
Expand Up @@ -17,12 +17,13 @@ def initialize(opts = {})
end

private

def ns_initialize(opts)
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
@executor = java.util.concurrent.Executors.newSingleThreadExecutor(
DaemonThreadFactory.new(ns_auto_terminate?)
)
@fallback_policy = opts.fetch(:fallback_policy, :discard)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)
self.auto_terminate = opts.fetch(:auto_terminate, true)
end
end
end
Expand Down
Expand Up @@ -108,35 +108,17 @@ def ns_initialize(opts)
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
end

self.auto_terminate = opts.fetch(:auto_terminate, true)

@executor = java.util.concurrent.ThreadPoolExecutor.new(
min_length,
max_length,
idletime,
java.util.concurrent.TimeUnit::SECONDS,
queue,
DaemonThreadFactory.new(self.auto_terminate?),
DaemonThreadFactory.new(ns_auto_terminate?),
FALLBACK_POLICY_CLASSES[@fallback_policy].new)

end
end

class DaemonThreadFactory
# hide include from YARD
send :include, java.util.concurrent.ThreadFactory

def initialize(daemonize = true)
@daemonize = daemonize
end

def newThread(runnable)
thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable)
thread.setDaemon(@daemonize)
return thread
end
end

private_constant :DaemonThreadFactory
end
end
Expand Up @@ -27,7 +27,6 @@ def post(*args, &task)
def shutdown
synchronize do
break unless running?
self.ns_auto_terminate = false
stop_event.set
ns_shutdown_execution
end
Expand All @@ -37,7 +36,6 @@ def shutdown
def kill
synchronize do
break if shutdown?
self.ns_auto_terminate = false
stop_event.set
ns_kill_execution
stopped_event.set
Expand Down
Expand Up @@ -15,7 +15,6 @@ def initialize(opts = {})
max_queue: 0,
idletime: DEFAULT_THREAD_IDLETIMEOUT,
fallback_policy: opts.fetch(:fallback_policy, :discard),
auto_terminate: opts.fetch(:auto_terminate, true)
)
end
end
Expand Down
Expand Up @@ -122,8 +122,6 @@ def ns_initialize(opts)
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length

self.auto_terminate = opts.fetch(:auto_terminate, true)

@pool = [] # all workers
@ready = [] # used as a stash (most idle worker is at the start)
@queue = [] # used as queue
Expand Down
1 change: 0 additions & 1 deletion lib/concurrent-ruby/concurrent/executor/timer_set.rb
Expand Up @@ -77,7 +77,6 @@ def ns_initialize(opts)
@timer_executor = SingleThreadExecutor.new
@condition = Event.new
@ruby_pid = $$ # detects if Ruby has forked
self.auto_terminate = opts.fetch(:auto_terminate, true)
end

# Post the task to the internal queue.
Expand Down

0 comments on commit 8ccb214

Please sign in to comment.