Skip to content

Commit

Permalink
Change scheduler to use Lua-based script
Browse files Browse the repository at this point in the history
This allows us to pop jobs atomically so that we remove redundant ZREMs
from multiple processes trying to process the same job.
  • Loading branch information
Heinrich Lee Yu committed Nov 5, 2021
1 parent 8cc43f7 commit a22e688
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions lib/sidekiq/scheduled.rb
Expand Up @@ -9,29 +9,48 @@ module Scheduled
SETS = %w[retry schedule]

class Enq
LUA_ZPOPBYSCORE = <<~LUA
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
LUA

def initialize
@lua_zpopbyscore_sha = nil
end

def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get next items in the queue with scores (time to execute) <= now.
until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
jobs.each do |job|
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
# Get next item in the queue with score (time to execute) <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now]))
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end

private

def zpopbyscore(conn, keys: nil, argv: nil)
@lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil?

conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv)
rescue Redis::CommandError => e
raise unless e.message.start_with?("NOSCRIPT")

@lua_zpopbyscore_sha = nil
retry
end
end

##
Expand Down

0 comments on commit a22e688

Please sign in to comment.