-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
_find_digest_in_process_set.lua
53 lines (44 loc) · 1.52 KB
/
_find_digest_in_process_set.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
--- Look for a digest among the worker entries of every registered process.
--
-- Walks the "processes" set with SSCAN and, for each member, reads the hash
-- stored at "<process>:workers". An entry counts as a hit when its raw value
-- contains `digest` as a literal substring, or when its decoded JSON has a
-- `created_at` greater than `threshold`.
--
-- @param digest    string searched for verbatim inside each hash value
-- @param threshold value the decoded `created_at` is compared against
-- @return true on the first hit, false once the whole set has been scanned
local function find_digest_in_process_set(digest, threshold)
  local process_cursor = 0

  log_debug("Searching in process list",
            "for digest:", digest,
            "cursor:", process_cursor)

  repeat
    local process_paginator = redis.call("SSCAN", "processes", process_cursor, "MATCH", "*")
    local next_process_cursor = process_paginator[1]
    local processes = process_paginator[2]
    log_debug("Found number of processes:", #processes, "next cursor:", next_process_cursor)

    for _, process in ipairs(processes) do
      local workers_key = process .. ":workers"
      log_debug("searching in process set:", process,
                "for digest:", digest,
                "cursor:", process_cursor)

      local jobs = redis.call("HGETALL", workers_key)
      if #jobs == 0 then
        log_debug("No entries in:", workers_key)
      else
        -- The reply is walked in pairs; only the second item of each pair
        -- (the hash value) is inspected.
        for i = 1, #jobs, 2 do
          local jobstr = jobs[i + 1]

          -- Plain-text search (4th argument true): the digest is data, not a
          -- Lua pattern, so characters such as "-", "." or "%" must be
          -- matched literally.
          if string.find(jobstr, digest, 1, true) then
            log_debug("Found digest", digest, "in:", workers_key)
            return true
          end

          local job = cjson.decode(jobstr)
          -- NOTE(review): this comparison raises if the decoded entry has no
          -- `created_at`, or if `threshold` is not a number -- confirm both
          -- against the stored payload and the callers.
          if job.created_at > threshold then
            return true
          end
        end
      end
    end

    process_cursor = next_process_cursor
  until process_cursor == "0" -- the scan is over when the cursor is "0" again

  return false
end