Skip to content

Commit

Permalink
Fix 532 (#549)
Browse files Browse the repository at this point in the history
* Clean up the run keys when given a run key

* Mandatory rubocop commit
  • Loading branch information
mhenrixon committed Nov 19, 2020
1 parent 9732c62 commit 4ce41b3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 45 deletions.
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/digests.rb
Expand Up @@ -75,6 +75,7 @@ def delete_by_digest(digest) # rubocop:disable Metrics/MethodLength
"#{digest}:GRABBED",
"#{digest}:AVAILABLE",
"#{digest}:VERSION",
"#{digest}:RUN",
"#{digest}:RUN:EXISTS",
"#{digest}:RUN:GRABBED",
"#{digest}:RUN:AVAILABLE",
Expand Down
21 changes: 13 additions & 8 deletions redis/delete_by_digest.lua
Expand Up @@ -5,19 +5,24 @@ local exists_key = KEYS[3]
local grabbed_key = KEYS[4]
local available_key = KEYS[5]
local version_key = KEYS[6]
local run_exists_key = KEYS[7]
local run_grabbed_key = KEYS[8]
local run_available_key = KEYS[9]
local run_version_key = KEYS[10]
local run_key = KEYS[7]
local run_exists_key = KEYS[8]
local run_grabbed_key = KEYS[9]
local run_available_key = KEYS[10]
local run_version_key = KEYS[11]

local count = redis.call('SREM', unique_keys, unique_digest)
redis.call('DEL', exists_key)
redis.call('DEL', grabbed_key)
redis.call('DEL', available_key)
redis.call('DEL', version_key)
redis.call('DEL', run_exists_key)
redis.call('DEL', run_grabbed_key)
redis.call('DEL', run_available_key)
redis.call('DEL', run_version_key)

local result, index = run_key:gsub(':RUN:RUN', "")
if index == 0 then
redis.call('DEL', run_exists_key)
redis.call('DEL', run_grabbed_key)
redis.call('DEL', run_available_key)
redis.call('DEL', run_version_key)
end

return count
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Expand Up @@ -62,3 +62,5 @@ def capture(stream)

result
end

RSpec::Support::ObjectFormatter.default_instance.max_formatted_output_length = 10_000
14 changes: 7 additions & 7 deletions spec/support/simulate_lock.rb
Expand Up @@ -22,7 +22,7 @@ def lock_until_and_while_executing(digest, jid, ttl = nil)
end

def lock_while_executing(digest, jid, ttl = nil)
item = get_item(digest: digest, jid: jid, lock_type: :while_executing, ttl: ttl)
item = get_item(digest: "#{digest}:RUN", jid: jid, lock_type: :while_executing, ttl: ttl)
lock(item)
end

Expand All @@ -34,19 +34,19 @@ def runtime_lock(digest, jid, ttl = nil)
end

def lock(item)
Locksmith.new(item).lock
SidekiqUniqueJobs::Locksmith.new(item).lock
end

def unlock(item)
Locksmith.new(item).unlock
SidekiqUniqueJobs::Locksmith.new(item).unlock
end

def get_item(digest: "randomdigest", jid: "randomjid", lock_type: :until_executed, ttl: nil)
item = {
UNIQUE_DIGEST_KEY => digest,
JID_KEY => jid,
LOCK_EXPIRATION_KEY => ttl,
LOCK_KEY => lock_type,
SidekiqUniqueJobs::UNIQUE_DIGEST_KEY => digest,
SidekiqUniqueJobs::JID_KEY => jid,
SidekiqUniqueJobs::LOCK_EXPIRATION_KEY => ttl,
SidekiqUniqueJobs::LOCK_KEY => lock_type,
}
@items << item
item
Expand Down
112 changes: 82 additions & 30 deletions spec/unit/sidekiq_unique_jobs/digests_spec.rb
Expand Up @@ -2,30 +2,57 @@

require "spec_helper"
RSpec.describe SidekiqUniqueJobs::Digests, redis: :redis do
before do
(1..10).each do |arg|
MyUniqueJob.perform_async(arg, arg)
shared_context "with a regular job" do
let(:expected_keys) do
%w[
uniquejobs:e739dadc23533773b920936336341d01
uniquejobs:56c68cab5038eb57959538866377560d
uniquejobs:8d9e83be14c033be4496295ec2740b91
uniquejobs:23e8715233c2e8f7b578263fcb8ac657
uniquejobs:6722965def15faf3c45cb9e66f994a49
uniquejobs:5bdd20fbbdda2fc28d6461e0eb1f76ee
uniquejobs:c658060a30b761bb12f2133cb7c3f294
uniquejobs:b34294c4802ee2d61c9e3e8dd7f2bab4
uniquejobs:06c3a5b63038c7b724b8603bb02ace99
uniquejobs:62c11d32fd69c691802579682409a483
]
end

before do
(1..10).each do |arg|
MyUniqueJob.perform_async(arg, arg)
end
end
end

let(:expected_keys) do
%w[
uniquejobs:e739dadc23533773b920936336341d01
uniquejobs:56c68cab5038eb57959538866377560d
uniquejobs:8d9e83be14c033be4496295ec2740b91
uniquejobs:23e8715233c2e8f7b578263fcb8ac657
uniquejobs:6722965def15faf3c45cb9e66f994a49
uniquejobs:5bdd20fbbdda2fc28d6461e0eb1f76ee
uniquejobs:c658060a30b761bb12f2133cb7c3f294
uniquejobs:b34294c4802ee2d61c9e3e8dd7f2bab4
uniquejobs:06c3a5b63038c7b724b8603bb02ace99
uniquejobs:62c11d32fd69c691802579682409a483
]
shared_context "with a runtime job" do
before do
(1..10).each do |arg|
SimulateLock.lock_while_executing("uniquejobs:abcde#{arg}", arg.to_s)
end
end

let(:expected_keys) do
%w[
uniquejobs:abcde1:RUN
uniquejobs:abcde10:RUN
uniquejobs:abcde2:RUN
uniquejobs:abcde3:RUN
uniquejobs:abcde4:RUN
uniquejobs:abcde5:RUN
uniquejobs:abcde6:RUN
uniquejobs:abcde7:RUN
uniquejobs:abcde8:RUN
uniquejobs:abcde9:RUN
]
end
end

describe ".all" do
subject(:all) { described_class.all(pattern: "*", count: 1000) }

include_context "with a regular job"

it { is_expected.to match_array(expected_keys) }
end

Expand All @@ -36,6 +63,8 @@
let(:pattern) { nil }
let(:count) { 1000 }

include_context "with a regular job"

before do
allow(described_class).to receive(:log_info)
end
Expand Down Expand Up @@ -80,24 +109,45 @@
describe ".delete_by_digest" do
subject(:delete_by_digest) { described_class.delete_by_digest(digest) }

let(:digest) { expected_keys.last }
context "when with a regular job" do
include_context "with a regular job"

before do
allow(described_class).to receive(:log_info)
end
let(:digest) { expected_keys.last }

it "deletes just the specific digest" do
expect(delete_by_digest).to eq(9)
expect(described_class.all).to match_array(expected_keys - [digest])
before do
allow(described_class).to receive(:log_info)
end

it "deletes just the specific digest" do
expect(delete_by_digest).to eq(9)
expect(described_class.all).to match_array(expected_keys - [digest])
end

it "logs performance info" do
delete_by_digest
expect(described_class).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d\.\d+)ms/)),
)
end
end

it "logs performance info" do
delete_by_digest
expect(described_class).to have_received(:log_info)
.with(
a_string_starting_with("delete_by_digest(#{digest})")
.and(matching(/completed in (\d\.\d+)ms/)),
)
context "when given a runtime job" do
include_context "with a runtime job"

let(:digest) { expected_keys.last }

it "deletes just the specific digest" do
expect(delete_by_digest).to eq(9)
expect(unique_keys).not_to include(%W[
#{digest}
#{digest}:EXISTS
#{digest}:GRABBED
])

expect(described_class.all).to match_array(expected_keys - [digest])
end
end
end

Expand All @@ -107,6 +157,8 @@
let(:pattern) { "*" }
let(:count) { 1000 }

include_context "with a regular job"

before do
allow(described_class).to receive(:log_info)
end
Expand Down

0 comments on commit 4ce41b3

Please sign in to comment.