-
Notifications
You must be signed in to change notification settings - Fork 416
/
mvar.rb
242 lines (213 loc) · 7.2 KB
/
mvar.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
require 'concurrent/concern/dereferenceable'
require 'concurrent/synchronization'
module Concurrent
# An `MVar` is a synchronized single element container. They are empty or
# contain one item. Taking a value from an empty `MVar` blocks, as does
# putting a value into a full one. You can think of them either as a blocking
# queue of length one, or as a special kind of mutable variable.
#
# On top of the fundamental `#put` and `#take` operations, we also provide a
# `#modify` that is atomic with respect to operations on the same instance.
# These operations all support timeouts.
#
# We also support non-blocking operations `#try_put!` and `#try_take!`, a
# `#set!` that ignores existing values, a `#value` that returns the value
# without removing it or returns `MVar::EMPTY`, and a `#modify!` that yields
# `MVar::EMPTY` if the `MVar` is empty and can be used to set `MVar::EMPTY`.
# You shouldn't use these operations in the first instance.
#
# `MVar` is a [Dereferenceable](Dereferenceable).
#
# `MVar` is related to M-structures in Id, `MVar` in Haskell and `SyncVar` in Scala.
#
# Note that unlike the original Haskell paper, our `#take` is blocking. This is how
# Haskell and Scala do it today.
#
# @!macro copy_options
#
# ## See Also
#
# 1. P. Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non-strict, functional language with state](http://dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th
# ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.
#
# 2. S. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](http://dl.acm.org/citation.cfm?id=237794).
# In Proceedings of the 23rd Symposium on Principles of Programming Languages
# (PoPL), 1996.
class MVar < Synchronization::Object
  include Concern::Dereferenceable
  safe_initialization!

  # Unique value that represents that an `MVar` was empty
  EMPTY = ::Object.new

  # Unique value that represents that an `MVar` timed out before it was able
  # to produce a value.
  TIMEOUT = ::Object.new

  # Create a new `MVar`, either empty or with an initial value.
  #
  # @param [Object] value the initial value; omit it (or pass `EMPTY`) to
  #   create an empty `MVar`
  # @param [Hash] opts the options handed to `set_deref_options`
  #
  # @!macro deref_options
  def initialize(value = EMPTY, opts = {})
    @value = value
    @mutex = Mutex.new
    # Signalled when the MVar becomes empty; threads blocked in `put` wait on it.
    @empty_condition = ConditionVariable.new
    # Signalled when the MVar becomes full; threads blocked in `take`,
    # `borrow` and `modify` wait on it.
    @full_condition = ConditionVariable.new
    set_deref_options(opts)
  end

  # Remove the value from an `MVar`, leaving it empty, and blocking if there
  # isn't a value. A timeout can be set to limit the time spent blocked, in
  # which case it returns `TIMEOUT` if the time is exceeded.
  #
  # @param [Numeric, nil] timeout maximum number of seconds to block, or
  #   `nil` to block without limit
  # @return [Object] the value that was taken, or `TIMEOUT`
  def take(timeout = nil)
    @mutex.synchronize do
      wait_for_full(timeout)

      # If we timed out we'll still be empty
      if unlocked_full?
        unlocked_take
      else
        TIMEOUT
      end
    end
  end

  # Acquire the lock on an `MVar`, yield its value to the provided block, and
  # release the lock. The value stays in the `MVar`. The block receives the
  # stored value directly; deref options are not applied to it. A timeout can
  # be set to limit the time spent blocked, in which case it returns
  # `TIMEOUT` if the time is exceeded.
  #
  # @param [Numeric, nil] timeout maximum number of seconds to block, or
  #   `nil` to block without limit
  # @yield [value] the value currently held, while the lock is held
  # @return [Object] the value returned by the block, or `TIMEOUT`
  def borrow(timeout = nil)
    @mutex.synchronize do
      wait_for_full(timeout)

      # If we timed out we'll still be empty
      if unlocked_full?
        yield @value
      else
        TIMEOUT
      end
    end
  end

  # Put a value into an `MVar`, blocking if there is already a value until
  # it is empty. A timeout can be set to limit the time spent blocked, in
  # which case it returns `TIMEOUT` if the time is exceeded.
  #
  # @param [Object] value the value to store
  # @param [Numeric, nil] timeout maximum number of seconds to block, or
  #   `nil` to block without limit
  # @return [Object] the value that was put, or `TIMEOUT`
  def put(value, timeout = nil)
    @mutex.synchronize do
      wait_for_empty(timeout)

      # If we timed out we won't be empty
      if unlocked_empty?
        unlocked_store(value)
        apply_deref_options(value)
      else
        TIMEOUT
      end
    end
  end

  # Atomically `take`, yield the value to a block for transformation, and then
  # `put` the transformed value. A timeout can be set to limit the time spent
  # blocked, in which case it returns `TIMEOUT` if the time is exceeded.
  #
  # @param [Numeric, nil] timeout maximum number of seconds to block, or
  #   `nil` to block without limit
  # @yield [value] the value currently held; the block's result replaces it
  # @return [Object] the value held *before* the transformation, or `TIMEOUT`
  # @raise [ArgumentError] if no block is given
  def modify(timeout = nil)
    raise ArgumentError, 'no block given' unless block_given?

    @mutex.synchronize do
      wait_for_full(timeout)

      # If we timed out we'll still be empty
      if unlocked_full?
        value = @value
        # The block may return EMPTY; unlocked_store wakes the matching waiter.
        unlocked_store(yield(value))
        apply_deref_options(value)
      else
        TIMEOUT
      end
    end
  end

  # Non-blocking version of `take`, that returns `EMPTY` instead of blocking.
  #
  # @return [Object] the value that was taken, or `EMPTY`
  def try_take!
    @mutex.synchronize do
      if unlocked_full?
        unlocked_take
      else
        EMPTY
      end
    end
  end

  # Non-blocking version of `put`, that returns whether or not it was successful.
  #
  # @param [Object] value the value to store
  # @return [Boolean] `true` if the value was stored, `false` if the `MVar`
  #   already held a value
  def try_put!(value)
    @mutex.synchronize do
      if unlocked_empty?
        unlocked_store(value)
        true
      else
        false
      end
    end
  end

  # Non-blocking version of `put` that will overwrite an existing value.
  #
  # @param [Object] value the value to store
  # @return [Object] the previous contents (which is `EMPTY` when there was
  #   no value), passed through `apply_deref_options`
  def set!(value)
    @mutex.synchronize do
      old_value = @value
      unlocked_store(value)
      apply_deref_options(old_value)
    end
  end

  # Non-blocking version of `modify` that will yield with `EMPTY` if there is no value yet.
  #
  # @yield [value] the current contents, or `EMPTY`; the block's result
  #   replaces them
  # @return [Object] the contents held before the block ran, passed through
  #   `apply_deref_options`
  # @raise [ArgumentError] if no block is given
  def modify!
    raise ArgumentError, 'no block given' unless block_given?

    @mutex.synchronize do
      value = @value
      unlocked_store(yield(value))
      apply_deref_options(value)
    end
  end

  # Returns if the `MVar` is currently empty.
  #
  # @return [Boolean]
  def empty?
    @mutex.synchronize { unlocked_empty? }
  end

  # Returns if the `MVar` currently contains a value.
  #
  # @return [Boolean]
  def full?
    !empty?
  end

  protected

  # Runs the block while holding this `MVar`'s mutex.
  # NOTE(review): presumably the hook `Concern::Dereferenceable` locks
  # through - confirm against that module.
  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  private

  # The `unlocked_*` helpers below must only be called with @mutex held.

  # Identity comparison against the sentinel, so that a stored object with a
  # custom `==` can never be mistaken for (or blow up against) `EMPTY`.
  def unlocked_empty?
    EMPTY.equal?(@value)
  end

  def unlocked_full?
    !unlocked_empty?
  end

  # Stores `value` and wakes one thread waiting for the state the `MVar` is
  # now in: a putter if it ended up empty, a taker otherwise.
  def unlocked_store(value)
    @value = value
    if unlocked_empty?
      @empty_condition.signal
    else
      @full_condition.signal
    end
  end

  # Empties the `MVar` and returns the removed value with deref options
  # applied.
  def unlocked_take
    taken = @value
    unlocked_store(EMPTY)
    apply_deref_options(taken)
  end

  # Blocks until the `MVar` holds a value or the timeout elapses.
  def wait_for_full(timeout)
    wait_while(@full_condition, timeout) { unlocked_empty? }
  end

  # Blocks until the `MVar` is empty or the timeout elapses.
  def wait_for_empty(timeout)
    wait_while(@empty_condition, timeout) { unlocked_full? }
  end

  # Waits on `condition` for as long as the given block returns a truthy
  # value. The block is re-evaluated after every wakeup. With a timeout, the
  # remaining time is recomputed from `Concurrent.monotonic_time` after each
  # wakeup and waiting stops once none is left; callers re-check the state
  # themselves to tell a timeout from success.
  def wait_while(condition, timeout)
    if timeout.nil?
      condition.wait(@mutex) while yield
    else
      stop = Concurrent.monotonic_time + timeout
      while yield && timeout > 0.0
        condition.wait(@mutex, timeout)
        timeout = stop - Concurrent.monotonic_time
      end
    end
  end
end
end