Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition to avoid reaping active jobs #563

Merged
merged 1 commit into from Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 8 additions & 7 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
Expand Up @@ -7,15 +7,16 @@ local retry_set = KEYS[3]
-------- END keys ---------

-------- BEGIN argv ---------
local reaper_count = tonumber(ARGV[1])
local reaper_count = tonumber(ARGV[1])
local threshold = tonumber(ARGV[2])
-------- END argv ---------

-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[2])
local debug_lua = ARGV[3] == "true"
local max_history = tonumber(ARGV[4])
local script_name = ARGV[5] .. ".lua"
local redisversion = ARGV[6]
local current_time = tonumber(ARGV[3])
local debug_lua = ARGV[4] == "true"
local max_history = tonumber(ARGV[5])
local script_name = ARGV[6] .. ".lua"
local redisversion = ARGV[7]
--------- END injected arguments ---------


Expand Down Expand Up @@ -65,7 +66,7 @@ repeat
-- TODO: Add check for jobs checked out by process
if found ~= true then
log_debug("Searching for digest:", digest, "in process sets")
found = find_digest_in_process_set(digest)
found = find_digest_in_process_set(digest, threshold)
end

if found ~= true then
Expand Down
@@ -1,4 +1,4 @@
local function find_digest_in_process_set(digest)
local function find_digest_in_process_set(digest, threshold)
local process_cursor = 0
local job_cursor = 0
local pattern = "*" .. digest .. "*"
Expand Down Expand Up @@ -26,11 +26,18 @@ local function find_digest_in_process_set(digest)
log_debug("No entries in:", workers_key)
else
for i = 1, #jobs, 2 do
if string.find(jobs[i +1], digest) then
local jobstr = jobs[i +1]
if string.find(jobstr, digest) then
log_debug("Found digest", digest, "in:", workers_key)
found = true
break
end

local job = cjson.decode(jobstr)
if job.created_at > threshold then
found = true
break
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/orphans/lua_reaper.rb
Expand Up @@ -21,7 +21,7 @@ def call
:reap_orphans,
conn,
keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES],
argv: [reaper_count],
argv: [reaper_count, (Time.now - reaper_timeout).to_f],
)
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/sidekiq_unique_jobs/orphans/reaper.rb
Expand Up @@ -76,6 +76,16 @@ def reaper
config.reaper
end

#
# The configured timeout for the reaper, read from the gem configuration.
#
# Jobs created more recently than this many seconds ago are considered
# active and are skipped by the reaper.
#
# @return [Integer] timeout in seconds
#
def reaper_timeout
  config.reaper_timeout
end

#
# The number of locks to reap at a time
#
Expand Down
11 changes: 9 additions & 2 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Expand Up @@ -117,7 +117,7 @@ def enqueued?(digest)
end
end

def active?(digest) # rubocop:disable Metrics/MethodLength
def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
return false if procs.empty?
Expand All @@ -132,14 +132,21 @@ def active?(digest) # rubocop:disable Metrics/MethodLength
next unless workers.any?

workers.each_pair do |_tid, job|
return true if load_json(job).dig(PAYLOAD, LOCK_DIGEST) == digest
item = load_json(job)

return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest
return true if considered_active?(item[CREATED_AT])
end
end

false
end
end

#
# Checks whether a job should still be treated as active based on when
# it was created.
#
# @param time_f [Float] the job's created_at as a unix timestamp
#   (presumably seconds since epoch — confirm against caller)
#
# @return [true, false] true when the job was created after the reaper
#   timeout cutoff, i.e. it is too fresh to be reaped
#
def considered_active?(time_f)
  cutoff = (Time.now - reaper_timeout).to_f
  time_f > cutoff
end

#
# Loops through all the redis queues and yields them one by one
#
Expand Down
8 changes: 7 additions & 1 deletion spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb
Expand Up @@ -18,9 +18,15 @@
]
end

context "when job doesn't exist" do
let(:argv) { ["abcdefab"] }

it { is_expected.to eq(nil) }
end

context "when job is retried" do
let(:job_id) { "abcdefab" }
let(:job) { dump_json(item) }
let(:job) { dump_json(item) }
let(:item) do
{
"class" => "MyUniqueJob",
Expand Down
22 changes: 16 additions & 6 deletions spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Expand Up @@ -16,12 +16,22 @@
SidekiqUniqueJobs::RETRY,
]
end
let(:argv) { [100] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [1, 2], "jid" => job_id, "lock_digest" => digest } }
let(:argv) { [100, threshold] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:created_at) { (Time.now - 1000).to_f }
let(:threshold) { [Time.now - SidekiqUniqueJobs.config.reaper_timeout] }
let(:raw_item) do
{
"class" => MyUniqueJob,
"args" => [1, 2],
"jid" => job_id,
"lock_digest" => digest,
"created_at" => created_at,
}
end
let(:lock_info) do
{
"job_id" => job_id,
Expand Down
71 changes: 59 additions & 12 deletions spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
Expand Up @@ -33,14 +33,34 @@
end

describe ".call" do
subject(:call) { described_class.call }
subject(:call) { described_class.call(conn) }

let(:conn) { nil }

around do |example|
SidekiqUniqueJobs.use_config(reaper: reaper) do
example.run
end
end

context "when given a connection" do
let(:conn) { instance_spy(ConnectionPool) }
let(:reaper) { :ruby }
let(:reaper_spy) { instance_spy("SidekiqUniqueJobs::Orphans::Reaper") }

before do
allow(reaper_spy).to receive(:call)
allow(described_class).to receive(:new).and_return(reaper_spy)
end

it "calls the reaper with the given connection" do
call

expect(reaper_spy).to have_received(:call)
expect(described_class).to have_received(:new).with(conn)
end
end

shared_examples "deletes orphans" do
context "when scheduled" do
let(:item) { raw_item.merge("at" => Time.now.to_f + 3_600) }
Expand Down Expand Up @@ -109,36 +129,63 @@
end

context "with job in process" do
let(:process_key) { "process-id" }
let(:thread_id) { "thread-id" }
let(:worker_key) { "#{process_key}:workers" }
let(:process_key) { "process-id" }
let(:thread_id) { "thread-id" }
let(:worker_key) { "#{process_key}:workers" }
let(:created_at) { (Time.now - reaper_timeout).to_f }
let(:reaper_timeout) { SidekiqUniqueJobs.config.reaper_timeout }

before do
SidekiqUniqueJobs.redis do |conn|
conn.multi do
conn.sadd("processes", process_key)
conn.set(process_key, "bogus")
conn.hset(worker_key, thread_id, dump_json(payload: item))
conn.hset(worker_key, thread_id, dump_json(created_at: created_at, payload: item))
conn.expire(process_key, 60)
conn.expire(worker_key, 60)
end
end
end

# TODO: Adjust this spec for earlier sidekiq versions
context "that matches current digest", sidekiq_ver: ">= 5.0" do
it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
context "and created_at is old" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end

context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { Time.now.to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end

context "that does not match current digest" do
let(:item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => "uniquejobs:d2" } }

it "deletes the digest" do
expect { call }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
context "and created_at is old" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }

it "deletes the digest" do
expect { call }.to change { digests.count }.by(-1)
expect(unique_keys).to match_array([])
end
end

context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { Time.now.to_f }

it "keeps the digest" do
expect { call }.not_to change { digests.count }.from(1)
expect(unique_keys).not_to match_array([])
end
end
end
end
Expand Down