Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ActiveJob’s wait_until #5003

Merged
merged 4 commits into from Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Changes.md
Expand Up @@ -19,8 +19,9 @@ end
```ruby
# config/initializers/sidekiq.rb
require "sidekiq/middleware/current_attributes"
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS:CurrentAttributes singleton
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
```
- Implement `queue_as` and `wait_until` for ActiveJob compatibility [#5003]
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]

Expand Down
46 changes: 40 additions & 6 deletions lib/sidekiq/worker.rb
Expand Up @@ -9,6 +9,7 @@ module Sidekiq
#
# class HardWorker
# include Sidekiq::Worker
# sidekiq_options queue: 'critical', retry: 5
#
# def perform(*args)
# # do some work
Expand All @@ -20,6 +21,26 @@ module Sidekiq
# HardWorker.perform_async(1, 2, 3)
#
# Note that perform_async is a class method, perform is an instance method.
#
# Sidekiq::Worker also includes several APIs to provide compatibility with
# ActiveJob.
#
# class SomeWorker
# include Sidekiq::Worker
# queue_as :critical
#
# def perform(...)
# end
# end
#
# SomeWorker.set(wait_until: 1.hour).perform_async(123)
#
# Note that arguments passed to the job must still obey Sidekiq's
# best practice for simple, JSON-native data types. Sidekiq will not
# implement ActiveJob's more complex argument serialization. For
# this reason, we don't implement `perform_later` as our call semantics
# are very different.
#
module Worker
##
# The Options module is extracted so we can include it in ActiveJob::Base
Expand Down Expand Up @@ -153,10 +174,16 @@ class Setter
def initialize(klass, opts)
@klass = klass
@opts = opts

# ActiveJob compatibility
interval = @opts.delete(:wait_until)
at(interval) if interval
end

def set(options)
interval = options.delete(:wait_until)
@opts.merge!(options)
at(interval) if interval
self
end

Expand All @@ -167,16 +194,19 @@ def perform_async(*args)
# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
at(interval).perform_async(*args)
end
alias_method :perform_at, :perform_in

private

def at(interval)
int = interval.to_f
now = Time.now.to_f
ts = (int < 1_000_000_000 ? now + int : int)

payload = @opts.merge("class" => @klass, "args" => args)
# Optimization to enqueue something now that is scheduled to go out now or in the past
payload["at"] = ts if ts > now
@klass.client_push(payload)
@opts["at"] = ts if ts > now
self
end
alias_method :perform_at, :perform_in
end

module ClassMethods
Expand All @@ -192,6 +222,10 @@ def delay_until(*args)
raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
end

def queue_as(q)
sidekiq_options("queue" => q.to_s)
end

def set(options)
Setter.new(self, options)
end
Expand Down
19 changes: 18 additions & 1 deletion test/test_worker.rb
Expand Up @@ -5,13 +5,30 @@

class SetWorker
include Sidekiq::Worker
sidekiq_options :queue => :foo, 'retry' => 12
queue_as :foo
sidekiq_options 'retry' => 12
end

def setup
Sidekiq.redis {|c| c.flushdb }
end

it "provides basic ActiveJob compatibilility" do
q = Sidekiq::ScheduledSet.new
assert_equal 0, q.size
jid = SetWorker.set(wait_until: 1.hour).perform_async(123)
assert jid
assert_equal 1, q.size

q = Sidekiq::Queue.new("foo")
assert_equal 0, q.size
SetWorker.perform_async
assert_equal 1, q.size

SetWorker.set(queue: 'xyz').perform_async
assert_equal 1, Sidekiq::Queue.new("xyz").size
end

it 'can be memoized' do
q = Sidekiq::Queue.new('bar')
assert_equal 0, q.size
Expand Down