Skip to content

Commit

Permalink
Implement Sidekiq::Worker.perform_bulk (#5042)
Browse files Browse the repository at this point in the history
* Sketch out some failing tests to capture the behavior

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Implement Sidekiq::Client.perform_bulk

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Allow .perform_bulk to operate on different batch sizes

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Write a failing test to capture the Sidekiq::Worker::Setter.perform_bulk behavior

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Implement Sidekiq::Worker::Setter.perform_bulk

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Write a small comment for to document the method

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>

* Add a Changes.md entry

Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>
  • Loading branch information
kellysutton and jeffcarbs committed Nov 2, 2021
1 parent fd76471 commit 4a04326
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 0 deletions.
10 changes: 10 additions & 0 deletions Changes.md
Expand Up @@ -24,6 +24,16 @@ end
require "sidekiq/middleware/current_attributes"
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
```
- **FEATURE**: Introduce new method, `.perform_bulk` on `Sidekiq::Worker` that makes enqueuing
jobs in bulk adhere to Redis best practices by enqueuing 1,000 jobs per round trip. This
shares a similar args syntax to `Sidekiq::Client.push_bulk`. Batch sizes can be configured
with the optional `batch_size:` keyword argument.
```ruby
MyJob.perform_bulk([[1], [2], [3]])

# With a batch size provided:
MyJob.perform_bulk([[1], [2], [3]], batch_size: 100)
```
- Implement `queue_as`, `wait` and `wait_until` for ActiveJob compatibility [#5003]
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]
Expand Down
32 changes: 32 additions & 0 deletions lib/sidekiq/worker.rb
Expand Up @@ -191,6 +191,12 @@ def perform_async(*args)
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
end

def perform_bulk(args, batch_size: 1_000)
args.each_slice(batch_size).flat_map do |slice|
Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
end
end

# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
Expand Down Expand Up @@ -235,6 +241,32 @@ def perform_async(*args)
client_push("class" => self, "args" => args)
end

##
# Push a large number of jobs to Redis, while limiting the batch of
# each job payload to 1,000. This method helps cut down on the number
# of round trips to Redis, which can increase the performance of enqueueing
# large numbers of jobs.
#
# +items+ must be an Array of Arrays.
#
# For finer-grained control, use `Sidekiq::Client.push_bulk` directly.
#
# Example (3 Redis round trips):
#
# SomeWorker.perform_async(1)
# SomeWorker.perform_async(2)
# SomeWorker.perform_async(3)
#
# Would instead become (1 Redis round trip):
#
# SomeWorker.perform_bulk([[1], [2], [3]])
#
def perform_bulk(items, batch_size: 1_000)
items.each_slice(batch_size).flat_map do |slice|
Sidekiq::Client.push_bulk("class" => self, "args" => slice)
end
end

# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
Expand Down
25 changes: 25 additions & 0 deletions test/test_client.rb
Expand Up @@ -183,6 +183,31 @@ class QueuedWorker
end
end
end

describe '.perform_bulk' do
it 'pushes a large set of jobs' do
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) })
assert_equal 1_001, jids.size
end

it 'pushes a large set of jobs with a different batch size' do
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) }, batch_size: 100)
assert_equal 1_001, jids.size
end

it 'handles no jobs' do
jids = MyWorker.perform_bulk([])
assert_equal 0, jids.size
end

describe 'errors' do
it 'raises ArgumentError with invalid params' do
assert_raises ArgumentError do
Sidekiq::Client.push_bulk('class' => 'MyWorker', 'args' => [[1], 2])
end
end
end
end
end

class BaseWorker
Expand Down
11 changes: 11 additions & 0 deletions test/test_worker.rb
Expand Up @@ -80,5 +80,16 @@ def setup
assert_equal 'foo', job['queue']
assert_equal 'xyz', job['bar']
end

it 'works with .perform_bulk' do
q = Sidekiq::Queue.new('bar')
assert_equal 0, q.size

set = SetWorker.set('queue' => 'bar')
jids = set.perform_bulk((1..1_001).to_a.map { |x| Array(x) })

assert_equal 1_001, q.size
assert_equal 1_001, jids.size
end
end
end

0 comments on commit 4a04326

Please sign in to comment.