Skip to content

Commit

Permalink
Change scheduler to use Lua-based script
Browse files Browse the repository at this point in the history
This allows us to pop jobs atomically so that we remove redundant ZREMs
from multiple processes trying to process the same job.
  • Loading branch information
Heinrich Lee Yu committed Nov 2, 2021
1 parent 8cc43f7 commit 5905a16
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions lib/sidekiq/scheduled.rb
Expand Up @@ -9,29 +9,45 @@ module Scheduled
SETS = %w[retry schedule]

class Enq
LUA_ZPOPBYSCORE = <<~EOS
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
EOS

LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE)

def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get next items in the queue with scores (time to execute) <= now.
until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
jobs.each do |job|
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
# Get next item in the queue with score (time to execute) <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now])
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end

private

def redis_eval_lua(conn, script, sha, keys: nil, argv: nil)
conn.evalsha(sha, keys: keys, argv: argv)
rescue Redis::CommandError => e
if e.message.start_with?('NOSCRIPT')
conn.eval(script, keys: keys, argv: argv)
else
raise
end
end
end

##
Expand Down

0 comments on commit 5905a16

Please sign in to comment.