
Implement Sidekiq::Worker.perform_bulk #5042

Merged
merged 7 commits into from Nov 2, 2021

Conversation

kellysutton
Contributor

@kellysutton kellysutton commented Nov 2, 2021

This is the code that implements #5041.

We have a monkey-patch on Sidekiq::Worker::ClassMethods and Sidekiq::Worker::Setter that we've been getting a lot of mileage out of, and it might be worth upstreaming.

Background: We do a lot of batch processing and need to enqueue thousands or hundreds of thousands of jobs at once. We had been using Sidekiq::Client.push_bulk with an each_slice to create chunks of 1,000.

Enter what we call .perform_bulk. It's a wrapper around Sidekiq::Client.push_bulk that encodes the best practice of pushing 1,000 jobs at a time. This lets clients enqueue many jobs without the sharp edge of forgetting to slice their batch into smaller chunks.
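As a sketch of the chunking this encodes (enqueue_in_batches and fake_push_bulk are illustrative stand-ins; the real method delegates each slice to Sidekiq::Client.push_bulk, which makes one network round trip per call and returns one jid per job):

```ruby
# Illustrative stand-in for Sidekiq::Client.push_bulk: one network round
# trip per call, returning one jid per job in the slice.
def fake_push_bulk(slice)
  slice.map { |job_args| "jid-#{job_args.first}" }
end

# The batching pattern .perform_bulk encodes: slice the full arg list
# into chunks and flat_map the per-chunk jids back into one array.
def enqueue_in_batches(args, batch_size: 1_000)
  args.each_slice(batch_size).flat_map { |slice| fake_push_bulk(slice) }
end

args = (1..2_500).map { |i| [i] } # 2,500 single-argument jobs
jids = enqueue_in_batches(args)   # 3 round trips (1,000 + 1,000 + 500)
```

With the wrapper, callers no longer need to remember the each_slice step themselves.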

The downside is that the .push_bulk-based approach does communicate to the client: "Hey, you're interacting with the network N times (once per slice)."

This PR has the implementation and tests for the .perform_bulk method. Please let me know if anything looks amiss, but I tried to keep things as idiomatic as possible.

Notes:

  • Should I update the CHANGELOG or is that for the repo maintainers to update?
  • I took a stab at a comment, but wasn't sure if it was too verbose. Feel free to slim down.

kellysutton and others added 6 commits November 1, 2021 20:20
Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>
…ulk behavior

@mperham
Collaborator

mperham commented Nov 2, 2021

This looks great. Feel free to add an item to the changelog.

@kellysutton
Contributor Author

@mperham Alright, I've gone ahead and added an entry to Changes.md. Let me know if there are other things that need to be done!

@mperham mperham merged commit 4a04326 into sidekiq:main Nov 2, 2021
@manojmj92
Contributor

manojmj92 commented Nov 2, 2021

@mperham @kellysutton - Thank you for this!

I have ideas on a possible improvement.

We currently have the ability to schedule bulk jobs for some time in the future, so like

Sidekiq::Client.push_bulk("class" => FooJob, "args" => [[1], [2]], "at" => [1.minute.from_now.to_f, 5.minutes.from_now.to_f])

(which was added with #4243)

or

Sidekiq::Client.push_bulk("class" => FooJob, "args" => [[1], [2]], "at" => 1.minute.from_now.to_f)

but with the new implementation of perform_bulk we lose the ability to specify at (which could be a Numeric, or an array of Numerics representing timestamps).

(Note: When I say that we lose this ability, scheduling is still possible via the set interface, e.g. SomeWorker.set(wait_until: 1.minute.from_now.to_f).perform_bulk(...), but even there, wait_until can't be an array of Numerics.)

I wonder if we could build an API that supports scheduling for push_bulk too, say one named perform_bulk_in, which would also accept an interval, like

def perform_bulk_in(args, interval, batch_size: 1_000)
  # ...
end

interval here can be an array of Numerics or a single Numeric. If it is an array, we can slice it the same way batch_size slices args.
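A minimal sketch of that lockstep slicing (the method name and shape are hypothetical, following the proposal above; each resulting pair would become one Sidekiq::Client.push_bulk call with "args" and "at"):

```ruby
# Hypothetical sketch of how perform_bulk_in could slice an interval
# array in lockstep with args. Each [arg_slice, at_slice] pair would
# feed one Sidekiq::Client.push_bulk call as "args" and "at".
def slice_args_with_interval(args, interval, batch_size: 1_000)
  arg_slices = args.each_slice(batch_size).to_a
  at_slices =
    if interval.is_a?(Array)
      interval.each_slice(batch_size).to_a # one timestamp per job
    else
      [interval] * arg_slices.size         # a single Numeric applies to every slice
    end
  arg_slices.zip(at_slices)
end

pairs = slice_args_with_interval([[1], [2], [3]], [60.0, 120.0, 180.0], batch_size: 2)
# pairs[0] => [[[1], [2]], [60.0, 120.0]]
# pairs[1] => [[[3]], [180.0]]
```

Slicing both arrays with the same batch_size keeps each job aligned with its timestamp.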

If I have your interest, I'd be happy to build this 🙂

@kellysutton
Contributor Author

kellysutton commented Nov 2, 2021

@manojmj92 Great call-out. This was a purposeful omission in the original PR, but happy to explore alternatives. The comment for this method hints at using Sidekiq::Client.push_bulk if you need to configure things like that. I'll defer to @mperham to decide what to do here.

Generally at my place of work, we've almost entirely moved away from setting 'at' on jobs, especially when enqueueing them in bulk. If you're interested in hearing more, I can try to whip up a blog post explaining why we've stopped doing that.

@mperham
Collaborator

mperham commented Nov 2, 2021

I'm happy with it as is; I don't think we need to expose every option at every level. Sidekiq::Client.push_bulk is still available if you need to bulk schedule.

@kellysutton kellysutton deleted the perform-bulk branch November 3, 2021 16:24
@@ -191,6 +191,12 @@ def perform_async(*args)
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
end

def perform_bulk(args, batch_size: 1_000)
  args.each_slice(batch_size).flat_map do |slice|
    Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
  end
end

@mperham this is a bit different than the example on https://github.com/mperham/sidekiq/wiki/Batches#huge-batches in that the jobs get added sequentially by push_bulk rather than concurrently via perform_async. On very large sets of jobs, like inserting 300,000 at once in 500-job push_bulks with about 400 worker threads, Redis sometimes times out. Maybe the recommendation for huge batches should be changed?

I suspect that sequentially pushing the jobs in bulk from a single thread performs just as well.
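The arithmetic behind that comparison, using the numbers from the comment above:

```ruby
# Round trips for the scenario above: 300,000 jobs pushed sequentially
# in 500-job bulk batches, versus one push per job from ~400 threads.
total_jobs = 300_000
batch_size = 500
bulk_round_trips = (total_jobs.to_f / batch_size).ceil # sequential, single thread
per_job_pushes   = total_jobs                          # individual perform_async calls
# 600 bulk round trips versus 300,000 individual pushes.
```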

Collaborator

I don't understand.

[screenshot]

and got many errors:

[screenshot]

Collaborator

Yeah, it’s not a great idea to do lots of concurrent bulk pushes. I think the best thing you can do is lower the bulk size from 1,000 to 100 so each bulk op is faster, but YMMV.


Maybe better to do the bulk inserts sequentially rather than concurrently? Especially if running hundreds of threads.

@ulyssesrex
Copy link

> Generally at my place of work, we've almost entirely moved away from setting 'at' on jobs, especially when enqueueing them in bulk. If you're interested in hearing more, I can try to whip up a blog post explaining why we've stopped doing that.

@kellysutton Personally, I would be interested in hearing why you avoid setting 'at' on jobs. We're experimenting with a throttling solution at the moment that explicitly sets that argument.

@kellysutton
Copy link
Contributor Author

Sure thing. at is a perfectly fine concept for most uses of Sidekiq, but it has a sharp edge that doesn't fit our expectations of the library.

Specifically, at only specifies when something should be loaded onto a queue, not when it should execute. We see this problem more often when enqueueing jobs in bulk, where expected and actual execution times can diverge.

So in our first pass of implementing .perform_bulk, we didn't include that. The at functionality is still present in the lower-level Sidekiq::Client.push_bulk if needed!
