diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 9d4a2b5343..290e0e2449 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -9,29 +9,45 @@ module Scheduled SETS = %w[retry schedule] class Enq + LUA_ZPOPBYSCORE = <<~EOS + local key, now = KEYS[1], ARGV[1] + local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1) + if jobs[1] then + redis.call("zrem", key, jobs[1]) + return jobs[1] + end + EOS + + LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE) + def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| - # Get next items in the queue with scores (time to execute) <= now. - until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty? - # We need to go through the list one at a time to reduce the risk of something - # going wrong between the time jobs are popped from the scheduled queue and when - # they are pushed onto a work queue and losing the jobs. - jobs.each do |job| - # Pop item off the queue and add it to the work queue. If the job can't be popped from - # the queue, it's because another process already popped it so we can move on to the - # next one. - if conn.zrem(sorted_set, job) - Sidekiq::Client.push(Sidekiq.load_json(job)) - Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } - end - end + # Get next item in the queue with score (time to execute) <= now. + # We need to go through the list one at a time to reduce the risk of something + # going wrong between the time jobs are popped from the scheduled queue and when + # they are pushed onto a work queue and losing the jobs. + while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now]) + Sidekiq::Client.push(Sidekiq.load_json(job)) + Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end + + private + + def redis_eval_lua(conn, script, sha, keys: nil, argv: nil) + conn.evalsha(sha, keys: keys, argv: argv) + rescue Redis::CommandError => e + if e.message.start_with?('NOSCRIPT') + conn.eval(script, keys: keys, argv: argv) + else + raise + end + end end ##