-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
reap_orphans.lua
94 lines (75 loc) · 2.9 KB
/
reap_orphans.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
redis.replicate_commands()
-------- BEGIN keys ---------
local digests_set = KEYS[1]
local schedule_set = KEYS[2]
local retry_set = KEYS[3]
-------- END keys ---------
-------- BEGIN argv ---------
local reaper_count = tonumber(ARGV[1])
local threshold = tonumber(ARGV[2])
-------- END argv ---------
-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[3])
local debug_lua = ARGV[4] == "true"
local max_history = tonumber(ARGV[5])
local script_name = ARGV[6] .. ".lua"
local redisversion = ARGV[7]
--------- END injected arguments ---------
-------- BEGIN local functions --------
<%= include_partial "shared/_common.lua" %>
<%= include_partial "shared/_find_digest_in_queues.lua" %>
<%= include_partial "shared/_find_digest_in_sorted_set.lua" %>
<%= include_partial "shared/_find_digest_in_process_set.lua" %>
---------- END local functions ----------
-------- BEGIN delete_orphaned.lua --------
log_debug("BEGIN")
local found = false
local per = 50
local total = redis.call("ZCARD", digests_set)
local index = 0
local del_count = 0
local redis_ver = toversion(redisversion)
local del_cmd = "DEL"
if tonumber(redis_ver["major"]) >= 4 then del_cmd = "UNLINK"; end
repeat
log_debug("Interating through:", digests_set, "for orphaned locks")
local digests = redis.call("ZREVRANGE", digests_set, index, index + per -1)
for _, digest in pairs(digests) do
log_debug("Searching for digest:", digest, "in", schedule_set)
found = find_digest_in_sorted_set(schedule_set, digest)
if found ~= true then
log_debug("Searching for digest:", digest, "in", retry_set)
found = find_digest_in_sorted_set(retry_set, digest)
end
if found ~= true then
log_debug("Searching for digest:", digest, "in all queues")
local queue = find_digest_in_queues(digest)
if queue then
log_debug("found digest:", digest, "in queue:", queue)
found = true
end
end
-- TODO: Add check for jobs checked out by process
if found ~= true then
log_debug("Searching for digest:", digest, "in process sets")
found = find_digest_in_process_set(digest, threshold)
end
if found ~= true then
local queued = digest .. ":QUEUED"
local primed = digest .. ":PRIMED"
local locked = digest .. ":LOCKED"
local info = digest .. ":INFO"
local run_digest = digest .. ":RUN"
local run_queued = digest .. ":RUN:QUEUED"
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
local run_info = digest .. ":RUN:INFO"
redis.call(del_cmd, digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)
redis.call("ZREM", digests_set, digest)
del_count = del_count + 1
end
end
index = index + per
until index >= total or del_count >= reaper_count
log_debug("END")
return del_count