diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index d91c301..dd101c5 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -27,7 +27,9 @@ def retrieve_work end def bulk_requeue(*args) - Sidekiq::BasicFetch.bulk_requeue(*args) + klass = Sidekiq::BasicFetch + fetch = klass.respond_to?(:bulk_requeue) ? klass : klass.new(Sidekiq::options) + fetch.bulk_requeue(*args) end def redis_retryable diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb index b1db291..15c3a61 100644 --- a/spec/sidekiq/limit_fetch_spec.rb +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -10,13 +10,16 @@ Sidekiq.redis do |it| it.del 'queue:queue1' - it.lpush 'queue:queue1', 'task1' - it.lpush 'queue:queue1', 'task2' + it.del 'queue:queue2' it.expire 'queue:queue1', 30 end end it 'should acquire lock on queue for execution' do + Sidekiq.redis do |it| + it.lpush 'queue:queue1', 'task1' + it.lpush 'queue:queue1', 'task2' + end work = subject.retrieve_work expect(work.queue_name).to eq 'queue1' expect(work.job).to eq 'task1' @@ -45,4 +48,25 @@ work = subject.retrieve_work expect(work.job).to eq 'task2' end + + it 'bulk requeues' do + Sidekiq.redis do |it| + it.lpush 'queue:queue1', 'task1' + it.lpush 'queue:queue2', 'task2' + it.lpush 'queue:queue2', 'task3' + end + q1 = Sidekiq::Queue['queue1'] + q2 = Sidekiq::Queue['queue2'] + expect(q1.size).to eq 1 + expect(q2.size).to eq 2 + + fetch = subject.new(queues: ['queue1', 'queue2']) + works = 3.times.map { fetch.retrieve_work } + expect(q1.size).to eq 0 + expect(q2.size).to eq 0 + + fetch.bulk_requeue(works, queues: []) + expect(q1.size).to eq 1 + expect(q2.size).to eq 2 + end end