From 1bfba2f4d1e1aa3e1edc1d99fcb870dfe755267b Mon Sep 17 00:00:00 2001 From: Zeke Gabrielse Date: Thu, 22 Feb 2024 01:23:37 -0600 Subject: [PATCH] Add digest scores for faster deletes in sorted sets (#835) * add performance test for replace conflict strategy * fix performance issue when deleting from large sets --- lib/sidekiq_unique_jobs/locksmith.rb | 9 ++- lib/sidekiq_unique_jobs/lua/delete.lua | 11 ++-- lib/sidekiq_unique_jobs/lua/lock.lua | 23 +++++--- lib/sidekiq_unique_jobs/lua/queue.lua | 17 +++--- .../lua/shared/_delete_from_queue.lua | 20 +++---- .../lua/shared/_delete_from_sorted_set.lua | 30 ++++++---- lib/sidekiq_unique_jobs/lua/unlock.lua | 19 ++++--- .../unique_job_on_conflict_replace_spec.rb | 55 +++++++++++++++++++ spec/sidekiq_unique_jobs/lua/delete_spec.rb | 3 + spec/sidekiq_unique_jobs/lua/lock_spec.rb | 5 +- spec/sidekiq_unique_jobs/lua/queue_spec.rb | 10 ++-- spec/sidekiq_unique_jobs/lua/unlock_spec.rb | 5 +- spec/support/sidekiq_unique_jobs/testing.rb | 8 +-- .../workers/unique_job_on_conflict_replace.rb | 14 +++++ .../unique_job_on_conflict_replace_spec.rb | 18 ++++++ 15 files changed, 184 insertions(+), 63 deletions(-) create mode 100644 spec/performance/unique_job_on_conflict_replace_spec.rb create mode 100644 spec/support/workers/unique_job_on_conflict_replace.rb create mode 100644 spec/workers/unique_job_on_conflict_replace_spec.rb diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 3f49619c3..f3a177858 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -82,7 +82,7 @@ def delete # Deletes the lock regardless of if it has a pttl set # def delete! - call_script(:delete, key.to_a, [job_id, config.pttl, config.type, config.limit]).to_i.positive? + call_script(:delete, key.to_a, argv).to_i.positive? end # @@ -362,7 +362,11 @@ def taken?(conn) end def argv - [job_id, config.pttl, config.type, config.limit] + [job_id, config.pttl, config.type, config.limit, lock_score] + end + + def lock_score + item[AT].to_s end def lock_info @@ -375,6 +379,7 @@ def lock_info TYPE => config.type, LOCK_ARGS => item[LOCK_ARGS], TIME => now_f, + AT => item[AT], ) end diff --git a/lib/sidekiq_unique_jobs/lua/delete.lua b/lib/sidekiq_unique_jobs/lua/delete.lua index b4f6a6e64..312986f22 100644 --- a/lib/sidekiq_unique_jobs/lua/delete.lua +++ b/lib/sidekiq_unique_jobs/lua/delete.lua @@ -13,14 +13,15 @@ local job_id = ARGV[1] local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local lock_score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = tostring(ARGV[9]) +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = tostring(ARGV[10]) --------- END injected arguments --------- -------- BEGIN local functions -------- diff --git a/lib/sidekiq_unique_jobs/lua/lock.lua b/lib/sidekiq_unique_jobs/lua/lock.lua index 4d813e9a4..f48291555 100644 --- a/lib/sidekiq_unique_jobs/lua/lock.lua +++ b/lib/sidekiq_unique_jobs/lua/lock.lua @@ -15,15 +15,16 @@ local job_id = ARGV[1] local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local lock_score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = ARGV[9] +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = ARGV[10] --------- END injected arguments --------- @@ -62,8 +63,16 @@ if lock_type == "until_expired" and pttl and pttl > 0 then log_debug("ZADD", expiring_digests, current_time + pttl, digest) redis.call("ZADD", expiring_digests, current_time + pttl, digest) else - log_debug("ZADD", digests, current_time, digest) - redis.call("ZADD", digests, current_time, digest) + local score + + if #lock_score == 0 then + score = current_time + else + score = lock_score + end + + log_debug("ZADD", digests, score, digest) + redis.call("ZADD", digests, score, digest) end log_debug("HSET", locked, job_id, current_time) diff --git a/lib/sidekiq_unique_jobs/lua/queue.lua b/lib/sidekiq_unique_jobs/lua/queue.lua index 6c4787008..bec7a8a88 100644 --- a/lib/sidekiq_unique_jobs/lua/queue.lua +++ b/lib/sidekiq_unique_jobs/lua/queue.lua @@ -10,18 +10,19 @@ local digests = KEYS[7] -------- BEGIN lock arguments --------- -local job_id = ARGV[1] -- The job_id that was previously primed -local pttl = tonumber(ARGV[2]) -local lock_type = ARGV[3] -local limit = tonumber(ARGV[4]) +local job_id = ARGV[1] -- The job_id that was previously primed +local pttl = tonumber(ARGV[2]) +local lock_type = ARGV[3] +local limit = tonumber(ARGV[4]) +local lock_score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" --------- END injected arguments --------- diff --git a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua index 7ca9b9c29..f301c7e24 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua @@ -1,22 +1,22 @@ local function delete_from_queue(queue, digest) - local per = 50 - local total = redis.call("LLEN", queue) - local index = 0 - local result = nil + local total = redis.call("LLEN", queue) + local per = 50 + + for index = 0, total, per do + local items = redis.call("LRANGE", queue, index, index + per - 1) - while (index < total) do - local items = redis.call("LRANGE", queue, index, index + per -1) if #items == 0 then break end + for _, item in pairs(items) do if string.find(item, digest) then redis.call("LREM", queue, 1, item) - result = item - break + + return item end end - index = index + per end - return result + + return nil end diff --git a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua index 78f53cab4..fcf7a5ed4 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua @@ -1,19 +1,29 @@ local function delete_from_sorted_set(name, digest) - local per = 50 - local total = redis.call("zcard", name) - local index = 0 - local result + local score = redis.call("ZSCORE", "uniquejobs:digests", digest) + local total = redis.call("ZCARD", name) + local per = 50 + + for offset = 0, total, per do + local items + + if score then + items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per) + else + items = redis.call("ZRANGE", name, offset, offset + per -1) + end + + if #items == 0 then + break + end - while (index < total) do - local items = redis.call("ZRANGE", name, index, index + per -1) for _, item in pairs(items) do if string.find(item, digest) then redis.call("ZREM", name, item) - result = item - break + + return item end end - index = index + per end - return result + + return nil end diff --git a/lib/sidekiq_unique_jobs/lua/unlock.lua b/lib/sidekiq_unique_jobs/lua/unlock.lua index 70d44bf1a..2252057c6 100644 --- a/lib/sidekiq_unique_jobs/lua/unlock.lua +++ b/lib/sidekiq_unique_jobs/lua/unlock.lua @@ -10,19 +10,20 @@ local digests = KEYS[7] -------- BEGIN lock arguments --------- -local job_id = ARGV[1] -local pttl = tonumber(ARGV[2]) -local lock_type = ARGV[3] -local limit = tonumber(ARGV[4]) +local job_id = ARGV[1] +local pttl = tonumber(ARGV[2]) +local lock_type = ARGV[3] +local limit = tonumber(ARGV[4]) +local lock_score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = ARGV[9] +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = ARGV[10] --------- END injected arguments --------- diff --git a/spec/performance/unique_job_on_conflict_replace_spec.rb b/spec/performance/unique_job_on_conflict_replace_spec.rb new file mode 100644 index 000000000..e7f570819 --- /dev/null +++ b/spec/performance/unique_job_on_conflict_replace_spec.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +RSpec.describe UniqueJobOnConflictReplace, :perf do + let(:lock_prefix) { described_class.sidekiq_options.fetch("lock_prefix") { SidekiqUniqueJobs.config.lock_prefix } } + let(:lock_timeout) { described_class.sidekiq_options.fetch("lock_timeout") { SidekiqUniqueJobs.config.lock_timeout } } + let(:lock_ttl) { described_class.sidekiq_options.fetch("lock_ttl") { SidekiqUniqueJobs.config.lock_ttl } } + let(:queue) { described_class.sidekiq_options["queue"] } + let(:on_conflict) { described_class.sidekiq_options["on_conflict"] } + let(:lock) { described_class.sidekiq_options["lock"] } + + before do + flushdb + end + + context "when schedule queue is large" do + it "locks and replaces quickly" do + (0..100_000).each_slice(1_000) do |nums| + redis do |conn| + conn.pipelined do |pipeline| + nums.each do |num| + created_at = Time.now.to_f + scheduled_at = created_at + rand(3_600..2_592_000) + + payload = { + "retry" => true, + "queue" => queue, + "lock" => lock, + "on_conflict" => on_conflict, + "class" => described_class.name, + "args" => [num, { "type" => "extremely unique" }], + "jid" => SecureRandom.hex(12), + "created_at" => created_at, + "lock_timeout" => lock_timeout, + "lock_ttl" => lock_ttl, + "lock_prefix" => lock_prefix, + "lock_args" => [num, { "type" => "extremely unique" }], + "lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}", + } + + pipeline.zadd("schedule", scheduled_at, payload.to_json) + end + end + end + end + + # queueing it once at the end of the queue should succeed + expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil + + # queueing it again should be performant + expect do + described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" }) + end.to perform_under(10).ms + end + end +end diff --git a/spec/sidekiq_unique_jobs/lua/delete_spec.rb b/spec/sidekiq_unique_jobs/lua/delete_spec.rb index cb35f2610..f9043afa2 100644 --- a/spec/sidekiq_unique_jobs/lua/delete_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/delete_spec.rb @@ -9,6 +9,7 @@ lock_ttl, lock_type, lock_limit, + lock_score, ] end let(:job_id) { "jobid" } @@ -22,6 +23,8 @@ let(:lock_ttl) { nil } let(:locked_jid) { job_id } let(:lock_limit) { 1 } + let(:now_f) { SidekiqUniqueJobs.now_f } + let(:lock_score) { now_f.to_s } context "when queued" do before do diff --git a/spec/sidekiq_unique_jobs/lua/lock_spec.rb b/spec/sidekiq_unique_jobs/lua/lock_spec.rb index 672bbd89b..821f553cb 100644 --- a/spec/sidekiq_unique_jobs/lua/lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/lock_spec.rb @@ -3,8 +3,8 @@ RSpec.describe "lock.lua" do subject(:lock) { call_script(:lock, key.to_a, argv_one) } - let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] } - let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] } + let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] } + let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] } let(:job_id_one) { "job_id_one" } let(:job_id_two) { "job_id_two" } let(:lock_type) { :until_executed } @@ -19,6 +19,7 @@ let(:locked_jid) { job_id_one } let(:now_f) { SidekiqUniqueJobs.now_f } let(:lock_limit) { 1 } + let(:lock_score) { now_f.to_s } shared_context "with a primed key", :with_primed_key do before do diff --git a/spec/sidekiq_unique_jobs/lua/queue_spec.rb b/spec/sidekiq_unique_jobs/lua/queue_spec.rb index 9e31debc2..8c1bb0cc4 100644 --- a/spec/sidekiq_unique_jobs/lua/queue_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/queue_spec.rb @@ -10,6 +10,7 @@ lock_pttl, lock_type, lock_limit, + lock_score, ] end let(:digest) { "uniquejobs:digest" } @@ -21,6 +22,7 @@ let(:locked_jid) { job_id } let(:lock_limit) { 1 } let(:now_f) { SidekiqUniqueJobs.now_f } + let(:lock_score) { now_f.to_s } before do flush_redis @@ -54,7 +56,7 @@ context "when queued by another job_id" do before do - call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) end context "with lock_limit 1" do @@ -94,7 +96,7 @@ context "when queued by same job_id" do before do - call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit, lock_score]) end it "stores the right keys in redis" do @@ -113,9 +115,9 @@ context "when primed by another job_id" do before do - call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) rpoplpush(key.queued, key.primed) - call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) end context "with lock_limit 1" do diff --git a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb index 9e82e98eb..1b5696b33 100644 --- a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb @@ -3,8 +3,8 @@ RSpec.describe "unlock.lua" do subject(:unlock) { call_script(:unlock, key.to_a, argv_one) } - let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] } - let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] } + let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] } + let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] } let(:job_id_one) { "job_id_one" } let(:job_id_two) { "job_id_two" } let(:lock_type) { :until_executed } @@ -17,6 +17,7 @@ let(:lock_ttl) { nil } let(:locked_jid) { job_id_one } let(:lock_limit) { 1 } + let(:lock_score) { now_f.to_s } shared_context "with a lock", :with_a_lock do before do diff --git a/spec/support/sidekiq_unique_jobs/testing.rb b/spec/support/sidekiq_unique_jobs/testing.rb index 765521bec..8169aa9b3 100644 --- a/spec/support/sidekiq_unique_jobs/testing.rb +++ b/spec/support/sidekiq_unique_jobs/testing.rb @@ -59,12 +59,12 @@ def debug(*args) redis { |conn| conn.debug(*args) } end - def flushall(options = nil) - redis { |conn| conn.flushall(options) } + def flushall(...) + redis { |conn| conn.flushall(...) } end - def flushdb(options = nil) - redis { |conn| conn.flushdb(options) } + def flushdb(...) + redis { |conn| conn.flushdb(...) } end def info(_cmd = nil) diff --git a/spec/support/workers/unique_job_on_conflict_replace.rb b/spec/support/workers/unique_job_on_conflict_replace.rb new file mode 100644 index 000000000..64c6a5631 --- /dev/null +++ b/spec/support/workers/unique_job_on_conflict_replace.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# :nocov: + +class UniqueJobOnConflictReplace + include Sidekiq::Worker + sidekiq_options lock: :until_executing, + queue: :customqueue, + on_conflict: :replace + + def perform(one, two) + [one, two] + end +end diff --git a/spec/workers/unique_job_on_conflict_replace_spec.rb b/spec/workers/unique_job_on_conflict_replace_spec.rb new file mode 100644 index 000000000..6571ef440 --- /dev/null +++ b/spec/workers/unique_job_on_conflict_replace_spec.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +RSpec.describe UniqueJobOnConflictReplace do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "lock" => :until_executing, + "on_conflict" => :replace, + "queue" => :customqueue, + "retry" => true, + } + end + end + + it_behaves_like "a performing worker" do + let(:args) { ["hundred", { "type" => "extremely unique", "id" => 44 }] } + end +end