
Implement Sidekiq::Worker.perform_bulk #5042

Merged · 7 commits · Nov 2, 2021
10 changes: 10 additions & 0 deletions Changes.md
@@ -24,6 +24,16 @@ end
require "sidekiq/middleware/current_attributes"
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
```
- **FEATURE**: Introduce a new method, `.perform_bulk`, on `Sidekiq::Worker` that makes enqueuing
jobs in bulk adhere to Redis best practices by enqueuing 1,000 jobs per round trip by default. It
shares a similar args syntax with `Sidekiq::Client.push_bulk`. Batch sizes can be configured
with the optional `batch_size:` keyword argument.
```ruby
MyJob.perform_bulk([[1], [2], [3]])

# With a batch size provided:
MyJob.perform_bulk([[1], [2], [3]], batch_size: 100)
```
- Implement `queue_as`, `wait` and `wait_until` for ActiveJob compatibility [#5003]
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]
32 changes: 32 additions & 0 deletions lib/sidekiq/worker.rb
@@ -191,6 +191,12 @@ def perform_async(*args)
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
end

def perform_bulk(args, batch_size: 1_000)
  args.each_slice(batch_size).flat_map do |slice|
    Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
  end
end

Review thread on this hunk:

> @mperham this is a bit different than the example on https://github.com/mperham/sidekiq/wiki/Batches#huge-batches in that the jobs get sequentially added by push_bulk rather than concurrently added via perform_async. On super large sets of jobs, like inserting 300,000 at once in 500-job push_bulks with about 400 worker threads, Redis times out sometimes. Maybe the recommendation for huge batches should be changed? I suspect that it's just as good performance-wise to sequentially post the jobs in bulk from a single thread.

> (Collaborator) I don't understand.

> [screenshot] ...and got many errors: [screenshot]

> (Collaborator) Yeah, it's not a great idea to do lots of concurrent bulk pushes. I think the best thing you can do is lower the bulk size from 1,000 to 100 so each bulk op is faster, but YMMV.

> Maybe it's better to do the bulk inserts sequentially rather than concurrently, especially if running hundreds of threads?
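The batching above comes down to `each_slice` plus `flat_map`. A minimal plain-Ruby sketch of that behavior, with a `"jid"` string standing in for the real `push_bulk` return values (no Sidekiq or Redis required):

```ruby
# Plain-Ruby sketch of the slicing perform_bulk relies on:
# each_slice splits the args into batches (one Redis round trip each),
# and flat_map collects the per-batch results into one flat array.
args = (1..3_000).map { |i| [i] }

batches = args.each_slice(1_000).to_a
puts batches.size   # => 3 (three round trips to Redis)

# Stand-in for Sidekiq::Client.push_bulk, which returns one jid per job.
jids = batches.flat_map { |slice| slice.map { "jid" } }
puts jids.size      # => 3000
```

Lowering `batch_size` trades more round trips for shorter individual Redis operations, which is the tuning suggested in the review thread above.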

# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
@@ -235,6 +241,32 @@ def perform_async(*args)
client_push("class" => self, "args" => args)
end

##
# Push a large number of jobs to Redis, slicing the args into
# batches of 1,000 jobs per round trip. This cuts down on the number
# of round trips to Redis, which improves the performance of
# enqueueing large numbers of jobs.
#
# +items+ must be an Array of Arrays.
#
# For finer-grained control, use `Sidekiq::Client.push_bulk` directly.
#
# Example (3 Redis round trips):
#
# SomeWorker.perform_async(1)
# SomeWorker.perform_async(2)
# SomeWorker.perform_async(3)
#
# Would instead become (1 Redis round trip):
#
# SomeWorker.perform_bulk([[1], [2], [3]])
#
def perform_bulk(items, batch_size: 1_000)
items.each_slice(batch_size).flat_map do |slice|
Sidekiq::Client.push_bulk("class" => self, "args" => slice)
end
end
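Since `Sidekiq::Client.push_bulk` returns one jid per job pushed, calling `perform_bulk` with 1,001 arg arrays yields 1,001 jids across two round trips. A self-contained sketch, where the hypothetical `fake_push_bulk` stands in for the real client (which needs a running Redis):

```ruby
require "securerandom"

# Hypothetical stand-in for Sidekiq::Client.push_bulk: returns one
# generated jid per job, without touching Redis.
def fake_push_bulk(payload)
  payload["args"].map { SecureRandom.hex(12) }
end

# Mirrors the perform_bulk implementation above, run against the stand-in.
def perform_bulk(items, batch_size: 1_000)
  items.each_slice(batch_size).flat_map { |slice| fake_push_bulk("args" => slice) }
end

jids = perform_bulk((1..1_001).map { |x| [x] })
puts jids.size   # => 1001
```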

# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval).
def perform_in(interval, *args)
25 changes: 25 additions & 0 deletions test/test_client.rb
@@ -183,6 +183,31 @@ class QueuedWorker
end
end
end

describe '.perform_bulk' do
it 'pushes a large set of jobs' do
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) })
assert_equal 1_001, jids.size
end

it 'pushes a large set of jobs with a different batch size' do
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) }, batch_size: 100)
assert_equal 1_001, jids.size
end

it 'handles no jobs' do
jids = MyWorker.perform_bulk([])
assert_equal 0, jids.size
end

describe 'errors' do
it 'raises ArgumentError with invalid params' do
assert_raises ArgumentError do
Sidekiq::Client.push_bulk('class' => 'MyWorker', 'args' => [[1], 2])
end
end
end
end
end
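The ArgumentError test above exercises the input validation in `push_bulk`: every element of `args` must itself be an Array, since each inner Array is one job's argument list. A hedged sketch of that check (`validate_bulk_args!` is an illustrative name, not Sidekiq's internal method):

```ruby
# Illustrative validation: args must be an Array of Arrays.
def validate_bulk_args!(args)
  unless args.is_a?(Array) && args.all? { |a| a.is_a?(Array) }
    raise ArgumentError, "Bulk arguments must be an Array of Arrays: #{args.inspect}"
  end
end

validate_bulk_args!([[1], [2], [3]])  # passes silently

begin
  validate_bulk_args!([[1], 2])       # 2 is not an Array
rescue ArgumentError => e
  puts e.message
end
```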

class BaseWorker
11 changes: 11 additions & 0 deletions test/test_worker.rb
@@ -80,5 +80,16 @@ def setup
assert_equal 'foo', job['queue']
assert_equal 'xyz', job['bar']
end

it 'works with .perform_bulk' do
q = Sidekiq::Queue.new('bar')
assert_equal 0, q.size

set = SetWorker.set('queue' => 'bar')
jids = set.perform_bulk((1..1_001).to_a.map { |x| Array(x) })

assert_equal 1_001, q.size
assert_equal 1_001, jids.size
end
end
end