Skip to content

Commit

Permalink
Support ActiveJob’s wait_until (#5003)
Browse files Browse the repository at this point in the history
* Implement support for `set(wait_until: <interval>)`

* Implement `queue_as`, docs

* Rollback implementing `perform_later`, it's a footgun

* changes
  • Loading branch information
mperham committed Sep 28, 2021
1 parent 051555e commit 2b2390d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Changes.md
Expand Up @@ -19,8 +19,9 @@ end
```ruby
# config/initializers/sidekiq.rb
require "sidekiq/middleware/current_attributes"
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS:CurrentAttributes singleton
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
```
- Implement `queue_as` and `wait_until` for ActiveJob compatibility [#5003]
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]

Expand Down
46 changes: 40 additions & 6 deletions lib/sidekiq/worker.rb
Expand Up @@ -9,6 +9,7 @@ module Sidekiq
#
# class HardWorker
# include Sidekiq::Worker
# sidekiq_options queue: 'critical', retry: 5
#
# def perform(*args)
# # do some work
Expand All @@ -20,6 +21,26 @@ module Sidekiq
# HardWorker.perform_async(1, 2, 3)
#
# Note that perform_async is a class method, perform is an instance method.
#
# Sidekiq::Worker also includes several APIs to provide compatibility with
# ActiveJob.
#
# class SomeWorker
# include Sidekiq::Worker
# queue_as :critical
#
# def perform(...)
# end
# end
#
# SomeWorker.set(wait_until: 1.hour).perform_async(123)
#
# Note that arguments passed to the job must still obey Sidekiq's
# best practice for simple, JSON-native data types. Sidekiq will not
# implement ActiveJob's more complex argument serialization. For
# this reason, we don't implement `perform_later` as our call semantics
# are very different.
#
module Worker
##
# The Options module is extracted so we can include it in ActiveJob::Base
Expand Down Expand Up @@ -153,10 +174,16 @@ class Setter
def initialize(klass, opts)
@klass = klass
@opts = opts

# ActiveJob compatibility
interval = @opts.delete(:wait_until)
at(interval) if interval
end

def set(options)
interval = options.delete(:wait_until)
@opts.merge!(options)
at(interval) if interval
self
end

Expand All @@ -167,16 +194,19 @@ def perform_async(*args)
# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
at(interval).perform_async(*args)
end
alias_method :perform_at, :perform_in

private

def at(interval)
int = interval.to_f
now = Time.now.to_f
ts = (int < 1_000_000_000 ? now + int : int)

payload = @opts.merge("class" => @klass, "args" => args)
# Optimization to enqueue something now that is scheduled to go out now or in the past
payload["at"] = ts if ts > now
@klass.client_push(payload)
@opts["at"] = ts if ts > now
self
end
alias_method :perform_at, :perform_in
end

module ClassMethods
Expand All @@ -192,6 +222,10 @@ def delay_until(*args)
raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
end

def queue_as(q)
sidekiq_options("queue" => q.to_s)
end

def set(options)
Setter.new(self, options)
end
Expand Down
19 changes: 18 additions & 1 deletion test/test_worker.rb
Expand Up @@ -5,13 +5,30 @@

class SetWorker
include Sidekiq::Worker
sidekiq_options :queue => :foo, 'retry' => 12
queue_as :foo
sidekiq_options 'retry' => 12
end

def setup
Sidekiq.redis {|c| c.flushdb }
end

it "provides basic ActiveJob compatibilility" do
q = Sidekiq::ScheduledSet.new
assert_equal 0, q.size
jid = SetWorker.set(wait_until: 1.hour).perform_async(123)
assert jid
assert_equal 1, q.size

q = Sidekiq::Queue.new("foo")
assert_equal 0, q.size
SetWorker.perform_async
assert_equal 1, q.size

SetWorker.set(queue: 'xyz').perform_async
assert_equal 1, Sidekiq::Queue.new("xyz").size
end

it 'can be memoized' do
q = Sidekiq::Queue.new('bar')
assert_equal 0, q.size
Expand Down

0 comments on commit 2b2390d

Please sign in to comment.