forked from ruby-concurrency/concurrent-ruby
/
async.rb
448 lines (424 loc) · 17.9 KB
/
async.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
require 'concurrent/configuration'
require 'concurrent/ivar'
require 'concurrent/synchronization/lockable_object'
module Concurrent
# A mixin module that provides simple asynchronous behavior to a class,
# turning it into a simple actor. Loosely based on Erlang's
# [gen_server](http://www.erlang.org/doc/man/gen_server.html), but without
# supervision or linking.
#
# A more feature-rich {Concurrent::Actor} is also available when the
# capabilities of `Async` are too limited.
#
# ```cucumber
# Feature:
# As a stateful, plain old Ruby class
# I want safe, asynchronous behavior
# So my long-running methods don't block the main thread
# ```
#
# The `Async` module is a way to mix simple yet powerful asynchronous
# capabilities into any plain old Ruby object or class, turning each object
# into a simple Actor. Method calls are processed on a background thread. The
# caller is free to perform other actions while processing occurs in the
# background.
#
# Method calls to the asynchronous object are made via two proxy methods:
# `async` (alias `cast`) and `await` (alias `call`). These proxy methods post
# the method call to the object's background thread and return a "future"
# which will eventually contain the result of the method call.
#
# This behavior is loosely patterned after Erlang's `gen_server` behavior.
# When an Erlang module implements the `gen_server` behavior it becomes
# inherently asynchronous. The `start` or `start_link` function spawns a
# process (similar to a thread but much more lightweight and efficient) and
# returns the ID of the process. Using the process ID, other processes can
# send messages to the `gen_server` via the `cast` and `call` methods. Unlike
# Erlang's `gen_server`, however, `Async` classes do not support linking or
# supervision trees.
#
# ## Basic Usage
#
# When this module is mixed into a class, objects of the class become inherently
# asynchronous. Each object gets its own background thread on which to post
# asynchronous method calls. Asynchronous method calls are executed in the
# background one at a time in the order they are received.
#
# To create an asynchronous class, simply mix in the `Concurrent::Async` module:
#
# ```
# class Hello
# include Concurrent::Async
#
# def hello(name)
# "Hello, #{name}!"
# end
# end
# ```
#
# Mixing this module into a class provides each object two proxy methods:
# `async` and `await`. These methods are thread safe with respect to the
# enclosing object. The former proxy allows methods to be called
# asynchronously by posting to the object's internal thread. The latter proxy
# allows a method to be called synchronously but does so safely with respect
# to any pending asynchronous method calls and ensures proper ordering. Both
# methods return a {Concurrent::IVar} which can be inspected for the result
# of the proxied method call. Calling a method with `async` will return a
# `:pending` `IVar` whereas `await` will return a `:complete` `IVar`.
#
# ```
# class Echo
# include Concurrent::Async
#
# def echo(msg)
# print "#{msg}\n"
# end
# end
#
# horn = Echo.new
# horn.echo('zero') # synchronous, not thread-safe
# # returns the actual return value of the method
#
# horn.async.echo('one') # asynchronous, non-blocking, thread-safe
# # returns an IVar in the :pending state
#
# horn.await.echo('two') # synchronous, blocking, thread-safe
# # returns an IVar in the :complete state
# ```
#
# ## Let It Fail
#
# The `async` and `await` proxy methods have built-in error protection based
# on Erlang's famous "let it fail" philosophy. Instance methods should not be
# programmed defensively. When an exception is raised by a delegated method
# the proxy will rescue the exception, expose it to the caller as the `reason`
# attribute of the returned future, then process the next method call.
#
# ## Calling Methods Internally
#
# External method calls should *always* use the `async` and `await` proxy
# methods. When one method calls another method, the `async` proxy should
# rarely be used and the `await` proxy should *never* be used.
#
# When an object calls one of its own methods using the `await` proxy the
# second call will be enqueued *behind* the currently running method call.
# Any attempt to wait on the result will fail as the second call will never
# run until after the current call completes.
#
# Calling a method using the `await` proxy from within a method that was
# itself called using `async` or `await` will irreversibly deadlock the
# object. Do *not* do this, ever.
#
# ## Instance Variables and Attribute Accessors
#
# Instance variables do not need to be thread-safe so long as they are private.
# Asynchronous method calls are processed in the order they are received and
# are processed one at a time. Therefore private instance variables can only
# be accessed by one thread at a time. This is inherently thread-safe.
#
# When using private instance variables within asynchronous methods, the best
# practice is to read the instance variable into a local variable at the start
# of the method then update the instance variable at the *end* of the method.
# This way, should an exception be raised during method execution the internal
# state of the object will not have been changed.
#
# ### Reader Attributes
#
# The use of `attr_reader` is discouraged. Internal state exposed externally,
# when necessary, should be done through accessor methods. The instance
# variables exposed by these methods *must* be thread-safe, or they must be
# called using the `async` and `await` proxy methods. These two approaches are
# subtly different.
#
# When internal state is accessed via the `async` and `await` proxy methods,
# the returned value represents the object's state *at the time the call is
# processed*, which may *not* be the state of the object at the time the call
# is made.
#
# To get the state *at the current* time, irrespective of an enqueued method
# calls, a reader method must be called directly. This is inherently unsafe
# unless the instance variable is itself thread-safe, preferably using one
# of the thread-safe classes within this library. Because the thread-safe
# classes within this library are internally-locking or non-locking, they can
# be safely used from within asynchronous methods without causing deadlocks.
#
# Generally speaking, the best practice is to *not* expose internal state via
# reader methods. The best practice is to simply use the method's return value.
#
# ### Writer Attributes
#
# Writer attributes should never be used with asynchronous classes. Changing
# the state externally, even when done in the thread-safe way, is not logically
# consistent. Changes to state need to be timed with respect to all asynchronous
# method calls which my be in-process or enqueued. The only safe practice is to
# pass all necessary data to each method as arguments and let the method update
# the internal state as necessary.
#
# ## Class Constants, Variables, and Methods
#
# ### Class Constants
#
# Class constants do not need to be thread-safe. Since they are read-only and
# immutable they may be safely read both externally and from within
# asynchronous methods.
#
# ### Class Variables
#
# Class variables should be avoided. Class variables represent shared state.
# Shared state is anathema to concurrency. Should there be a need to share
# state using class variables they *must* be thread-safe, preferably
# using the thread-safe classes within this library. When updating class
# variables, never assign a new value/object to the variable itself. Assignment
# is not thread-safe in Ruby. Instead, use the thread-safe update functions
# of the variable itself to change the value.
#
# The best practice is to *never* use class variables with `Async` classes.
#
# ### Class Methods
#
# Class methods which are pure functions are safe. Class methods which modify
# class variables should be avoided, for all the reasons listed above.
#
# ## An Important Note About Thread Safe Guarantees
#
# > Thread safe guarantees can only be made when asynchronous method calls
# > are not mixed with direct method calls. Use only direct method calls
# > when the object is used exclusively on a single thread. Use only
# > `async` and `await` when the object is shared between threads. Once you
# > call a method using `async` or `await`, you should no longer call methods
# > directly on the object. Use `async` and `await` exclusively from then on.
#
# @example
#
# class Echo
# include Concurrent::Async
#
# def echo(msg)
# print "#{msg}\n"
# end
# end
#
# horn = Echo.new
# horn.echo('zero') # synchronous, not thread-safe
# # returns the actual return value of the method
#
# horn.async.echo('one') # asynchronous, non-blocking, thread-safe
# # returns an IVar in the :pending state
#
# horn.await.echo('two') # synchronous, blocking, thread-safe
# # returns an IVar in the :complete state
#
# @see Concurrent::Actor
# @see https://en.wikipedia.org/wiki/Actor_model "Actor Model" at Wikipedia
# @see http://www.erlang.org/doc/man/gen_server.html Erlang gen_server
# @see http://c2.com/cgi/wiki?LetItCrash "Let It Crash" at http://c2.com/
module Async
# @!method self.new(*args, &block)
#
# Instanciate a new object and ensure proper initialization of the
# synchronization mechanisms.
#
# @param [Array<Object>] args Zero or more arguments to be passed to the
# object's initializer.
# @param [Proc] block Optional block to pass to the object's initializer.
# @return [Object] A properly initialized object of the asynchronous class.
# Check for the presence of a method on an object and determine if a given
# set of arguments matches the required arity.
#
# @param [Object] obj the object to check against
# @param [Symbol] method the method to check the object for
# @param [Array] args zero or more arguments for the arity check
#
# @raise [NameError] the object does not respond to `method` method
# @raise [ArgumentError] the given `args` do not match the arity of `method`
#
# @note This check is imperfect because of the way Ruby reports the arity of
# methods with a variable number of arguments. It is possible to determine
# if too few arguments are given but impossible to determine if too many
# arguments are given. This check may also fail to recognize dynamic behavior
# of the object, such as methods simulated with `method_missing`.
#
# @see http://www.ruby-doc.org/core-2.1.1/Method.html#method-i-arity Method#arity
# @see http://ruby-doc.org/core-2.1.0/Object.html#method-i-respond_to-3F Object#respond_to?
# @see http://www.ruby-doc.org/core-2.1.0/BasicObject.html#method-i-method_missing BasicObject#method_missing
#
# @!visibility private
def self.validate_argc(obj, method, *args)
argc = args.length
arity = obj.method(method).arity
if arity >= 0 && argc != arity
raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
elsif arity < 0 && (arity = (arity + 1).abs) > argc
raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
end
end
# @!visibility private
def self.included(base)
base.singleton_class.send(:alias_method, :original_new, :new)
base.extend(ClassMethods)
super(base)
end
# @!visibility private
module ClassMethods
def new(*args, &block)
obj = original_new(*args, &block)
obj.send(:init_synchronization)
obj
end
end
private_constant :ClassMethods
# Delegates asynchronous, thread-safe method calls to the wrapped object.
#
# @!visibility private
class AsyncDelegator < Synchronization::LockableObject
safe_initialization!
# Create a new delegator object wrapping the given delegate.
#
# @param [Object] delegate the object to wrap and delegate method calls to
def initialize(delegate)
super()
@delegate = delegate
@queue = []
@executor = Concurrent.global_io_executor
@ruby_pid = $$
end
# Delegates method calls to the wrapped object.
#
# @param [Symbol] method the method being called
# @param [Array] args zero or more arguments to the method
#
# @return [IVar] the result of the method call
#
# @raise [NameError] the object does not respond to `method` method
# @raise [ArgumentError] the given `args` do not match the arity of `method`
def method_missing(method, *args, &block)
super unless @delegate.respond_to?(method)
Async::validate_argc(@delegate, method, *args)
ivar = Concurrent::IVar.new
synchronize do
reset_if_forked
@queue.push [ivar, method, args, block]
@executor.post { perform } if @queue.length == 1
end
ivar
end
# Check whether the method is responsive
#
# @param [Symbol] method the method being called
def respond_to_missing?(method, include_private = false)
@delegate.respond_to?(method) || super
end
# Perform all enqueued tasks.
#
# This method must be called from within the executor. It must not be
# called while already running. It will loop until the queue is empty.
def perform
loop do
ivar, method, args, block = synchronize { @queue.first }
break unless ivar # queue is empty
begin
ivar.set(@delegate.send(method, *args, &block))
rescue => error
ivar.fail(error)
end
synchronize do
@queue.shift
return if @queue.empty?
end
end
end
def reset_if_forked
if $$ != @ruby_pid
@queue.clear
@ruby_pid = $$
end
end
end
private_constant :AsyncDelegator
# Delegates synchronous, thread-safe method calls to the wrapped object.
#
# @!visibility private
class AwaitDelegator
# Create a new delegator object wrapping the given delegate.
#
# @param [AsyncDelegator] delegate the object to wrap and delegate method calls to
def initialize(delegate)
@delegate = delegate
end
# Delegates method calls to the wrapped object.
#
# @param [Symbol] method the method being called
# @param [Array] args zero or more arguments to the method
#
# @return [IVar] the result of the method call
#
# @raise [NameError] the object does not respond to `method` method
# @raise [ArgumentError] the given `args` do not match the arity of `method`
def method_missing(method, *args, &block)
ivar = @delegate.send(method, *args, &block)
ivar.wait
ivar
end
# Check whether the method is responsive
#
# @param [Symbol] method the method being called
def respond_to_missing?(method, include_private = false)
@delegate.respond_to?(method) || super
end
end
private_constant :AwaitDelegator
# Causes the chained method call to be performed asynchronously on the
# object's thread. The delegated method will return a future in the
# `:pending` state and the method call will have been scheduled on the
# object's thread. The final disposition of the method call can be obtained
# by inspecting the returned future.
#
# @!macro async_thread_safety_warning
# @note The method call is guaranteed to be thread safe with respect to
# all other method calls against the same object that are called with
# either `async` or `await`. The mutable nature of Ruby references
# (and object orientation in general) prevent any other thread safety
# guarantees. Do NOT mix direct method calls with delegated method calls.
# Use *only* delegated method calls when sharing the object between threads.
#
# @return [Concurrent::IVar] the pending result of the asynchronous operation
#
# @raise [NameError] the object does not respond to the requested method
# @raise [ArgumentError] the given `args` do not match the arity of
# the requested method
def async
@__async_delegator__
end
alias_method :cast, :async
# Causes the chained method call to be performed synchronously on the
# current thread. The delegated will return a future in either the
# `:fulfilled` or `:rejected` state and the delegated method will have
# completed. The final disposition of the delegated method can be obtained
# by inspecting the returned future.
#
# @!macro async_thread_safety_warning
#
# @return [Concurrent::IVar] the completed result of the synchronous operation
#
# @raise [NameError] the object does not respond to the requested method
# @raise [ArgumentError] the given `args` do not match the arity of the
# requested method
def await
@__await_delegator__
end
alias_method :call, :await
# Initialize the internal serializer and other stnchronization mechanisms.
#
# @note This method *must* be called immediately upon object construction.
# This is the only way thread-safe initialization can be guaranteed.
#
# @!visibility private
def init_synchronization
return self if defined?(@__async_initialized__) && @__async_initialized__
@__async_initialized__ = true
@__async_delegator__ = AsyncDelegator.new(self)
@__await_delegator__ = AwaitDelegator.new(@__async_delegator__)
self
end
end
end