From a22e688782b93c1b12be4b3c55bacf1a28ee6558 Mon Sep 17 00:00:00 2001 From: Heinrich Lee Yu Date: Tue, 2 Nov 2021 19:31:04 +0800 Subject: [PATCH] Change scheduler to use Lua-based script This allows us to pop jobs atomically so that we remove redundant ZREMs from multiple processes trying to process the same job. --- lib/sidekiq/scheduled.rb | 47 ++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 9d4a2b534..ed00bba52 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -9,29 +9,48 @@ module Scheduled SETS = %w[retry schedule] class Enq + LUA_ZPOPBYSCORE = <<~LUA + local key, now = KEYS[1], ARGV[1] + local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1) + if jobs[1] then + redis.call("zrem", key, jobs[1]) + return jobs[1] + end + LUA + + def initialize + @lua_zpopbyscore_sha = nil + end + def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| - # Get next items in the queue with scores (time to execute) <= now. - until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty? - # We need to go through the list one at a time to reduce the risk of something - # going wrong between the time jobs are popped from the scheduled queue and when - # they are pushed onto a work queue and losing the jobs. - jobs.each do |job| - # Pop item off the queue and add it to the work queue. If the job can't be popped from - # the queue, it's because another process already popped it so we can move on to the - # next one. - if conn.zrem(sorted_set, job) - Sidekiq::Client.push(Sidekiq.load_json(job)) - Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } - end - end + # Get next item in the queue with score (time to execute) <= now. + # We need to go through the list one at a time to reduce the risk of something + # going wrong between the time jobs are popped from the scheduled queue and when + # they are pushed onto a work queue and losing the jobs. + while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now])) + Sidekiq::Client.push(Sidekiq.load_json(job)) + Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end + + private + + def zpopbyscore(conn, keys: nil, argv: nil) + @lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil? + + conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv) + rescue Redis::CommandError => e + raise unless e.message.start_with?("NOSCRIPT") + + @lua_zpopbyscore_sha = nil + retry + end end ##