diff --git a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua index 9b16ccd42..e3845ec35 100644 --- a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua +++ b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua @@ -7,15 +7,16 @@ local retry_set = KEYS[3] -------- END keys --------- -------- BEGIN argv --------- -local reaper_count = tonumber(ARGV[1]) +local reaper_count = tonumber(ARGV[1]) +local threshold = tonumber(ARGV[2]) -------- END argv --------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[2]) -local debug_lua = ARGV[3] == "true" -local max_history = tonumber(ARGV[4]) -local script_name = ARGV[5] .. ".lua" -local redisversion = ARGV[6] +local current_time = tonumber(ARGV[3]) +local debug_lua = ARGV[4] == "true" +local max_history = tonumber(ARGV[5]) +local script_name = ARGV[6] .. ".lua" +local redisversion = ARGV[7] --------- END injected arguments --------- @@ -65,7 +66,7 @@ repeat -- TODO: Add check for jobs checked out by process if found ~= true then log_debug("Searching for digest:", digest, "in process sets") - found = find_digest_in_process_set(digest) + found = find_digest_in_process_set(digest, threshold) end if found ~= true then diff --git a/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua index 2d04788f8..391e72ead 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_find_digest_in_process_set.lua @@ -1,4 +1,4 @@ -local function find_digest_in_process_set(digest) +local function find_digest_in_process_set(digest, threshold) local process_cursor = 0 local job_cursor = 0 local pattern = "*" .. digest .. "*" @@ -26,11 +26,18 @@ local function find_digest_in_process_set(digest) log_debug("No entries in:", workers_key) else for i = 1, #jobs, 2 do - if string.find(jobs[i +1], digest) then + local jobstr = jobs[i +1] + if string.find(jobstr, digest) then log_debug("Found digest", digest, "in:", workers_key) found = true break end + + local job = cjson.decode(jobstr) + if job.created_at > threshold then + found = true + break + end end end diff --git a/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb b/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb index a2de43bf8..fc9fd0101 100644 --- a/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/lua_reaper.rb @@ -21,7 +21,7 @@ def call :reap_orphans, conn, keys: [DIGESTS, SCHEDULE, RETRY, PROCESSES], - argv: [reaper_count], + argv: [reaper_count, (Time.now - reaper_timeout).to_f], ) end end diff --git a/lib/sidekiq_unique_jobs/orphans/reaper.rb b/lib/sidekiq_unique_jobs/orphans/reaper.rb index c3d6ed5af..a560647b4 100644 --- a/lib/sidekiq_unique_jobs/orphans/reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/reaper.rb @@ -76,6 +76,16 @@ def reaper config.reaper end + # + # The configured timeout for the reaper + # + # + # @return [Integer] timeout in seconds + # + def reaper_timeout + config.reaper_timeout + end + # # The number of locks to reap at a time # diff --git a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb index 3cb0092b5..f005c1c56 100644 --- a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb @@ -117,7 +117,7 @@ def enqueued?(digest) end end - def active?(digest) # rubocop:disable Metrics/MethodLength + def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity Sidekiq.redis do |conn| procs = conn.sscan_each("processes").to_a return false if procs.empty? @@ -132,7 +132,10 @@ def active?(digest) # rubocop:disable Metrics/MethodLength next unless workers.any? workers.each_pair do |_tid, job| - return true if load_json(job).dig(PAYLOAD, LOCK_DIGEST) == digest + item = load_json(job) + + return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest + return true if considered_active?(item[CREATED_AT]) end end @@ -140,6 +143,10 @@ def active?(digest) # rubocop:disable Metrics/MethodLength end end + def considered_active?(time_f) + (Time.now - reaper_timeout).to_f < time_f + end + # # Loops through all the redis queues and yields them one by one # diff --git a/spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb b/spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb index dc4d9a66d..120a921df 100644 --- a/spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/delete_job_by_digest_spec.rb @@ -18,9 +18,15 @@ ] end + context "when job doesn't exist" do + let(:argv) { ["abcdefab"] } + + it { is_expected.to eq(nil) } + end + context "when job is retried" do let(:job_id) { "abcdefab" } - let(:job) { dump_json(item) } + let(:job) { dump_json(item) } let(:item) do { "class" => "MyUniqueJob", diff --git a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb index ad6dab10c..3ddf82e02 100644 --- a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb @@ -16,12 +16,22 @@ SidekiqUniqueJobs::RETRY, ] end - let(:argv) { [100] } - let(:digest) { "uniquejobs:digest" } - let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) } - let(:job_id) { "job_id" } - let(:item) { raw_item } - let(:raw_item) { { "class" => MyUniqueJob, "args" => [1, 2], "jid" => job_id, "lock_digest" => digest } } + let(:argv) { [100, threshold] } + let(:digest) { "uniquejobs:digest" } + let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) } + let(:job_id) { "job_id" } + let(:item) { raw_item } + let(:created_at) { (Time.now - 1000).to_f } + let(:threshold) { [Time.now - SidekiqUniqueJobs.config.reaper_timeout] } + let(:raw_item) do + { + "class" => MyUniqueJob, + "args" => [1, 2], + "jid" => job_id, + "lock_digest" => digest, + "created_at" => created_at, + } + end let(:lock_info) do { "job_id" => job_id, diff --git a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb index c1d7a1244..e4e3d6af3 100644 --- a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb @@ -33,7 +33,9 @@ end describe ".call" do - subject(:call) { described_class.call } + subject(:call) { described_class.call(conn) } + + let(:conn) { nil } around do |example| SidekiqUniqueJobs.use_config(reaper: reaper) do @@ -41,6 +43,24 @@ end end + context "when given a connection" do + let(:conn) { instance_spy(ConnectionPool) } + let(:reaper) { :ruby } + let(:reaper_spy) { instance_spy("SidekiqUniqueJobs::Orphans::Reaper") } + + before do + allow(reaper_spy).to receive(:call) + allow(described_class).to receive(:new).and_return(reaper_spy) + end + + it "calls the reaper with the given connection" do + call + + expect(reaper_spy).to have_received(:call) + expect(described_class).to have_received(:new).with(conn) + end + end + shared_examples "deletes orphans" do context "when scheduled" do let(:item) { raw_item.merge("at" => Time.now.to_f + 3_600) } @@ -109,36 +129,63 @@ end context "with job in process" do - let(:process_key) { "process-id" } - let(:thread_id) { "thread-id" } - let(:worker_key) { "#{process_key}:workers" } + let(:process_key) { "process-id" } + let(:thread_id) { "thread-id" } + let(:worker_key) { "#{process_key}:workers" } + let(:created_at) { (Time.now - reaper_timeout).to_f } + let(:reaper_timeout) { SidekiqUniqueJobs.config.reaper_timeout } before do SidekiqUniqueJobs.redis do |conn| conn.multi do conn.sadd("processes", process_key) conn.set(process_key, "bogus") - conn.hset(worker_key, thread_id, dump_json(payload: item)) + conn.hset(worker_key, thread_id, dump_json(created_at: created_at, payload: item)) conn.expire(process_key, 60) conn.expire(worker_key, 60) end end end - # TODO: Adjust this spec for earlier sidekiq versions context "that matches current digest", sidekiq_ver: ">= 5.0" do - it "keeps the digest" do - expect { call }.not_to change { digests.count }.from(1) - expect(unique_keys).not_to match_array([]) + context "and created_at is old" do # rubocop:disable RSpec/NestedGroups + let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f } + + it "keeps the digest" do + expect { call }.not_to change { digests.count }.from(1) + expect(unique_keys).not_to match_array([]) + end + end + + context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups + let(:created_at) { Time.now.to_f } + + it "keeps the digest" do + expect { call }.not_to change { digests.count }.from(1) + expect(unique_keys).not_to match_array([]) + end end end context "that does not match current digest" do let(:item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => "uniquejobs:d2" } } - it "deletes the digest" do - expect { call }.to change { digests.count }.by(-1) - expect(unique_keys).to match_array([]) + context "and created_at is old" do # rubocop:disable RSpec/NestedGroups + let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f } + + it "deletes the digest" do + expect { call }.to change { digests.count }.by(-1) + expect(unique_keys).to match_array([]) + end + end + + context "and created_at is recent" do # rubocop:disable RSpec/NestedGroups + let(:created_at) { Time.now.to_f } + + it "keeps the digest" do + expect { call }.not_to change { digests.count }.from(1) + expect(unique_keys).not_to match_array([]) + end end end end