Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch API refactor, WIP #4602

Merged
merged 4 commits into from Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Changes.md
Expand Up @@ -12,6 +12,7 @@ HEAD
- Remove rack-protection, reimplement CSRF protection [#4588]
- Require redis-rb 4.2 [#4591]
- Update to jquery 1.12.4 [#4593]
- Refactor internal fetch logic and API [#4602]

6.0.7
---------
Expand Down
1 change: 1 addition & 0 deletions Pro-Changes.md
Expand Up @@ -10,6 +10,7 @@ HEAD
- Remove `concurrent-ruby` gem dependency [#4586]
- Update `constantize` for batch callbacks. [#4469]
- Add queue tag to `jobs.recovered.fetch` metric [#4594]
- Refactor Pro's fetch infrastructure [#4602]

5.0.1
---------
Expand Down
38 changes: 20 additions & 18 deletions lib/sidekiq/fetch.rb
Expand Up @@ -25,8 +25,10 @@ def requeue
}

def initialize(options)
@strictly_ordered_queues = !!options[:strict]
@queues = options[:queues].map { |q| "queue:#{q}" }
raise ArgumentError, "missing queue list" unless options[:queues]
@options = options
@strictly_ordered_queues = !!@options[:strict]
@queues = @options[:queues].map { |q| "queue:#{q}" }
if @strictly_ordered_queues
@queues.uniq!
@queues << TIMEOUT
Expand All @@ -38,24 +40,9 @@ def retrieve_work
UnitOfWork.new(*work) if work
end

# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle!.uniq
queues << TIMEOUT
queues
end
end

# By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it
# an instance method will make it async to the Fetcher actor
def self.bulk_requeue(inprogress, options)
def bulk_requeue(inprogress, options)
return if inprogress.empty?

Sidekiq.logger.debug { "Re-queueing terminated jobs" }
Expand All @@ -76,5 +63,20 @@ def self.bulk_requeue(inprogress, options)
rescue => ex
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle!.uniq
queues << TIMEOUT
queues
end
end
end
end
3 changes: 2 additions & 1 deletion lib/sidekiq/launcher.rb
Expand Up @@ -22,6 +22,7 @@ class Launcher
attr_accessor :manager, :poller, :fetcher

def initialize(options)
options[:fetch] ||= BasicFetch.new(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
Expand Down Expand Up @@ -56,7 +57,7 @@ def stop

# Requeue everything in case there was a worker who grabbed work while stopped
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy = @options[:fetch]
strategy.bulk_requeue([], @options)

clear_heartbeat
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq/manager.rb
Expand Up @@ -35,7 +35,7 @@ def initialize(options = {})
@done = false
@workers = Set.new
@count.times do
@workers << Processor.new(self)
@workers << Processor.new(self, options)
end
@plock = Mutex.new
end
Expand Down Expand Up @@ -90,7 +90,7 @@ def processor_died(processor, reason)
@plock.synchronize do
@workers.delete(processor)
unless @done
p = Processor.new(self)
p = Processor.new(self, options)
@workers << p
p.start
end
Expand Down Expand Up @@ -123,7 +123,7 @@ def hard_shutdown
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy = @options[:fetch]
strategy.bulk_requeue(jobs, @options)
end

Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq/processor.rb
Expand Up @@ -28,15 +28,15 @@ class Processor
attr_reader :thread
attr_reader :job

def initialize(mgr)
def initialize(mgr, options)
@mgr = mgr
@down = false
@done = false
@job = nil
@thread = nil
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
@reloader = Sidekiq.options[:reloader]
@job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new
@strategy = options[:fetch]
@reloader = options[:reloader] || proc { |&block| block.call }
@job_logger = (options[:job_logger] || Sidekiq::JobLogger).new
@retrier = Sidekiq::JobRetry.new
end

Expand Down
13 changes: 8 additions & 5 deletions test/test_actors.rb
Expand Up @@ -50,7 +50,8 @@ def perform(slp)
end

it 'can start and stop' do
f = Sidekiq::Processor.new(Mgr.new)
m = Mgr.new
f = Sidekiq::Processor.new(m, m.options)
f.terminate
end

Expand All @@ -74,14 +75,16 @@ def processor_stopped(inst)
end
end
def options
{ :concurrency => 3, :queues => ['default'] }
opts = { :concurrency => 3, :queues => ['default'] }
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
opts
end
end

it 'can process' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async(0)

a = $count
Expand All @@ -93,7 +96,7 @@ def options
it 'deals with errors' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async("boom")
q = Sidekiq::Queue.new
assert_equal 1, q.size
Expand All @@ -116,7 +119,7 @@ def options
it 'gracefully kills' do
mgr = Mgr.new

p = Sidekiq::Processor.new(mgr)
p = Sidekiq::Processor.new(mgr, mgr.options)
JoeWorker.perform_async(1)
q = Sidekiq::Queue.new
assert_equal 1, q.size
Expand Down
1 change: 1 addition & 0 deletions test/test_cli.rb
Expand Up @@ -419,6 +419,7 @@ def logdev

describe '#run' do
before do
Sidekiq.options[:concurrency] = 2
Sidekiq.options[:require] = './test/fake_env.rb'
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_fetch.rb
Expand Up @@ -52,7 +52,7 @@
assert_equal 0, q1.size
assert_equal 0, q2.size

Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []})
fetch.bulk_requeue(works, {:queues => []})
assert_equal 2, q1.size
assert_equal 1, q2.size
end
Expand Down
2 changes: 1 addition & 1 deletion test/test_manager.rb
Expand Up @@ -8,7 +8,7 @@
end

def new_manager(opts)
Sidekiq::Manager.new(opts)
Sidekiq::Manager.new(opts.merge(fetch: Sidekiq::BasicFetch.new(opts)))
end

it 'creates N processor instances' do
Expand Down
6 changes: 2 additions & 4 deletions test/test_middleware.rb
Expand Up @@ -78,10 +78,8 @@ def call(*args)
end

boss = Minitest::Mock.new
boss.expect(:options, {:queues => ['default'] }, [])
boss.expect(:options, {:queues => ['default'] }, [])
boss.expect(:options, {:queues => ['default'] }, [])
processor = Sidekiq::Processor.new(boss)
opts = {:queues => ['default'] }
processor = Sidekiq::Processor.new(boss, opts)
boss.expect(:processor_done, nil, [processor])
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten
Expand Down
22 changes: 8 additions & 14 deletions test/test_processor.rb
Expand Up @@ -11,10 +11,9 @@
before do
$invokes = 0
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default']})
@mgr.expect(:options, {:queues => ['default']})
@mgr.expect(:options, {:queues => ['default']})
@processor = ::Sidekiq::Processor.new(@mgr)
opts = {:queues => ['default']}
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
@processor = ::Sidekiq::Processor.new(@mgr, opts)
end

class MockWorker
Expand Down Expand Up @@ -327,11 +326,9 @@ def call(item, queue)
end

before do
opts = {:queues => ['default'], job_logger: CustomJobLogger}
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
@processor = ::Sidekiq::Processor.new(@mgr)
@processor = ::Sidekiq::Processor.new(@mgr, opts)
end
end
end
Expand All @@ -356,18 +353,15 @@ def successful_job

describe 'custom job logger class' do
before do
@mgr = Minitest::Mock.new
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
@processor = ::Sidekiq::Processor.new(@mgr)
opts = {:queues => ['default'], :job_logger => CustomJobLogger}
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
@processor = ::Sidekiq::Processor.new(nil, opts)
end

it 'is called instead default Sidekiq::JobLogger' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
@processor.process(work(msg))
assert_equal 1, $invokes
@mgr.verify
end
end
end