-
Notifications
You must be signed in to change notification settings - Fork 0
/
fiber.lua
455 lines (430 loc) · 15.4 KB
/
fiber.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
-- Module for lightweight threads (fibers)
-- Disallow setting global variables in the implementation of this module:
_ENV = setmetatable({}, {
__index = _G,
__newindex = function() error("cannot set global variable", 2) end,
})
-- Table containing all public items of this module:
local _M = {}
-- Import "effect" module and "yield" module (which is used as "yield" effect):
local effect = require "effect"
local yield = require "yield"
-- Function creating a FIFO-like data structure where there can be no
-- duplicates (i.e. pushing an already existing element is a no-op):
local function fifoset()
local input_idx = 0
local output_idx = 0
local queue = {}
local set = {}
return {
-- Method appending a value to the queue if it doesn't exist in the queue:
push = function(self, value)
if not set[value] then
queue[input_idx] = value
set[value] = true
input_idx = input_idx + 1
end
end,
-- Method removing and returning the oldest value in the queue:
pop = function(self)
if output_idx == input_idx then
return nil
end
local value = queue[output_idx]
queue[output_idx] = nil
set[value] = nil
output_idx = output_idx + 1
return value
end,
-- Method returning a value without removing it, skipping a certain count
-- of values from the beginning (a skip_count of zero returns the very next
-- value):
peek = function(self, skip_count)
return queue[output_idx + skip_count]
end,
}
end
-- fiber.yield is an alias for the yield effect represented by the "yield"
-- module:
_M.yield = yield
-- Internally used effects (which are not exported) for "try_current",
-- "current", sleep", and "suicide" functions:
local try_current = effect.new("fiber.try_current")
local sleep = effect.new("fiber.sleep")
local suicide = effect.new("fiber.suicide")
-- Default handler for "try_current" effect:
effect.default_handlers[try_current] = function() return nil end
-- Function returning a handle of the currently running fiber, or nil if called
-- outside a scheduling environment:
function _M.try_current()
return try_current()
end
-- Function returning a handle of the currently running fiber:
local function current()
local x = try_current()
if x then
return x
end
error("not running in fiber environment", 0)
end
_M.current = current
-- Function putting the currently running fiber to sleep:
_M.sleep = function()
return sleep()
end
-- Function killing the currently running fiber:
_M.suicide = function()
return suicide()
end
-- Internal marker for attributes in the "fiber_methods" table:
local getter_magic = {}
-- Ephemeron storing fibers' attributes:
local fiber_attrs = setmetatable({}, { __mode = "k" })
-- Table containing all methods of fibers, plus public attributes where the
-- value in this table must be set to "getter_magic":
local fiber_methods = {
-- table with return values of fiber's function or false if fiber was killed:
results = getter_magic,
}
-- Method waking up the fiber (note that "self" is named "fiber" below):
function fiber_methods.wake(fiber)
while fiber do
local attrs = fiber_attrs[fiber]
-- Add fiber to woken_fibers FIFO set:
attrs.woken_fibers:push(fiber)
-- Repeat procedure for all parent fibers:
fiber = attrs.parent_fiber
end
end
-- Method putting the currently executed fiber to sleep until being able to
-- return the given fiber's ("self"'s) results (prefixed by true as first
-- return value) or until the fiber has been killed (in which case false is
-- returned):
function fiber_methods.try_await(self)
local attrs = fiber_attrs[self]
local results = attrs.results
-- Check if awaited fiber has been killed:
if results == false then
-- Awaited fiber has been killed.
-- Return false to indicate fiber has been killed and there are no results:
return false
end
-- Check if result is already available:
if results then
-- Result is already available.
-- Return true with available results:
return true, table.unpack(results, 1, results.n)
end
-- No result is available and awaited fiber has not been killed.
-- Add currently executed fiber to other fiber's waiting list:
table.insert(attrs.waiting_fibers, current())
-- Sleep until result is available or awaited fiber has been killed and
-- proceed same as above:
while true do
sleep()
local results = attrs.results
if results == false then
return false
end
if results then
return true, table.unpack(results, 1, results.n)
end
end
end
-- Same method as try_await but killing the current fiber if the awaited fiber
-- was killed (implemented redundantly for performance reasons):
function fiber_methods.await(self)
local attrs = fiber_attrs[self]
local results = attrs.results
-- Check if awaited fiber has been killed:
if results == false then
-- Awaited fiber has been killed.
-- Kill current fiber as well:
return suicide()
end
-- Check if result is already available:
if results then
-- Result is already available.
-- Return available results:
return table.unpack(results, 1, results.n)
end
-- No result is available and awaited fiber has not been killed.
-- Add currently executed fiber to other fiber's waiting list:
table.insert(attrs.waiting_fibers, current())
-- Sleep until result is available or awaited fiber has been killed and
-- proceed same as above:
while true do
sleep()
local results = attrs.results
if results == false then
return suicide()
end
if results then
return table.unpack(results, 1, results.n)
end
end
end
-- Method killing the fiber, i.e. stopping its further execution:
function fiber_methods.kill(self)
-- Check if killed fiber is current fiber:
if self == try_current() then
-- Killed fiber is currently running.
-- Simply kill current fiber:
return suicide()
end
-- Obtain attributes of fiber to kill:
local attrs = fiber_attrs[self]
-- Check if fiber has already terminated (with return value or killed):
if attrs.results ~= nil then
-- Fiber has already terminated; do nothing.
return
end
-- Mark fiber as killed:
attrs.results = false
-- Obtain resume function (which must exist at this point):
local resume = attrs.resume
-- Check if resume function is a continuation:
if attrs.started then
-- "resume" is a continuation.
-- Discontinue the continuation:
resume:discontinue()
end
-- Ensure that fiber is not continued when woken or cleaned up:
attrs.resume = nil
-- Wakeup all fibers that are waiting for that fiber's return values:
for i, waiting_fiber in ipairs(attrs.waiting_fibers) do
waiting_fiber:wake()
end
-- Remove fiber from open_fibers table to immediately free resources (may
-- still require yielding to remove fiber from woken_fibers):
attrs.open_fibers[self] = nil
end
-- Metatable for fiber handles:
_M.fiber_metatbl = {
__index = function(self, key)
-- Lookup method or attribute magic:
local value = fiber_methods[key]
-- Check if key is a public attribute:
if value == getter_magic then
-- Key is an attribute.
-- Obtain value from "fiber_attrs" ephemeron and return it:
return fiber_attrs[self][key]
end
-- Key is not an attribute.
-- Return method, if exists:
return value
end,
}
-- spawn(action, ...) spawns a new fiber with the given action function and
-- arguments to the action function:
function _M.spawn(...)
-- Use spawn function of current fiber:
return fiber_attrs[current()].spawn(...)
end
-- Function checking if there is any woken fiber:
function _M.pending()
local fiber = try_current()
while fiber do
local attrs = fiber_attrs[fiber]
local woken_fibers = attrs.woken_fibers
-- Check first two positions in woken_fibers FIFO because first position
-- could be a special (false) marker:
if woken_fibers:peek(0) or woken_fibers:peek(1) then
-- There is an entry in woken_fibers, which is not false,
-- i.e. there is a woken fiber.
return true
end
-- Repeat procedure for all parent fibers:
fiber = attrs.parent_fiber
end
return false
end
-- Internal metatable for set of all open (not yet terminated) fibers within
-- the scheduler:
local open_fibers_metatbl = {
-- Ensuring cleanup of all open fibers when set of open fibers is closed,
-- e.g. due to a non-resumed effect or due to an error:
__close = function(self)
-- Iterate through all keys:
for fiber in pairs(self) do
local attrs = fiber_attrs[fiber]
local resume = attrs.resume
-- Check if resume function exists and whether it is a continuation:
if resume and attrs.started then
-- "resume" is a continuation.
-- Discontinue the continuation:
resume:discontinue()
-- Note that it's not necessary to set attrs.resume to nil here,
-- because when open_fibers is closed, there will be no scheduler
-- anymore that would call the resume function. Moreover, the kill
-- method will short-circuit if the fiber has already been killed and
-- thus will also not use the resume function.
--attrs.resume = nil
end
-- Check if results are missing:
if attrs.results == nil then
-- Fiber did not generate a return value and was not killed.
-- Mark fiber as killed:
attrs.results = false
end
-- Wakeup all fibers that are waiting for this fiber's return values:
for i, waiting_fiber in ipairs(attrs.waiting_fibers) do
waiting_fiber:wake()
end
end
end,
}
-- scope(action, ...) runs the given "action" function with given arguments and
-- permits yielding/sleeping/spawning while it runs.
local function scope(...)
-- Obtain parent fiber unless running as top-level scheduler:
local parent_fiber = try_current()
-- Remember all open fibers in a set with a cleanup handler:
local open_fibers <close> = setmetatable({}, open_fibers_metatbl)
-- FIFO set of woken fibers:
local woken_fibers = fifoset()
-- Local variable (used as upvalue) for currently running fiber:
local current_fiber
-- Effect handlers:
local handlers = {
-- Effect resuming with a handle of the currently running fiber:
[try_current] = function(resume)
-- Resume with handle of current fiber:
return resume(current_fiber)
end,
-- Effect putting the currently running fiber to sleep:
[sleep] = function(resume)
-- Store continuation:
fiber_attrs[current_fiber].resume = resume:persistent()
end,
-- Effect yielding execution to another (unspecified) fiber:
[yield] = function(resume)
-- Ensure that currently running fiber is woken again:
woken_fibers:push(current_fiber)
-- Store continuation:
fiber_attrs[current_fiber].resume = resume:persistent()
end,
-- Effect invoked when current fiber is killed:
[suicide] = function(resume)
local attrs = fiber_attrs[current_fiber]
-- Mark fiber as killed:
attrs.results = false
-- Mark fiber as closed (i.e. remove it from "open_fibers" table):
open_fibers[current_fiber] = nil
-- Wakeup all fibers that are waiting for this fiber's return values:
for i, waiting_fiber in ipairs(attrs.waiting_fibers) do
waiting_fiber:wake()
end
end
}
-- Implementation of spawn function for current scheduler:
local function spawn(func, ...)
-- Create new fiber handle:
local fiber = setmetatable({}, _M.fiber_metatbl)
-- Create storage table for fiber's attributes:
local attrs = {
-- Store certain upvalues as private attributes:
open_fibers = open_fibers,
woken_fibers = woken_fibers,
spawn = spawn,
parent_fiber = parent_fiber,
-- Sequence of other fibers waiting on the newly spawned fiber:
waiting_fibers = {},
}
-- Store attribute table in ephemeron:
fiber_attrs[fiber] = attrs
-- Pack arguments to spawned fiber's function:
local args = table.pack(...)
-- Initialize resume function for first run:
attrs.resume = function()
-- Mark fiber as started, such that cleanup may take place later:
attrs.started = true
-- Run with effect handlers:
return effect.handle(handlers, function()
-- Run fiber's function and store its return values:
attrs.results = table.pack(func(table.unpack(args, 1, args.n)))
-- Mark fiber as closed (i.e. remove it from "open_fibers" table):
open_fibers[current_fiber] = nil
-- Wakeup all fibers that are waiting for this fiber's return values:
for i, waiting_fiber in ipairs(attrs.waiting_fibers) do
waiting_fiber:wake()
end
end)
end
-- Remember fiber as being open so it can be cleaned up later:
open_fibers[fiber] = true
-- Wakeup fiber for the first time (without waking parents because the
-- fiber's scheduler and all parent schedulers if existent are currently
-- running and will not sleep but only yield when there is a woken fiber):
woken_fibers:push(fiber)
-- Return fiber's handle:
return fiber
end
-- Spawn main fiber:
local main = spawn(...)
-- Unless running as top-level scheduler, include special marker (false) in
-- "woken_fiber" FIFO to indicate that control has to be yielded to the
-- parent scheduler:
if parent_fiber then
woken_fibers:push(false)
end
-- Main scheduling loop:
while true do
-- Check if main fiber has terminated:
local main_results = fiber_attrs[main].results
if main_results then
-- Main fiber has terminated.
-- Return results of main fiber:
return table.unpack(main_results, 1, main_results.n)
end
-- Obtain next fiber to resume (or special marker):
local fiber = woken_fibers:pop()
-- Check if entry in "woken_fibers" was special marker (false) and if there
-- are still fibers left:
if fiber == false and next(open_fibers) then
-- Special marker has been found and there are fibers left.
-- Check if there is any other fiber to-be-woken without removing it from
-- the FIFO:
if woken_fibers:peek(0) then
-- There is another fiber to-be-woken, so we only yield control to the
-- parent scheduler (and do not sleep):
yield()
else
-- All fibers are sleeping, so we sleep as well:
sleep()
end
-- Re-insert special marker to yield control to the parent next time
-- again:
woken_fibers:push(false)
else
-- No special marker has been found or there are no fibers left.
-- Check if there is no fiber to be woken and no parent:
if not fiber then
-- There is no woken fiber and no parent.
-- Throw an exception:
error("no running fiber remaining", 2)
end
-- Obtain resume function (if exists):
local attrs = fiber_attrs[fiber]
local resume = attrs.resume
-- Check if resume function exists to avoid resuming after termination:
if resume then
-- Resume function exists.
-- Remove resume function from fiber's attributes (avoids invocation
-- when fiber has already terminated):
attrs.resume = nil
-- Set current_fiber:
current_fiber = fiber
-- Run resume function:
resume()
end
end
end
end
_M.scope = scope
-- handle(handlers, action, ...) acts like effect.handle(action, ...) but also
-- applies to spawned fibers within the action.
function _M.handle(handlers, ...)
return effect.handle(handlers, scope, ...)
end
return _M