Add digest scores for faster deletes in sorted sets (#835)
* add performance test for replace conflict strategy

* fix performance issue when deleting from large sets
ezekg committed Feb 22, 2024
1 parent 6b81b77 commit 1bfba2f
Showing 15 changed files with 184 additions and 63 deletions.
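
In short: when a lock is taken, its digest is now added to the `uniquejobs:digests` sorted set with the job's scheduled run-at time as the score (falling back to the time of locking); when the lock is deleted, the scripts look that score up with `ZSCORE` and walk Sidekiq's schedule/retry sets with `ZRANGE ... BYSCORE LIMIT` starting at that score, instead of paging the whole set from index 0. The sketch below illustrates the delete-side idea in plain redis-rb; it is not the gem's code (the real logic lives in the Lua scripts further down), and the method name, the bare `Redis.new` connection, and the `zset` parameter are assumptions made for the example.

```ruby
require "redis"

# Illustrative sketch only: find and remove the payload carrying `digest`
# from a score-ordered set such as Sidekiq's "schedule", starting the scan
# at the digest's recorded score instead of at index 0.
def delete_digest(redis, zset, digest)
  per    = 50
  score  = redis.zscore("uniquejobs:digests", digest) # recorded at lock time
  offset = 0

  loop do
    items =
      if score
        # Fast path: only look at members scored at or after the digest's own score.
        redis.zrangebyscore(zset, score, "+inf", limit: [offset, per])
      else
        # Legacy path: page through the entire sorted set.
        redis.zrange(zset, offset, offset + per - 1)
      end

    break if items.empty?

    if (match = items.find { |item| item.include?(digest) })
      redis.zrem(zset, match)
      return match
    end

    offset += per
  end

  nil
end

# delete_digest(Redis.new, "schedule", "uniquejobs:abc123") # hypothetical digest
```

Because the schedule and retry sets are scored by each job's run-at time, recording that same timestamp as the digest's score means the scan usually finds the payload in its first page.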
9 changes: 7 additions & 2 deletions lib/sidekiq_unique_jobs/locksmith.rb
@@ -82,7 +82,7 @@ def delete
# Deletes the lock regardless of if it has a pttl set
#
def delete!
- call_script(:delete, key.to_a, [job_id, config.pttl, config.type, config.limit]).to_i.positive?
+ call_script(:delete, key.to_a, argv).to_i.positive?
end

#
@@ -362,7 +362,7 @@ def taken?(conn)
end

def argv
- [job_id, config.pttl, config.type, config.limit]
+ [job_id, config.pttl, config.type, config.limit, lock_score]
end

+ def lock_score
+   item[AT].to_s
+ end

def lock_info
Expand All @@ -375,6 +379,7 @@ def lock_info
TYPE => config.type,
LOCK_ARGS => item[LOCK_ARGS],
TIME => now_f,
+ AT => item[AT],
)
end

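A note on the ripple effect of the Ruby change above: `lock_score` is appended as the fifth lock argument, so the "injected arguments" that the script runner adds afterwards shift from `ARGV[5..9]` to `ARGV[6..10]`, which accounts for most of the renumbering in the `.lua` diffs below. A hypothetical example of the argument array a scheduled job now produces (names and values are illustrative, not taken from the gem):

```ruby
# "at" is only present on scheduled jobs, so the score is "" for immediate ones.
job_hash = { "jid" => "0123456789ab", "at" => Time.now.to_f + 3_600 }

lock_args = [
  job_hash["jid"],     # ARGV[1] job_id
  nil,                 # ARGV[2] pttl
  "until_executed",    # ARGV[3] lock type
  1,                   # ARGV[4] lock limit
  job_hash["at"].to_s, # ARGV[5] lock score (new in this commit)
]
```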
11 changes: 6 additions & 5 deletions lib/sidekiq_unique_jobs/lua/delete.lua
@@ -13,14 +13,15 @@ local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
+ local lock_score = ARGV[5]
-------- END lock arguments -----------

-------- BEGIN injected arguments --------
- local current_time = tonumber(ARGV[5])
- local debug_lua = tostring(ARGV[6]) == "1"
- local max_history = tonumber(ARGV[7])
- local script_name = tostring(ARGV[8]) .. ".lua"
- local redisversion = tostring(ARGV[9])
+ local current_time = tonumber(ARGV[6])
+ local debug_lua = tostring(ARGV[7]) == "1"
+ local max_history = tonumber(ARGV[8])
+ local script_name = tostring(ARGV[9]) .. ".lua"
+ local redisversion = tostring(ARGV[10])
--------- END injected arguments ---------

-------- BEGIN local functions --------
23 changes: 16 additions & 7 deletions lib/sidekiq_unique_jobs/lua/lock.lua
@@ -15,15 +15,16 @@ local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
+ local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
- local current_time = tonumber(ARGV[5])
- local debug_lua = tostring(ARGV[6]) == "1"
- local max_history = tonumber(ARGV[7])
- local script_name = tostring(ARGV[8]) .. ".lua"
- local redisversion = ARGV[9]
+ local current_time = tonumber(ARGV[6])
+ local debug_lua = tostring(ARGV[7]) == "1"
+ local max_history = tonumber(ARGV[8])
+ local script_name = tostring(ARGV[9]) .. ".lua"
+ local redisversion = ARGV[10]
--------- END injected arguments ---------


@@ -62,8 +63,16 @@ if lock_type == "until_expired" and pttl and pttl > 0 then
log_debug("ZADD", expiring_digests, current_time + pttl, digest)
redis.call("ZADD", expiring_digests, current_time + pttl, digest)
else
- log_debug("ZADD", digests, current_time, digest)
- redis.call("ZADD", digests, current_time, digest)
+ local score
+
+ if #lock_score == 0 then
+   score = current_time
+ else
+   score = lock_score
+ end
+
+ log_debug("ZADD", digests, score, digest)
+ redis.call("ZADD", digests, score, digest)
end

log_debug("HSET", locked, job_id, current_time)
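Why the script checks `#lock_score == 0` rather than testing for nil: Redis hands ARGV to Lua as strings, and the Ruby side sends `item[AT].to_s`, so a job with no scheduled time arrives as an empty string and the score falls back to `current_time`. A tiny Ruby illustration of that coercion:

```ruby
nil.to_s             # => ""             (job without an "at" field)
1_708_560_000.5.to_s # => "1708560000.5" (scheduled job's run-at timestamp)
```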
17 changes: 9 additions & 8 deletions lib/sidekiq_unique_jobs/lua/queue.lua
@@ -10,18 +10,19 @@ local digests = KEYS[7]


-------- BEGIN lock arguments ---------
local job_id = ARGV[1] -- The job_id that was previously primed
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
+ local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
- local current_time = tonumber(ARGV[5])
- local debug_lua = tostring(ARGV[6]) == "1"
- local max_history = tonumber(ARGV[7])
- local script_name = tostring(ARGV[8]) .. ".lua"
+ local current_time = tonumber(ARGV[6])
+ local debug_lua = tostring(ARGV[7]) == "1"
+ local max_history = tonumber(ARGV[8])
+ local script_name = tostring(ARGV[9]) .. ".lua"
--------- END injected arguments ---------


20 changes: 10 additions & 10 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua
@@ -1,22 +1,22 @@
local function delete_from_queue(queue, digest)
-  local per = 50
-  local total = redis.call("LLEN", queue)
-  local index = 0
-  local result = nil
+  local total = redis.call("LLEN", queue)
+  local per = 50

-  while (index < total) do
-    local items = redis.call("LRANGE", queue, index, index + per -1)
+  for index = 0, total, per do
+    local items = redis.call("LRANGE", queue, index, index + per - 1)
+
    if #items == 0 then
      break
    end
+
    for _, item in pairs(items) do
      if string.find(item, digest) then
        redis.call("LREM", queue, 1, item)
-        result = item
-        break
+
+        return item
      end
    end
-    index = index + per
  end
-  return result
+
+  return nil
end
30 changes: 20 additions & 10 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua
@@ -1,19 +1,29 @@
local function delete_from_sorted_set(name, digest)
-  local per = 50
-  local total = redis.call("zcard", name)
-  local index = 0
-  local result
+  local score = redis.call("ZSCORE", "uniquejobs:digests", digest)
+  local total = redis.call("ZCARD", name)
+  local per = 50

-  while (index < total) do
-    local items = redis.call("ZRANGE", name, index, index + per -1)
+  for offset = 0, total, per do
+    local items
+
+    if score then
+      items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per)
+    else
+      items = redis.call("ZRANGE", name, offset, offset + per -1)
+    end
+
+    if #items == 0 then
+      break
+    end
+
    for _, item in pairs(items) do
      if string.find(item, digest) then
        redis.call("ZREM", name, item)
-        result = item
-        break
+
+        return item
      end
    end
-    index = index + per
  end
-  return result
+
+  return nil
end
19 changes: 10 additions & 9 deletions lib/sidekiq_unique_jobs/lua/unlock.lua
@@ -10,19 +10,20 @@ local digests = KEYS[7]


-------- BEGIN lock arguments ---------
local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
+ local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
- local current_time = tonumber(ARGV[5])
- local debug_lua = tostring(ARGV[6]) == "1"
- local max_history = tonumber(ARGV[7])
- local script_name = tostring(ARGV[8]) .. ".lua"
- local redisversion = ARGV[9]
+ local current_time = tonumber(ARGV[6])
+ local debug_lua = tostring(ARGV[7]) == "1"
+ local max_history = tonumber(ARGV[8])
+ local script_name = tostring(ARGV[9]) .. ".lua"
+ local redisversion = ARGV[10]
--------- END injected arguments ---------


55 changes: 55 additions & 0 deletions spec/performance/unique_job_on_conflict_replace_spec.rb
@@ -0,0 +1,55 @@
# frozen_string_literal: true

RSpec.describe UniqueJobOnConflictReplace, :perf do
let(:lock_prefix) { described_class.sidekiq_options.fetch("lock_prefix") { SidekiqUniqueJobs.config.lock_prefix } }
let(:lock_timeout) { described_class.sidekiq_options.fetch("lock_timeout") { SidekiqUniqueJobs.config.lock_timeout } }
let(:lock_ttl) { described_class.sidekiq_options.fetch("lock_ttl") { SidekiqUniqueJobs.config.lock_ttl } }
let(:queue) { described_class.sidekiq_options["queue"] }
let(:on_conflict) { described_class.sidekiq_options["on_conflict"] }
let(:lock) { described_class.sidekiq_options["lock"] }

before do
flushdb
end

context "when schedule queue is large" do
it "locks and replaces quickly" do
(0..100_000).each_slice(1_000) do |nums|
redis do |conn|
conn.pipelined do |pipeline|
nums.each do |num|
created_at = Time.now.to_f
scheduled_at = created_at + rand(3_600..2_592_000)

payload = {
"retry" => true,
"queue" => queue,
"lock" => lock,
"on_conflict" => on_conflict,
"class" => described_class.name,
"args" => [num, { "type" => "extremely unique" }],
"jid" => SecureRandom.hex(12),
"created_at" => created_at,
"lock_timeout" => lock_timeout,
"lock_ttl" => lock_ttl,
"lock_prefix" => lock_prefix,
"lock_args" => [num, { "type" => "extremely unique" }],
"lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}",
}

pipeline.zadd("schedule", scheduled_at, payload.to_json)
end
end
end
end

# queueing it once at the end of the queue should succeed
expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil

# queueing it again should be performant
expect do
described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })
end.to perform_under(10).ms
end
end
end
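
Some rough arithmetic on why this assertion used to be out of reach: replacing a conflicting job deletes the previously scheduled payload, and before this commit `delete_from_sorted_set` paged the whole `schedule` set 50 entries at a time inside one Lua call. The numbers below are illustrative, not measurements:

```ruby
total = 100_001 # payloads seeded above by (0..100_000)
per   = 50      # page size used by delete_from_sorted_set

pages_without_score = (total / per.to_f).ceil # => 2001 ZRANGE calls in the worst case
pages_with_score    = 1                       # the BYSCORE scan starts at the job's own run-at score
```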
3 changes: 3 additions & 0 deletions spec/sidekiq_unique_jobs/lua/delete_spec.rb
@@ -9,6 +9,7 @@
lock_ttl,
lock_type,
lock_limit,
+ lock_score,
]
end
let(:job_id) { "jobid" }
@@ -22,6 +23,8 @@
let(:lock_ttl) { nil }
let(:locked_jid) { job_id }
let(:lock_limit) { 1 }
+ let(:now_f) { SidekiqUniqueJobs.now_f }
+ let(:lock_score) { now_f.to_s }

context "when queued" do
before do
5 changes: 3 additions & 2 deletions spec/sidekiq_unique_jobs/lua/lock_spec.rb
@@ -3,8 +3,8 @@
RSpec.describe "lock.lua" do
subject(:lock) { call_script(:lock, key.to_a, argv_one) }

- let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] }
- let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] }
+ let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] }
+ let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] }
let(:job_id_one) { "job_id_one" }
let(:job_id_two) { "job_id_two" }
let(:lock_type) { :until_executed }
@@ -19,6 +19,7 @@
let(:locked_jid) { job_id_one }
let(:now_f) { SidekiqUniqueJobs.now_f }
let(:lock_limit) { 1 }
+ let(:lock_score) { now_f.to_s }

shared_context "with a primed key", :with_primed_key do
before do
10 changes: 6 additions & 4 deletions spec/sidekiq_unique_jobs/lua/queue_spec.rb
@@ -10,6 +10,7 @@
lock_pttl,
lock_type,
lock_limit,
lock_score,
]
end
let(:digest) { "uniquejobs:digest" }
@@ -21,6 +22,7 @@
let(:locked_jid) { job_id }
let(:lock_limit) { 1 }
let(:now_f) { SidekiqUniqueJobs.now_f }
let(:lock_score) { now_f.to_s }

before do
flush_redis
@@ -54,7 +56,7 @@

context "when queued by another job_id" do
before do
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
end

context "with lock_limit 1" do
Expand Down Expand Up @@ -94,7 +96,7 @@

context "when queued by same job_id" do
before do
call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit, lock_score])
end

it "stores the right keys in redis" do
Expand All @@ -113,9 +115,9 @@

context "when primed by another job_id" do
before do
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
rpoplpush(key.queued, key.primed)
call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
end

context "with lock_limit 1" do
5 changes: 3 additions & 2 deletions spec/sidekiq_unique_jobs/lua/unlock_spec.rb
@@ -3,8 +3,8 @@
RSpec.describe "unlock.lua" do
subject(:unlock) { call_script(:unlock, key.to_a, argv_one) }

- let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] }
- let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] }
+ let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] }
+ let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] }
let(:job_id_one) { "job_id_one" }
let(:job_id_two) { "job_id_two" }
let(:lock_type) { :until_executed }
@@ -17,6 +17,7 @@
let(:lock_ttl) { nil }
let(:locked_jid) { job_id_one }
let(:lock_limit) { 1 }
+ let(:lock_score) { now_f.to_s }

shared_context "with a lock", :with_a_lock do
before do
8 changes: 4 additions & 4 deletions spec/support/sidekiq_unique_jobs/testing.rb
@@ -59,12 +59,12 @@ def debug(*args)
redis { |conn| conn.debug(*args) }
end

- def flushall(options = nil)
-   redis { |conn| conn.flushall(options) }
+ def flushall(...)
+   redis { |conn| conn.flushall(...) }
end

- def flushdb(options = nil)
-   redis { |conn| conn.flushdb(options) }
+ def flushdb(...)
+   redis { |conn| conn.flushdb(...) }
end

def info(_cmd = nil)
