forked from ruby-concurrency/concurrent-ruby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
abstract_executor_service.rb
128 lines (106 loc) · 3.22 KB
/
abstract_executor_service.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'
module Concurrent
# @!macro abstract_executor_service_public_api
# @!visibility private
#
# Common base for executor services. It records the `:auto_terminate` and
# `:name` construction options, exposes lifecycle predicates that evaluate
# their `ns_*` counterparts inside `synchronize`, and provides the fallback
# policy dispatch. `shutdown`, `kill`, `wait_for_termination` and
# `ns_execute` raise NotImplementedError in this class.
class AbstractExecutorService < Synchronization::LockableObject
  include ExecutorService
  include Concern::Deprecation

  # Every fallback policy that may be chosen when a thread pool is created.
  FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

  # @!macro executor_service_attr_reader_fallback_policy
  attr_reader :fallback_policy

  # @return [Object, nil] the value supplied for the `:name` option, or nil
  #   when that option was not given
  attr_reader :name

  # Create a new thread pool.
  #
  # @param [Hash] opts construction options. `:auto_terminate` (default
  #   `true`) and `:name` are read here; the whole hash is then handed to
  #   `ns_initialize`.
  # @yield an optional block, forwarded to `ns_initialize`
  def initialize(opts = {}, &block)
    # `&nil` calls the superclass constructor with no arguments and no
    # block, so the block reaches `ns_initialize` only.
    super(&nil)

    synchronize do
      @auto_terminate = opts.fetch(:auto_terminate, true)
      # `@name` is assigned only when the caller actually passed the key.
      if opts.key?(:name)
        @name = opts.fetch(:name)
      end
      ns_initialize(opts, &block)
    end
  end

  # @return [String] the inherited string form; when a name is set, the last
  #   character of that string is dropped and ` name: <name>>` is appended
  def to_s
    if name
      inherited = super
      "#{inherited[0..-2]} name: #{name}>"
    else
      super
    end
  end

  # @!macro executor_service_method_shutdown
  #
  # Not implemented in this class.
  #
  # @raise [NotImplementedError] always
  def shutdown
    raise NotImplementedError
  end

  # @!macro executor_service_method_kill
  #
  # Not implemented in this class.
  #
  # @raise [NotImplementedError] always
  def kill
    raise NotImplementedError
  end

  # @!macro executor_service_method_wait_for_termination
  #
  # Not implemented in this class; `timeout` is not used here.
  #
  # @raise [NotImplementedError] always
  def wait_for_termination(timeout = nil)
    raise NotImplementedError
  end

  # @!macro executor_service_method_running_question
  #
  # Returns the result of `ns_running?`, evaluated inside `synchronize`.
  def running?
    synchronize do
      ns_running?
    end
  end

  # @!macro executor_service_method_shuttingdown_question
  #
  # Returns the result of `ns_shuttingdown?`, evaluated inside `synchronize`.
  def shuttingdown?
    synchronize do
      ns_shuttingdown?
    end
  end

  # @!macro executor_service_method_shutdown_question
  #
  # Returns the result of `ns_shutdown?`, evaluated inside `synchronize`.
  def shutdown?
    synchronize do
      ns_shutdown?
    end
  end

  # @!macro executor_service_method_auto_terminate_question
  #
  # Returns the `:auto_terminate` value captured at construction, read
  # inside `synchronize`.
  def auto_terminate?
    synchronize do
      @auto_terminate
    end
  end

  # @!macro executor_service_method_auto_terminate_setter
  #
  # Kept only for compatibility: `value` is ignored and a deprecation
  # notice is emitted instead of changing any state.
  def auto_terminate=(value)
    deprecated("Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized.")
  end

  private

  # Applies the configured `fallback_policy` to a task. Intended to be used
  # once the queue size reaches `max_queue`; the call sites are not part of
  # this class.
  #
  # * `:abort`       - raises RejectedExecutionError
  # * `:discard`     - returns false without running anything
  # * `:caller_runs` - yields `args` to the given block on the calling
  #   thread and returns true; a StandardError raised by the block is
  #   logged at DEBUG level and not re-raised
  #
  # @param [Array] args the arguments to the task which is being handled.
  # @yield [*args] the task body, run only under `:caller_runs`
  # @return [Boolean] true for `:caller_runs`, false for `:discard`
  # @raise [RejectedExecutionError] when the policy is `:abort`
  # @raise [RuntimeError] when the policy is none of the above
  #
  # @!visibility private
  def handle_fallback(*args)
    case fallback_policy
    when :abort then raise RejectedExecutionError
    when :discard then false
    when :caller_runs
      begin
        yield(*args)
      rescue => error
        # Deliberately swallowed: the failure is only logged.
        log DEBUG, error
      end
      true
    else
      raise "Unknown fallback policy #{fallback_policy}"
    end
  end

  # Not implemented in this class.
  #
  # @raise [NotImplementedError] always
  def ns_execute(*args, &task)
    raise NotImplementedError
  end

  # @!macro executor_service_method_ns_shutdown_execution
  #
  # Callback method called when an orderly shutdown has completed.
  # This base implementation is intentionally empty.
  def ns_shutdown_execution
    # nothing to do here
  end

  # @!macro executor_service_method_ns_kill_execution
  #
  # Callback method called when the executor has been killed.
  # This base implementation is intentionally empty.
  def ns_kill_execution
    # nothing to do here
  end

  # Unsynchronized read of the `:auto_terminate` value captured at
  # construction; `auto_terminate?` is the variant wrapped in `synchronize`.
  def ns_auto_terminate?
    @auto_terminate
  end
end
end