-
Notifications
You must be signed in to change notification settings - Fork 417
/
core.rb
220 lines (187 loc) · 7.73 KB
/
core.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
require 'concurrent/concern/logging'
require 'concurrent/executors'
module Concurrent
module Actor
require 'set'
# Core of the actor.
# @note Whole class should be considered private. An user should use {Context}s and {Reference}s only.
# @note devel: core should not block on anything, e.g. it cannot wait on children to terminate
# that would eat up all threads in task pool and deadlock
class Core < Synchronization::LockableObject
include TypeCheck
include Concern::Logging
# @!attribute [r] reference
# Reference to this actor which can be safely passed around.
# @return [Reference]
# @!attribute [r] name
# The name of actor instance, it should be uniq (not enforced). Allows easier orientation
# between actor instances.
# @return [String]
# @!attribute [r] path
# Path of this actor. It is used for easier orientation and logging.
# Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
# e.g. `/an_actor/its_child`.
# @return [String]
# @!attribute [r] executor
# Executor which is used to process messages.
# @return [Executor]
# @!attribute [r] actor_class
# A subclass of {AbstractContext} representing Actor's behaviour.
# @return [Context]
attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition
# @option opts [String] name
# @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour
# @option opts [Array<Object>] args arguments for actor_class instantiation
# @option opts [Executor] executor, default is `global_io_executor`
# @option opts [true, false] link, atomically link the actor to its parent (default: true)
# @option opts [Class] reference a custom descendant of {Reference} to use
# @option opts [Array<Array(Behavior::Abstract, Array<Object>)>] behaviour_definition, array of pairs
# where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition}
# @option opts [ResolvableFuture, nil] initialized, if present it'll be set or failed after {Context} initialization
# @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning )
# @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,
# can be used to hook actor instance to any logging system, see {Concurrent::Concern::Logging}
# @param [Proc] block for class instantiation
def initialize(opts = {}, &block)
super(&nil)
synchronize { ns_initialize(opts, &block) }
end
# A parent Actor. When actor is spawned the {Actor.current} becomes its parent.
# When actor is spawned from a thread outside of an actor ({Actor.current} is nil) {Actor.root} is assigned.
# @return [Reference, nil]
def parent
@parent_core && @parent_core.reference
end
# @see AbstractContext#dead_letter_routing
def dead_letter_routing
@context.dead_letter_routing
end
# @return [Array<Reference>] of children actors
def children
guard!
@children.to_a
end
# @api private
def add_child(child)
guard!
Type! child, Reference
@children.add child
nil
end
# @api private
def remove_child(child)
guard!
Type! child, Reference
@children.delete child
nil
end
# is executed by Reference scheduling processing of new messages
# can be called from other alternative Reference implementations
# @param [Envelope] envelope
def on_envelope(envelope)
log(DEBUG) { "is #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
schedule_execution do
log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
process_envelope envelope
end
nil
end
# ensures that we are inside of the executor
def guard!
unless Actor.current == reference
raise "can be called only inside actor #{reference} but was #{Actor.current}"
end
end
def log(level, message = nil, &block)
super level, @path, message, &block
end
# Schedules blocks to be executed on executor sequentially,
# sets Actress.current
def schedule_execution
@serialized_execution.post(@executor) do
synchronize do
begin
Thread.current[:__current_actor__] = reference
yield
rescue => e
log FATAL, e
ensure
Thread.current[:__current_actor__] = nil
end
end
end
nil
end
def broadcast(public, event)
log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
@first_behaviour.on_event(public, event)
end
# @param [Class] behaviour_class
# @return [Behaviour::Abstract, nil] based on behaviour_class
def behaviour(behaviour_class)
@behaviours[behaviour_class]
end
# @param [Class] behaviour_class
# @return [Behaviour::Abstract] based on behaviour_class
# @raise [KeyError] when no behaviour
def behaviour!(behaviour_class)
@behaviours.fetch behaviour_class
end
# @api private
def allocate_context
@context = @context_class.allocate
end
# @api private
def build_context
@context.send :initialize_core, self
@context.send :initialize, *@args, &@block
end
# @api private
def process_envelope(envelope)
@first_behaviour.on_envelope envelope
end
private
def ns_initialize(opts, &block)
@mailbox = ::Array.new
@serialized_execution = SerializedExecution.new
@children = Set.new
@context_class = Child! opts.fetch(:class), AbstractContext
allocate_context
@executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
@name = (Type! opts.fetch(:name), String, Symbol).to_s
parent = opts[:parent]
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
if @parent_core.nil? && @name != '/'
raise 'only root has no parent'
end
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
@logger = opts[:logger]
@parent_core.add_child reference if @parent_core
initialize_behaviours opts
@args = opts.fetch(:args, [])
@block = block
initialized = Type! opts[:initialized], Promises::ResolvableFuture, NilClass
schedule_execution do
begin
build_context
initialized.fulfill reference if initialized
log DEBUG, 'spawned'
rescue => ex
log ERROR, ex
@first_behaviour.terminate!
initialized.reject ex if initialized
end
end
end
def initialize_behaviours(opts)
@behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, ::Array).each do |(behaviour, _)|
Child! behaviour, Behaviour::Abstract
end
@behaviours = {}
@first_behaviour = @behaviour_definition.reverse.
reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) }
end
end
end
end