diff --git a/Changes.md b/Changes.md index 1043db4dd..1d140f06b 100644 --- a/Changes.md +++ b/Changes.md @@ -19,8 +19,9 @@ end ```ruby # config/initializers/sidekiq.rb require "sidekiq/middleware/current_attributes" -Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS:CurrentAttributes singleton +Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton ``` +- Implement `queue_as` and `wait_until` for ActiveJob compatibility [#5003] - Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985] - Run existing signal traps, if any, before running Sidekiq's trap. [#4991] diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index ea2c879ed..1500db9ca 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -9,6 +9,7 @@ module Sidekiq # # class HardWorker # include Sidekiq::Worker + # sidekiq_options queue: 'critical', retry: 5 # # def perform(*args) # # do some work @@ -20,6 +21,26 @@ module Sidekiq # HardWorker.perform_async(1, 2, 3) # # Note that perform_async is a class method, perform is an instance method. + # + # Sidekiq::Worker also includes several APIs to provide compatibility with + # ActiveJob. + # + # class SomeWorker + # include Sidekiq::Worker + # queue_as :critical + # + # def perform(...) + # end + # end + # + # SomeWorker.set(wait_until: 1.hour).perform_async(123) + # + # Note that arguments passed to the job must still obey Sidekiq's + # best practice for simple, JSON-native data types. Sidekiq will not + # implement ActiveJob's more complex argument serialization. For + # this reason, we don't implement `perform_later` as our call semantics + # are very different. + # module Worker ## # The Options module is extracted so we can include it in ActiveJob::Base @@ -153,10 +174,16 @@ class Setter def initialize(klass, opts) @klass = klass @opts = opts + + # ActiveJob compatibility + interval = @opts.delete(:wait_until) + at(interval) if interval end def set(options) + interval = options.delete(:wait_until) @opts.merge!(options) + at(interval) if interval self end @@ -167,16 +194,19 @@ def perform_async(*args) # +interval+ must be a timestamp, numeric or something that acts # numeric (like an activesupport time interval). def perform_in(interval, *args) + at(interval).perform_async(*args) + end + alias_method :perform_at, :perform_in + + private + + def at(interval) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) - - payload = @opts.merge("class" => @klass, "args" => args) - # Optimization to enqueue something now that is scheduled to go out now or in the past - payload["at"] = ts if ts > now - @klass.client_push(payload) + @opts["at"] = ts if ts > now + self end - alias_method :perform_at, :perform_in end module ClassMethods @@ -192,6 +222,10 @@ def delay_until(*args) raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" end + def queue_as(q) + sidekiq_options("queue" => q.to_s) + end + def set(options) Setter.new(self, options) end diff --git a/test/test_worker.rb b/test/test_worker.rb index 0a3542a2f..76f5ca167 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -5,13 +5,30 @@ class SetWorker include Sidekiq::Worker - sidekiq_options :queue => :foo, 'retry' => 12 + queue_as :foo + sidekiq_options 'retry' => 12 end def setup Sidekiq.redis {|c| c.flushdb } end + it "provides basic ActiveJob compatibilility" do + q = Sidekiq::ScheduledSet.new + assert_equal 0, q.size + jid = SetWorker.set(wait_until: 1.hour).perform_async(123) + assert jid + assert_equal 1, q.size + + q = Sidekiq::Queue.new("foo") + assert_equal 0, q.size + SetWorker.perform_async + assert_equal 1, q.size + + SetWorker.set(queue: 'xyz').perform_async + assert_equal 1, Sidekiq::Queue.new("xyz").size + end + it 'can be memoized' do q = Sidekiq::Queue.new('bar') assert_equal 0, q.size