From 4a0432622fbd2f02fe96f12c3afd1b6ac3fab876 Mon Sep 17 00:00:00 2001 From: Kelly Sutton Date: Tue, 2 Nov 2021 09:54:44 -0700 Subject: [PATCH] Implement `Sidekiq::Worker.perform_bulk` (#5042) * Sketch out some failing tests to capture the behavior Co-authored-by: jeffcarbs * Implement Sidekiq::Worker.perform_bulk Co-authored-by: jeffcarbs * Allow .perform_bulk to operate on different batch sizes Co-authored-by: jeffcarbs * Write a failing test to capture the Sidekiq::Worker::Setter.perform_bulk behavior Co-authored-by: jeffcarbs * Implement Sidekiq::Worker::Setter.perform_bulk Co-authored-by: jeffcarbs * Write a small comment to document the method Co-authored-by: jeffcarbs * Add a Changes.md entry Co-authored-by: jeffcarbs --- Changes.md | 10 ++++++++++ lib/sidekiq/worker.rb | 32 ++++++++++++++++++++++++++++++++ test/test_client.rb | 25 +++++++++++++++++++++++++ test/test_worker.rb | 11 +++++++++++ 4 files changed, 78 insertions(+) diff --git a/Changes.md b/Changes.md index ddeeeb858..811452212 100644 --- a/Changes.md +++ b/Changes.md @@ -24,6 +24,16 @@ end require "sidekiq/middleware/current_attributes" Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton ``` +- **FEATURE**: Introduce new method, `.perform_bulk` on `Sidekiq::Worker` that makes enqueuing + jobs in bulk adhere to Redis best practices by enqueuing 1,000 jobs per round trip. This + shares a similar args syntax to `Sidekiq::Client.push_bulk`. Batch sizes can be configured + with the optional `batch_size:` keyword argument. +```ruby +MyJob.perform_bulk([[1], [2], [3]]) + +# With a batch size provided: +MyJob.perform_bulk([[1], [2], [3]], batch_size: 100) +``` - Implement `queue_as`, `wait` and `wait_until` for ActiveJob compatibility [#5003] - Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985] - Run existing signal traps, if any, before running Sidekiq's trap. 
[#4991] diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index e85a94888..73c32ba5f 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -191,6 +191,12 @@ def perform_async(*args) @klass.client_push(@opts.merge("args" => args, "class" => @klass)) end + def perform_bulk(args, batch_size: 1_000) + args.each_slice(batch_size).flat_map do |slice| + Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice)) + end + end + # +interval+ must be a timestamp, numeric or something that acts # numeric (like an activesupport time interval). def perform_in(interval, *args) @@ -235,6 +241,32 @@ def perform_async(*args) client_push("class" => self, "args" => args) end + ## + # Push a large number of jobs to Redis, limiting each batch to at most + # `batch_size` jobs (1,000 by default). This method helps cut down on the number + # of round trips to Redis, which can increase the performance of enqueueing + # large numbers of jobs. + # + # +items+ must be an Array of Arrays. + # + # For finer-grained control, use `Sidekiq::Client.push_bulk` directly. + # + # Example (3 Redis round trips): + # + # SomeWorker.perform_async(1) + # SomeWorker.perform_async(2) + # SomeWorker.perform_async(3) + # + # Would instead become (1 Redis round trip): + # + # SomeWorker.perform_bulk([[1], [2], [3]]) + # + def perform_bulk(items, batch_size: 1_000) + items.each_slice(batch_size).flat_map do |slice| + Sidekiq::Client.push_bulk("class" => self, "args" => slice) + end + end + # +interval+ must be a timestamp, numeric or something that acts # numeric (like an activesupport time interval). 
def perform_in(interval, *args) diff --git a/test/test_client.rb b/test/test_client.rb index b49386c3b..883ec35c0 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -183,6 +183,31 @@ class QueuedWorker end end end + + describe '.perform_bulk' do + it 'pushes a large set of jobs' do + jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) }) + assert_equal 1_001, jids.size + end + + it 'pushes a large set of jobs with a different batch size' do + jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) }, batch_size: 100) + assert_equal 1_001, jids.size + end + + it 'handles no jobs' do + jids = MyWorker.perform_bulk([]) + assert_equal 0, jids.size + end + + describe 'errors' do + it 'raises ArgumentError with invalid params' do + assert_raises ArgumentError do + Sidekiq::Client.push_bulk('class' => 'MyWorker', 'args' => [[1], 2]) + end + end + end + end end class BaseWorker diff --git a/test/test_worker.rb b/test/test_worker.rb index dfe6e7de5..576e019e6 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -80,5 +80,16 @@ def setup assert_equal 'foo', job['queue'] assert_equal 'xyz', job['bar'] end + + it 'works with .perform_bulk' do + q = Sidekiq::Queue.new('bar') + assert_equal 0, q.size + + set = SetWorker.set('queue' => 'bar') + jids = set.perform_bulk((1..1_001).to_a.map { |x| Array(x) }) + + assert_equal 1_001, q.size + assert_equal 1_001, jids.size + end end end