Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change scheduler to use Lua-based script #5044

Merged
merged 1 commit into from Nov 5, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 33 additions & 14 deletions lib/sidekiq/scheduled.rb
Expand Up @@ -9,29 +9,48 @@ module Scheduled
SETS = %w[retry schedule]

class Enq
LUA_ZPOPBYSCORE = <<~LUA
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
LUA

def initialize
@lua_zpopbyscore_sha = nil
end

def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get next items in the queue with scores (time to execute) <= now.
until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
jobs.each do |job|
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
# Get next item in the queue with score (time to execute) <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now]))
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end

private

def zpopbyscore(conn, keys: nil, argv: nil)
@lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil?

conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv)
rescue Redis::CommandError => e
raise unless e.message.start_with?("NOSCRIPT")

@lua_zpopbyscore_sha = nil
retry
end
end

##
Expand Down