Skip to content

Commit

Permalink
fix: ensure a new lock isn't conflicting with itself (#830)
Browse files Browse the repository at this point in the history
* fix: give orphans a safer cleanup buffer

Sidekiq only checks in every ten seconds or so. Take this into consideration when cleaning up orphans. Also, make use of zrange byscore to limit the selection a bit.

* chore: soft deprecate lua reaper
  • Loading branch information
mhenrixon committed Feb 7, 2024
1 parent 63e9431 commit b1e3573
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/batch_delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module SidekiqUniqueJobs
class BatchDelete
#
# @return [Integer] the default batch size
BATCH_SIZE = 100
BATCH_SIZE = 500

#
# @return [Array<String>] Supported key suffixes
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module SidekiqUniqueJobs
DEAD_VERSION = "uniquejobs:dead"
DIGESTS = "uniquejobs:digests"
EXPIRING_DIGESTS = "uniquejobs:expiring_digests"
ORPHANED_DIGESTS = "uniquejobs:orphaned_digests"
ERRORS = "errors"
JID = "jid"
LIMIT = "limit"
Expand Down
23 changes: 13 additions & 10 deletions lib/sidekiq_unique_jobs/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class Lock # rubocop:disable Metrics/ClassLength
#
# @return [Lock] a newly lock that has been locked
#
def self.create(digest, job_id, lock_info = {})
lock = new(digest, time: Timing.now_f)
lock.lock(job_id, lock_info)
def self.create(digest, job_id, lock_info: {}, time: Timing.now_f, score: nil)
lock = new(digest, time: time)
lock.lock(job_id, lock_info, score)
lock
end

Expand Down Expand Up @@ -63,15 +63,16 @@ def initialize(key, time: nil)
#
# @return [void]
#
def lock(job_id, lock_info = {})
def lock(job_id, lock_info = {}, score = nil)
score ||= now_f
redis do |conn|
conn.multi do |pipeline|
pipeline.set(key.digest, job_id)
pipeline.hset(key.locked, job_id, now_f)
info.set(lock_info, pipeline)
add_digest_to_set(pipeline, lock_info)
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked"))
add_digest_to_set(pipeline, lock_info, score)
pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked"))
end
end
end
Expand Down Expand Up @@ -333,12 +334,14 @@ def changelog_json(job_id, script, message)
#
# @return [nil]
#
def add_digest_to_set(pipeline, lock_info)
def add_digest_to_set(pipeline, lock_info, score = nil)
score ||= now_f
digest_string = key.digest

if lock_info["lock"] == :until_expired
pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string)
pipeline.zadd(key.expiring_digests, score + lock_info["ttl"], digest_string)
else
pipeline.zadd(key.digests, now_f, digest_string)
pipeline.zadd(key.digests, score, digest_string)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/sidekiq_unique_jobs/orphans/reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ class Reaper
#
# @return [Hash<Symbol, SidekiqUniqueJobs::Orphans::Reaper] the current implementation of reapers
REAPERS = {
lua: SidekiqUniqueJobs::Orphans::LuaReaper,
lua: SidekiqUniqueJobs::Orphans::RubyReaper,
ruby: SidekiqUniqueJobs::Orphans::RubyReaper,
none: SidekiqUniqueJobs::Orphans::NullReaper,
nil => SidekiqUniqueJobs::Orphans::NullReaper,
false => SidekiqUniqueJobs::Orphans::NullReaper,
true => SidekiqUniqueJobs::Orphans::RubyReaper,
}.freeze

#
Expand Down
33 changes: 26 additions & 7 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module Orphans
class RubyReaper < Reaper
include SidekiqUniqueJobs::Timing

#
# @return [Integer] a best guess of Sidekiq::Launcher::BEAT_PAUSE
SIDEKIQ_BEAT_PAUSE = 10
#
# @return [String] the suffix for :RUN locks
RUN_SUFFIX = ":RUN"
Expand Down Expand Up @@ -74,25 +77,41 @@ def call

BatchDelete.call(expired_digests, conn)
BatchDelete.call(orphans, conn)

# orphans.each_slice(500) do |chunk|
# conn.pipelined do |pipeline|
# chunk.each do |digest|
# next if belongs_to_job?(digest)

# pipeline.zadd(ORPHANED_DIGESTS, now_f, digest)
# end
# end
# end
end

def expired_digests
max_score = (start_time - reaper_timeout).to_f

conn.zrange(EXPIRING_DIGESTS, 0, max_score, "byscore")
end

def orphaned_digests
conn.zrange(ORPHANED_DIGESTS, 0, max_score, "byscore")
end

def max_score
(start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE).to_f
end

#
# Find orphaned digests
#
#
# @return [Array<String>] an array of orphaned digests
#
def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
page = 0
per = reaper_count * 2
orphans = []
results = conn.zrange(digests.key, page * per, (page + 1) * per)
page = 0
per = reaper_count * 2
results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)

while results.size.positive?
results.each do |digest|
Expand All @@ -107,7 +126,7 @@ def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
break if orphans.size >= reaper_count

page += 1
results = conn.zrange(digests.key, page * per, (page + 1) * per)
results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)
end

orphans
Expand Down Expand Up @@ -218,7 +237,7 @@ def match?(key_one, key_two)
end

def considered_active?(time_f)
(Time.now - reaper_timeout).to_f < time_f
max_score < time_f
end

#
Expand Down
8 changes: 8 additions & 0 deletions lib/sidekiq_unique_jobs/redis/sorted_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def add(values)
end
end

def byscore(min, max, offset: nil, count: nil)
redis do |conn|
return conn.zrange(key, min, max, "byscore") unless offset && count

conn.zrange(key, min, max, "byscore", "limit", offset, count)
end
end

#
# Return the zrak of the member
#
Expand Down
4 changes: 2 additions & 2 deletions spec/sidekiq_unique_jobs/lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
its(:to_s) { is_expected.to eq(expected_string) }

describe ".create" do
subject(:create) { described_class.create(key, job_id, lock_info) }
subject(:create) { described_class.create(key, job_id, lock_info: lock_info) }

it "creates all expected keys in redis" do
create
Expand Down Expand Up @@ -77,7 +77,7 @@
describe "#del" do
subject(:del) { lock.del }

let(:lock) { described_class.create(key, job_id, info) }
let(:lock) { described_class.create(key, job_id, lock_info: info) }

it "creates keys and adds job_id to locked hash" do
expect { lock }.to change { entity.locked_jids }.to([job_id])
Expand Down
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
end
let(:argv) { [100, threshold] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:created_at) { (Time.now - 1000).to_f }
Expand Down
13 changes: 11 additions & 2 deletions spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@
let(:digest) { "uniquejobs:digest" }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } }

let(:score) do
(
Time.now -
SidekiqUniqueJobs.config.reaper_timeout -
SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE -
100
).to_f
end
let(:lock_info) do
{
"job_id" => job_id,
Expand Down Expand Up @@ -152,7 +161,7 @@
end

context "when digest has :RUN suffix" do
let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info: lock_info) }

context "that matches current digest" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }
Expand Down
11 changes: 10 additions & 1 deletion spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
let(:digest) { "uniquejobs:digest" }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } }
let(:lock_info) do
{
Expand All @@ -20,6 +20,15 @@
}
end

let(:score) do
(
Time.now -
SidekiqUniqueJobs.config.reaper_timeout -
SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE -
100
).to_f
end

before do
SidekiqUniqueJobs.disable!
lock
Expand Down

0 comments on commit b1e3573

Please sign in to comment.