Skip to content

Commit

Permalink
Fetch API refactor, WIP (#4602)
Browse files Browse the repository at this point in the history
* Fetch API refactor, WIP

* save options for later

* changes

* Fix test failures
  • Loading branch information
mperham committed Jun 19, 2020
1 parent 50b9e67 commit fce05c9
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 51 deletions.
1 change: 1 addition & 0 deletions Changes.md
Expand Up @@ -12,6 +12,7 @@ HEAD
- Remove rack-protection, reimplement CSRF protection [#4588]
- Require redis-rb 4.2 [#4591]
- Update to jquery 1.12.4 [#4593]
- Refactor internal fetch logic and API [#4602]

6.0.7
---------
Expand Down
1 change: 1 addition & 0 deletions Pro-Changes.md
Expand Up @@ -10,6 +10,7 @@ HEAD
- Remove `concurrent-ruby` gem dependency [#4586]
- Update `constantize` for batch callbacks. [#4469]
- Add queue tag to `jobs.recovered.fetch` metric [#4594]
- Refactor Pro's fetch infrastructure [#4602]

5.0.1
---------
Expand Down
38 changes: 20 additions & 18 deletions lib/sidekiq/fetch.rb
Expand Up @@ -25,8 +25,10 @@ def requeue
}

def initialize(options)
@strictly_ordered_queues = !!options[:strict]
@queues = options[:queues].map { |q| "queue:#{q}" }
raise ArgumentError, "missing queue list" unless options[:queues]
@options = options
@strictly_ordered_queues = !!@options[:strict]
@queues = @options[:queues].map { |q| "queue:#{q}" }
if @strictly_ordered_queues
@queues.uniq!
@queues << TIMEOUT
Expand All @@ -38,24 +40,9 @@ def retrieve_work
UnitOfWork.new(*work) if work
end

# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle!.uniq
queues << TIMEOUT
queues
end
end

# By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it
# an instance method will make it async to the Fetcher actor
def self.bulk_requeue(inprogress, options)
def bulk_requeue(inprogress, options)
return if inprogress.empty?

Sidekiq.logger.debug { "Re-queueing terminated jobs" }
Expand All @@ -76,5 +63,20 @@ def self.bulk_requeue(inprogress, options)
rescue => ex
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle!.uniq
queues << TIMEOUT
queues
end
end
end
end
3 changes: 2 additions & 1 deletion lib/sidekiq/launcher.rb
Expand Up @@ -22,6 +22,7 @@ class Launcher
attr_accessor :manager, :poller, :fetcher

def initialize(options)
options[:fetch] ||= BasicFetch.new(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
Expand Down Expand Up @@ -56,7 +57,7 @@ def stop

# Requeue everything in case there was a worker who grabbed work while stopped
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy = @options[:fetch]
strategy.bulk_requeue([], @options)

clear_heartbeat
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq/manager.rb
Expand Up @@ -35,7 +35,7 @@ def initialize(options = {})
@done = false
@workers = Set.new
@count.times do
@workers << Processor.new(self)
@workers << Processor.new(self, options)
end
@plock = Mutex.new
end
Expand Down Expand Up @@ -90,7 +90,7 @@ def processor_died(processor, reason)
@plock.synchronize do
@workers.delete(processor)
unless @done
p = Processor.new(self)
p = Processor.new(self, options)
@workers << p
p.start
end
Expand Down Expand Up @@ -123,7 +123,7 @@ def hard_shutdown
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy = @options[:fetch]
strategy.bulk_requeue(jobs, @options)
end

Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq/processor.rb
Expand Up @@ -28,15 +28,15 @@ class Processor
attr_reader :thread
attr_reader :job

def initialize(mgr)
def initialize(mgr, options)
@mgr = mgr
@down = false
@done = false
@job = nil
@thread = nil
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
@reloader = Sidekiq.options[:reloader]
@job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new
@strategy = options[:fetch]
@reloader = options[:reloader] || proc { |&block| block.call }
@job_logger = (options[:job_logger] || Sidekiq::JobLogger).new
@retrier = Sidekiq::JobRetry.new
end

Expand Down
13 changes: 8 additions & 5 deletions test/test_actors.rb
Expand Up @@ -50,7 +50,8 @@ def perform(slp)
end

it 'can start and stop' do
f = Sidekiq::Processor.new(Mgr.new)
m = Mgr.new
f = Sidekiq::Processor.new(m, m.options)
f.terminate
end

Expand All @@ -74,14 +75,16 @@ def processor_stopped(inst)
end
end
def options
{ :concurrency => 3, :queues => ['default'] }
opts = { :concurrency => 3, :queues => ['default'] }
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
opts
end
end

it 'can process' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async(0)

a = $count
Expand All @@ -93,7 +96,7 @@ def options
it 'deals with errors' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async("boom")
q = Sidekiq::Queue.new
assert_equal 1, q.size
Expand All @@ -116,7 +119,7 @@ def options
it 'gracefully kills' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async(1)
q = Sidekiq::Queue.new
assert_equal 1, q.size
Expand Down
1 change: 1 addition & 0 deletions test/test_cli.rb
Expand Up @@ -419,6 +419,7 @@ def logdev

describe '#run' do
before do
Sidekiq.options[:concurrency] = 2
Sidekiq.options[:require] = './test/fake_env.rb'
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_fetch.rb
Expand Up @@ -52,7 +52,7 @@
assert_equal 0, q1.size
assert_equal 0, q2.size

Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []})
fetch.bulk_requeue(works, {:queues => []})
assert_equal 2, q1.size
assert_equal 1, q2.size
end
Expand Down
2 changes: 1 addition & 1 deletion test/test_manager.rb
Expand Up @@ -8,7 +8,7 @@
end

def new_manager(opts)
Sidekiq::Manager.new(opts)
Sidekiq::Manager.new(opts.merge(fetch: Sidekiq::BasicFetch.new(opts)))
end

it 'creates N processor instances' do
Expand Down
6 changes: 2 additions & 4 deletions test/test_middleware.rb
Expand Up @@ -78,10 +78,8 @@ def call(*args)
end

boss = Minitest::Mock.new
boss.expect(:options, {:queues => ['default'] }, [])
boss.expect(:options, {:queues => ['default'] }, [])
boss.expect(:options, {:queues => ['default'] }, [])
processor = Sidekiq::Processor.new(boss)
opts = {:queues => ['default'] }
processor = Sidekiq::Processor.new(boss, opts)
boss.expect(:processor_done, nil, [processor])
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten
Expand Down
22 changes: 8 additions & 14 deletions test/test_processor.rb
Expand Up @@ -11,10 +11,9 @@
before do
$invokes = 0
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default']})
@mgr.expect(:options, {:queues => ['default']})
@mgr.expect(:options, {:queues => ['default']})
@processor = ::Sidekiq::Processor.new(@mgr)
opts = {:queues => ['default']}
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
@processor = ::Sidekiq::Processor.new(@mgr, opts)
end

class MockWorker
Expand Down Expand Up @@ -327,11 +326,9 @@ def call(item, queue)
end

before do
opts = {:queues => ['default'], job_logger: CustomJobLogger}
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@processor = ::Sidekiq::Processor.new(@mgr)
@processor = ::Sidekiq::Processor.new(@mgr, opts)
end
end
end
Expand All @@ -356,18 +353,15 @@ def successful_job

describe 'custom job logger class' do
before do
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@processor = ::Sidekiq::Processor.new(@mgr)
opts = {:queues => ['default'], :job_logger => CustomJobLogger}
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
@processor = ::Sidekiq::Processor.new(nil, opts)
end

it 'is called instead default Sidekiq::JobLogger' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
@processor.process(work(msg))
assert_equal 1, $invokes
@mgr.verify
end
end
end

0 comments on commit fce05c9

Please sign in to comment.