
Queue up conflicted jobs and release later #664

Open
piloos opened this issue Dec 1, 2021 · 3 comments

Comments

@piloos
Contributor

piloos commented Dec 1, 2021

In our case we have a third-party service that we cannot use in parallel. Hence, all our jobs using this third-party service must be unique while executing. However, each of these jobs has value and must be executed at some point.

In order to accomplish this we implemented two things:

  1. We created a custom strategy which queues up the conflicted jobs in an atomic shared Redis data structure.
  2. We use the after_unlock callback to push the queued-up jobs to Sidekiq again. We made a small change to the gem to pass the item to the after_unlock method.

Since our use case seems quite universal, I was wondering if we are over-engineering things here? Are we overlooking built-in mechanisms to accomplish this use case? (Note that I am not a fan of raise/retry in Sidekiq, because it depends on Sidekiq's retry mechanism and timing, nor of the reschedule method, since it is imprecise.)

Our example code:

module UniqueJobStrategies
  # Custom on_conflict strategy: instead of dropping the conflicting job,
  # park it in an atomic shared Redis structure keyed by company id.
  class CompanyQueueStrategy < SidekiqUniqueJobs::OnConflict::Strategy
    def call
      company_id = item['args'][1]
      logger.info("Conflict! Holding job for company id #{company_id} in our queue")
      COMPANY_JOB_QUEUE.modify(company_id.to_s) do |array|
        array.push(item)
      end
    end
  end
end

class TestWorker
  include Sidekiq::Worker

  sidekiq_options lock: :until_executed,
                  lock_info: true,
                  lock_args_method: :lock_args,
                  on_conflict: :company_queue_strategy,
                  lock_ttl: 2.weeks

  def perform(task, *args)
    send(task, *args)
  end

  # Only the company id (args[1]) participates in the lock digest.
  def self.lock_args(args)
    [args[1]]
  end

  def sleep_a_bit(company_id, seconds)
    logger.info("going to sleep for company id #{company_id}...")
    sleep seconds
    logger.info("...waking up for company id #{company_id}")
  end

  # Called once the lock is released; pull the next queued job for this
  # company (FIFO) and push it back to Sidekiq.
  def after_unlock(item)
    job = nil
    company_id = item['args'][1]
    logger.info("I was unlocked --> checking for other jobs for company id #{company_id}")
    COMPANY_JOB_QUEUE.modify(company_id.to_s) do |array|
      job = array.shift # take the first one (FIFO)
      array
    end
    if job
      logger.info("Pushing job for company id #{company_id} to Sidekiq")
      job['class'].constantize.set(queue: job['queue']).perform_async(*job['args'])
    end
  end

end
@Joerg-Seitz

Thanks for writing this out; my coworkers and I currently face a similar problem.

Due to extensive locking on some objects, we want to avoid parallel execution of jobs that deal with the same object.
But jobs that deal with different objects of the same type should be able to run in parallel.
All of these jobs are important, so I'd rather see them in the queue than in some invisible data stored in Redis.

Example:
AwesomeJob(1) processing object A can run in parallel with AwesomeJob(2) processing object B (this we can achieve with :lock_args, see the sketch below: https://github.com/mhenrixon/sidekiq-unique-jobs#finer-control-over-uniqueness).
AwesomeJob(3) processing object C should be unique while it is executed in the scope of its object. When AwesomeJob(4) processing object C gets scheduled during the execution of AwesomeJob(3), it shouldn't be dropped but enqueued and handled after the first one has finished executing. (Not sure how to achieve this.)
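
A minimal sketch of the per-object uniqueness part, assuming a :while_executing lock and an id-based lock_args method (all names here are illustrative, not something prescribed in this thread):

class AwesomeJob
  include Sidekiq::Worker

  sidekiq_options lock: :while_executing,
                  lock_args_method: :lock_args

  # Only the object id participates in the lock digest, so jobs for
  # different objects never conflict with each other.
  def self.lock_args(args)
    [args[0]]
  end

  def perform(object_id)
    # ... work on the object ...
  end
end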

I'm also wondering if the gem is even intended to be used for this. Currently it seems uniqueness is enforced when the jobs are scheduled.

@mhenrixon Do you want such functionality in your gem or do you maybe know about a library that fulfills these requirements?

@mhenrixon
Owner

You hit on an in-between feature here. I agree that the replace and raise resolutions need to be more sophisticated to cover your scenario, but there are alternatives.

Correct me if I misunderstood something:

  1. Due to concurrency constraints with a third-party API, you need to constrain a specific worker to one job at a time.
  2. Rather than failing the job and retrying, you'd like to put it back and try later, but you find the Sidekiq retry mechanism unreliable at best.

If you guys are running on Sidekiq 7 you are in luck!

What you want to do is have a specific queue with only one worker. Sidekiq 7 makes this ridiculously simple with capsules and 1 thread.
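
A rough sketch of that capsule setup, assuming Sidekiq 7+ and an illustrative queue name (not something prescribed in this thread):

Sidekiq.configure_server do |config|
  # All jobs pushed to the "third_party" queue are processed by a capsule
  # with a single thread, so at most one of them runs at a time per process.
  config.capsule("third_party") do |cap|
    cap.concurrency = 1
    cap.queues = %w[third_party]
  end
end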

If you, on the other hand, need uniqueness, you need to combine the two, or we need to add this feature.

@jgmontoya

Hi there! I have a similar use case where I need to ensure that the execution of certain jobs (whose lock keys I can control with :lock_args_method) is non-concurrent and that order is preserved.

That is: if JobA(A) is enqueued, and while it's being executed JobA(B) and JobA(C) are enqueued, then I need the three jobs to execute in that order (JobA(A) first, then JobA(B), then JobA(C)).
This means I can't just reschedule, as they'd likely end up in the wrong order.

If I'm understanding @piloos's approach correctly, it would achieve this. However, I do have some questions:

  • Could another instance of TestWorker begin executing before the jobs you've stored in Redis, if it happens to arrive while after_unlock is running?
  • Did you eventually find an easier/better/simpler way to achieve this?

I'm thinking of doing something similar. Maybe I can leverage Sidekiq and send jobs to a queue with no worker instead of creating a custom data structure, which is basically the same thing, I guess. This might work better for @Joerg-Seitz, as the jobs would stay visible in Sidekiq's web UI (a rough sketch follows).
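
Something like this is what I have in mind (the queue names and re-push logic are just a sketch, and the iteration order of Sidekiq::Queue would need to be verified for strict FIFO):

require "sidekiq/api"

# Park a conflicted job in a queue that no Sidekiq process listens to, so it
# stays visible in the Web UI instead of living in a custom Redis structure.
def park(item)
  holding_queue = "company_#{item['args'][1]}_holding"
  Sidekiq::Client.push(item.merge("queue" => holding_queue))
end

# Later (e.g. from after_unlock), move one parked job back to a live queue.
def release_one(company_id)
  holding = Sidekiq::Queue.new("company_#{company_id}_holding")
  return unless (entry = holding.first)
  entry.delete
  Sidekiq::Client.push(entry.item.merge("queue" => "default"))
end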

> If you guys are running on Sidekiq 7 you are in luck!
> What you want to do is have a specific queue with only one worker. Sidekiq 7 makes this ridiculously simple with capsules and 1 thread.

Regarding this, keep in mind that if you go this route you lose the ability to scale your Sidekiq workers horizontally: sidekiq/sidekiq#6148

> If you, on the other hand, need uniqueness, you need to combine the two, or we need to add this feature.

Is it possible to use the gem as-is to achieve this? I couldn't quite figure out the combination.
