-
Notifications
You must be signed in to change notification settings - Fork 0
/
waitio_fiber.lua
306 lines (295 loc) · 7.82 KB
/
waitio_fiber.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
-- Module for handling waitio effects by sleeping (i.e. yielding to other
-- fibers)
-- Disallow setting global variables in the implementation of this module:
_ENV = setmetatable({}, {
__index = _G,
__newindex = function() error("cannot set global variable", 2) end,
})
-- Table containing all public items of this module:
local _M = {}
local fiber = require "fiber"
local waitio = require "waitio"
local lkq = require "lkq"
local function wake(self)
return self:wake()
end
local weak_mt = { __mode = "k" }
local function handle_call_noreset(self)
waitio.select("handle", self)
end
local function handle_call_reset(self)
waitio.select("handle", self)
self.ready = false
end
local handle_reset_metatbl = {
__call = handle_call_reset,
}
function _M.run(...)
local eventqueue <close> = lkq.new_queue()
local read_fd_locks, write_fd_locks, pid_locks, handle_locks = {}, {}, {}, {}
local function deregister_fd(fd)
eventqueue:deregister_fd(fd)
local fib = read_fd_locks[fd]
if fib then
read_fd_locks[fd] = nil
fib:wake()
end
local fib = write_fd_locks[fd]
if fib then
write_fd_locks[fd] = nil
fib:wake()
end
end
local poll_state_metatbl = {
__close = function(self)
local entries = self.read_fds
for fd in pairs(entries) do
if read_fd_locks[fd] then
eventqueue:remove_fd_read(fd)
end
read_fd_locks[fd] = nil
entries[fd] = nil
end
local entries = self.write_fds
for fd in pairs(entries) do
if write_fd_locks[fd] then
eventqueue:remove_fd_write(fd)
end
write_fd_locks[fd] = nil
entries[fd] = nil
end
local entries = self.pids
for pid in pairs(entries) do
eventqueue:remove_pid(pid)
pid_locks[pid] = nil
entries[pid] = nil
end
local entries = self.handles
for handle in pairs(entries) do
handle._fiber = false
handle_locks[handle] = nil
entries[handle] = nil
end
end,
}
local fiber_poll_states = setmetatable({}, weak_mt)
local function wait_select(...)
local current_fiber = fiber.current()
local poll_state = fiber_poll_states[current_fiber]
if not poll_state then
poll_state = setmetatable(
{ read_fds = {}, write_fds = {}, pids = {}, handles = {} },
poll_state_metatbl
)
fiber_poll_states[current_fiber] = poll_state
end
local poll_state <close> = poll_state
for argidx = 1, math.huge, 2 do
local rtype, arg = select(argidx, ...)
if rtype == nil then
break
end
if rtype == "fd_read" then
if read_fd_locks[arg] then
error(
"multiple fibers wait for reading from file descriptor " ..
tostring(arg)
)
end
poll_state.read_fds[arg] = true
read_fd_locks[arg] = current_fiber
eventqueue:add_fd_read_once(arg, current_fiber)
elseif rtype == "fd_write" then
if write_fd_locks[arg] then
error(
"multiple fibers wait for writing to file descriptor " ..
tostring(arg)
)
end
poll_state.write_fds[arg] = true
write_fd_locks[arg] = current_fiber
eventqueue:add_fd_write_once(arg, current_fiber)
elseif rtype == "pid" then
if pid_locks[arg] then
error(
"multiple fibers wait for process with PID " .. tostring(arg)
)
end
poll_state.pids[arg] = true
pid_locks[arg] = true
eventqueue:add_pid(arg, current_fiber)
elseif rtype == "handle" then
if arg.ready then
return
end
if handle_locks[arg] then
error("multiple fibers wait for handle " .. tostring(arg))
end
poll_state.handles[arg] = true
handle_locks[arg] = true
arg._fiber = current_fiber
else
error("unsupported resource type to wait for")
end
end
fiber.sleep()
end
local signal_handles = {}
local function catch_signal(sig)
local handles = signal_handles[sig]
if not handles then
handles = setmetatable({}, weak_mt)
signal_handles[sig] = false
eventqueue:add_signal(
sig,
{
wake = function()
for handle in pairs(handles) do
handle.ready = true
local fib = handle._fiber
if fib then
fib:wake()
end
end
end,
}
)
signal_handles[sig] = handles
end
local handle = setmetatable(
{ ready = false, _fiber = false },
handle_reset_metatbl
)
handles[handle] = true
return handle
end
local function clean_timeout(self)
local inner_handle = self._inner_handle
self._inner_handle = nil
if inner_handle then
eventqueue:remove_timeout(seconds, inner_handle)
end
end
local timeout_metatbl = {
__call = handle_call_noreset,
__close = clean_timeout,
__gc = clean_timeout,
}
local function timeout(seconds)
local handle = setmetatable(
{ ready = false, _fiber = false, _inner_handle = false },
timeout_metatbl
)
handle._inner_handle = eventqueue:add_timeout(
seconds,
{
wake = function()
handle.ready = true
local fib = handle._fiber
if fib then
fib:wake()
end
end,
}
)
return handle
end
local function clean_interval(self)
local inner_handle = self._inner_handle
self._inner_handle = nil
if inner_handle then
eventqueue:remove_interval(seconds, inner_handle)
end
end
local interval_metatbl = {
__call = handle_call_reset,
__newindex = handle_newindex,
__close = clean_interval,
__gc = clean_interval,
}
local function interval(seconds)
local handle = setmetatable(
{ ready = false, _fiber = false, _inner_handle = false },
interval_metatbl
)
handle._inner_handle = eventqueue:add_interval(
seconds,
{
wake = function()
handle.ready = true
local fib = handle._fiber
if fib then
fib:wake()
end
end,
}
)
return handle
end
local function sync()
local sleeper = setmetatable(
{ ready = false, _waiting = false },
handle_reset_metatbl
)
local function waker()
sleeper.ready = true
local fib = sleeper._fiber
if fib then
fib:wake()
end
end
return sleeper, waker
end
return fiber.handle(
{
[waitio.deregister_fd] = function(resume, ...)
return resume:call(deregister_fd, ...)
end,
[waitio.select] = function(resume, ...)
return resume:call(wait_select, ...)
end,
[waitio.wait_fd_read] = function(resume, fd)
return resume:call(wait_fd_read, fd)
end,
[waitio.wait_fd_write] = function(resume, fd)
return resume:call(wait_fd_write, fd)
end,
[waitio.wait_pid] = function(resume, pid)
return resume:call(wait_pid, pid)
end,
[waitio.catch_signal] = function(resume, sig)
return resume:call(catch_signal, sig)
end,
[waitio.timeout] = function(resume, seconds)
return resume:call(timeout, seconds)
end,
[waitio.interval] = function(resume, seconds)
return resume:call(interval, seconds)
end,
[waitio.sync] = function(resume)
return resume:call(sync)
end,
},
function(body, ...)
fiber.spawn(function()
while true do
if fiber.pending() then
eventqueue:poll(wake)
else
eventqueue:wait(wake)
end
fiber.yield()
end
end)
return body(...)
end,
...
)
end
function _M.main(...)
return fiber.main(
_M.run,
...
)
end
return _M