From b3b5b274e63ab27d9be7ce867d5fe23f8462af4d Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 28 Sep 2021 10:57:11 -0700 Subject: [PATCH 1/4] Implement support for `set(wait_until: )` --- lib/sidekiq/worker.rb | 21 +++++++++++++++------ test/test_worker.rb | 8 ++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index ea2c879ed..139ace7bf 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -153,10 +153,16 @@ class Setter def initialize(klass, opts) @klass = klass @opts = opts + + # ActiveJob compatibility + interval = @opts.delete(:wait_until) + at(interval) if interval end def set(options) + interval = options.delete(:wait_until) @opts.merge!(options) + at(interval) if interval self end @@ -167,16 +173,19 @@ def perform_async(*args) # +interval+ must be a timestamp, numeric or something that acts # numeric (like an activesupport time interval). def perform_in(interval, *args) + at(interval).perform_async(*args) + end + alias_method :perform_at, :perform_in + + private + + def at(interval) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) - - payload = @opts.merge("class" => @klass, "args" => args) - # Optimization to enqueue something now that is scheduled to go out now or in the past - payload["at"] = ts if ts > now - @klass.client_push(payload) + @opts["at"] = ts if ts > now + self end - alias_method :perform_at, :perform_in end module ClassMethods diff --git a/test/test_worker.rb b/test/test_worker.rb index 0a3542a2f..d28072416 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -12,6 +12,14 @@ def setup Sidekiq.redis {|c| c.flushdb } end + it "provides basic ActiveJob compatibilility" do + q = Sidekiq::ScheduledSet.new + assert_equal 0, q.size + jid = SetWorker.set(wait_until: 1.hour).perform_later(123) + assert jid + assert_equal 1, q.size + end + it 'can be memoized' do q = Sidekiq::Queue.new('bar') assert_equal 0, q.size From 7f78cdc2930c04a253470a14e0610434d022e92a Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 28 Sep 2021 11:08:20 -0700 Subject: [PATCH 2/4] Implement `queue_as`, docs --- lib/sidekiq/worker.rb | 23 +++++++++++++++++++++++ test/test_worker.rb | 11 ++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 139ace7bf..4ee1d1ba5 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -9,6 +9,7 @@ module Sidekiq # # class HardWorker # include Sidekiq::Worker + # sidekiq_options queue: 'critical', retry: 5 # # def perform(*args) # # do some work @@ -20,6 +21,24 @@ module Sidekiq # HardWorker.perform_async(1, 2, 3) # # Note that perform_async is a class method, perform is an instance method. + # + # Sidekiq::Worker also includes several APIs to provide compatibility with + # ActiveJob. + # + # class SomeWorker + # include Sidekiq::Worker + # queue_as :critical + # + # def perform(...) + # end + # end + # + # SomeWorker.set(wait_until: 1.hour).perform_later(123) + # + # Note that arguments passed to the job must still obey Sidekiq's + # best practice for simple, JSON-native data types. Sidekiq will not + # implement ActiveJob's more complex argument serialization. + # module Worker ## # The Options module is extracted so we can include it in ActiveJob::Base @@ -201,6 +220,10 @@ def delay_until(*args) raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" end + def queue_as(q) + sidekiq_options("queue" => q.to_s) + end + def set(options) Setter.new(self, options) end diff --git a/test/test_worker.rb b/test/test_worker.rb index d28072416..8b3697864 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -5,7 +5,8 @@ class SetWorker include Sidekiq::Worker - sidekiq_options :queue => :foo, 'retry' => 12 + queue_as :foo + sidekiq_options 'retry' => 12 end def setup @@ -18,6 +19,14 @@ def setup jid = SetWorker.set(wait_until: 1.hour).perform_later(123) assert jid assert_equal 1, q.size + + q = Sidekiq::Queue.new("foo") + assert_equal 0, q.size + SetWorker.perform_later + assert_equal 1, q.size + + SetWorker.set(queue: 'xyz').perform_later + assert_equal 1, Sidekiq::Queue.new("xyz").size end it 'can be memoized' do From f0f75330e503d7aeb8d02fc9034d860a58134ea2 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 28 Sep 2021 11:12:34 -0700 Subject: [PATCH 3/4] Rollback implementing `perform_later`, it's a footgun --- lib/sidekiq/worker.rb | 6 ++++-- test/test_worker.rb | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 4ee1d1ba5..1500db9ca 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -33,11 +33,13 @@ module Sidekiq # end # end # - # SomeWorker.set(wait_until: 1.hour).perform_later(123) + # SomeWorker.set(wait_until: 1.hour).perform_async(123) # # Note that arguments passed to the job must still obey Sidekiq's # best practice for simple, JSON-native data types. Sidekiq will not - # implement ActiveJob's more complex argument serialization. + # implement ActiveJob's more complex argument serialization. For + # this reason, we don't implement `perform_later` as our call semantics + # are very different. # module Worker ## diff --git a/test/test_worker.rb b/test/test_worker.rb index 8b3697864..76f5ca167 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -16,16 +16,16 @@ def setup it "provides basic ActiveJob compatibilility" do q = Sidekiq::ScheduledSet.new assert_equal 0, q.size - jid = SetWorker.set(wait_until: 1.hour).perform_later(123) + jid = SetWorker.set(wait_until: 1.hour).perform_async(123) assert jid assert_equal 1, q.size q = Sidekiq::Queue.new("foo") assert_equal 0, q.size - SetWorker.perform_later + SetWorker.perform_async assert_equal 1, q.size - SetWorker.set(queue: 'xyz').perform_later + SetWorker.set(queue: 'xyz').perform_async assert_equal 1, Sidekiq::Queue.new("xyz").size end From 0696cf97509fd008d52d66d018d297c1d2a5825d Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 28 Sep 2021 11:20:04 -0700 Subject: [PATCH 4/4] changes --- Changes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Changes.md b/Changes.md index 1043db4dd..1d140f06b 100644 --- a/Changes.md +++ b/Changes.md @@ -19,8 +19,9 @@ end ```ruby # config/initializers/sidekiq.rb require "sidekiq/middleware/current_attributes" -Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS:CurrentAttributes singleton +Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton ``` +- Implement `queue_as` and `wait_until` for ActiveJob compatibility [#5003] - Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985] - Run existing signal traps, if any, before running Sidekiq's trap. [#4991]