-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
reap_orphans_spec.rb
145 lines (126 loc) · 3.66 KB
/
reap_orphans_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# frozen_string_literal: true
RSpec.describe "reap_orphans.lua" do
  # Runs the reap_orphans script through the call_script spec helper.
  #
  # Every example starts with a lock created for `digest` (see the `before`
  # hook). Each context then checks whether running the script removes that
  # digest, depending on whether a job carrying the same "lock_digest" is
  # present in the schedule, the retry set, a queue, or a worker hash.
  subject(:reap_orphans) do
    call_script(
      :reap_orphans,
      keys: redis_keys,
      argv: argv,
    )
  end

  let(:redis_keys) do
    [
      SidekiqUniqueJobs::DIGESTS,
      SidekiqUniqueJobs::SCHEDULE,
      SidekiqUniqueJobs::RETRY,
    ]
  end
  # NOTE(review): the meaning of 100 is defined by the Lua script, which is
  # not visible from this spec — presumably a reaper count/limit; confirm.
  let(:argv) { [100, threshold] }
  let(:digest) { "uniquejobs:digest" }
  let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
  let(:job_id) { "job_id" }
  let(:item) { raw_item }
  let(:created_at) { (Time.now - 1000).to_f }
  # NOTE(review): threshold is wrapped in an Array, so argv becomes
  # [100, [Time]] — confirm call_script serializes this the way the script
  # expects before relying on the threshold value.
  let(:threshold) { [Time.now - SidekiqUniqueJobs.config.reaper_timeout] }

  # Job payload that refers to the lock under test through "lock_digest".
  let(:raw_item) do
    {
      "class" => MyUniqueJob,
      "args" => [1, 2],
      "jid" => job_id,
      "lock_digest" => digest,
      "created_at" => created_at,
    }
  end

  # Metadata handed to Lock.create together with the digest and job id.
  let(:lock_info) do
    {
      "job_id" => job_id,
      "limit" => 1,
      "lock" => :while_executing,
      "time" => now_f,
      "timeout" => nil,
      "ttl" => nil,
      "lock_args" => [1, 2],
      "worker" => "MyUniqueJob",
    }
  end

  before do
    # The gem is disabled for the duration of each example and re-enabled in
    # the `after` hook; presumably so that pushing jobs below does not create
    # additional locks — confirm.
    SidekiqUniqueJobs.disable!
    lock # force the lazy `let` so the lock exists before the script runs
  end

  after do
    SidekiqUniqueJobs.enable!
  end

  context "when scheduled" do
    let(:item) { raw_item.merge("at" => Time.now.to_f + 3_600) }

    context "without scheduled job" do
      it "deletes the digest" do
        expect { reap_orphans }.to change { digests.count }.by(-1)
        expect(unique_keys).to match_array([])
      end
    end

    context "with scheduled job" do
      before { push_item(item) }

      it "keeps the digest" do
        expect { reap_orphans }.not_to change { digests.count }.from(1)
        expect(unique_keys).not_to match_array([])
      end
    end
  end

  context "when retried" do
    let(:item) { raw_item.merge("retry_count" => 2, "failed_at" => now_f) }

    context "without job in retry" do
      it "deletes the digest" do
        expect { reap_orphans }.to change { digests.count }.by(-1)
        expect(unique_keys).to match_array([])
      end
    end

    context "with job in retry" do
      before { zadd("retry", Time.now.to_f.to_s, dump_json(item)) }

      it "keeps the digest" do
        expect { reap_orphans }.not_to change { digests.count }.from(1)
        expect(unique_keys).not_to match_array([])
      end
    end
  end

  context "when digest exists in a queue" do
    context "without enqueued job" do
      it "deletes the digest" do
        expect { reap_orphans }.to change { digests.count }.by(-1)
        expect(unique_keys).to match_array([])
      end
    end

    context "with enqueued job" do
      before { push_item(item) }

      it "keeps the digest" do
        expect { reap_orphans }.not_to change { digests.count }.from(1)
        expect(unique_keys).not_to match_array([])
      end
    end
  end

  context "when digest exists in a process set" do
    context "without job" do
      it "deletes the digest" do
        expect { reap_orphans }.to change { digests.count }.by(-1)
        expect(unique_keys).to match_array([])
      end
    end

    context "with job" do
      let(:process_key) { "process-id" }
      let(:thread_id) { "thread-id" }
      let(:worker_key) { "#{process_key}:workers" }

      before do
        # Register the process in the "processes" set and store the job JSON
        # in that process's workers hash, keyed by thread id.
        SidekiqUniqueJobs.redis do |conn|
          conn.multi do
            conn.sadd("processes", process_key)
            conn.hset(worker_key, thread_id, dump_json(item))
            # NOTE(review): nothing in this setup writes a key named
            # process_key (it is only a member of "processes"), so this
            # EXPIRE presumably has no effect — confirm.
            conn.expire(process_key, 60)
            conn.expire(worker_key, 60)
          end
        end
      end

      it "keeps the digest" do
        expect { reap_orphans }.not_to change { digests.count }.from(1)
        expect(unique_keys).not_to match_array([])
      end
    end
  end
end