Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternate worker culling strategy #2773

Merged
merged 1 commit into from Jan 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 12 additions & 5 deletions lib/puma/cluster.rb
Expand Up @@ -111,7 +111,14 @@ def cull_workers

debug "Culling #{diff.inspect} workers"

workers_to_cull = @workers[-diff,diff]
workers_to_cull =
case @options[:worker_culling_strategy]
when :youngest
@workers.sort_by(&:started_at)[-diff,diff]
when :oldest
@workers.sort_by(&:started_at)[0,diff]
end

debug "Workers to cull: #{workers_to_cull.inspect}"

workers_to_cull.each do |worker|
Expand All @@ -122,10 +129,10 @@ def cull_workers

# @!attribute [r] next_worker_index
def next_worker_index
all_positions = 0...@options[:workers]
occupied_positions = @workers.map { |w| w.index }
available_positions = all_positions.to_a - occupied_positions
available_positions.first
occupied_positions = @workers.map(&:index)
idx = 0
idx += 1 until !occupied_positions.include?(idx)
idx
Comment on lines -125 to +135
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I changed this method's implementation is because I ran into an edge case while writing tests for this PR: if @options[:workers] is decreased (e.g. because the process received a TTOU signal), the previous implementation could potentially return nil.

end

def all_workers_booted?
Expand Down
1 change: 1 addition & 0 deletions lib/puma/configuration.rb
Expand Up @@ -200,6 +200,7 @@ def puma_default_options
:worker_timeout => DefaultWorkerTimeout,
:worker_boot_timeout => DefaultWorkerTimeout,
:worker_shutdown_timeout => DefaultWorkerShutdownTimeout,
:worker_culling_strategy => :youngest,
:remote_address => :socket,
:tag => method(:infer_tag),
:environment => -> { ENV['APP_ENV'] || ENV['RACK_ENV'] || ENV['RAILS_ENV'] || 'development' },
Expand Down
24 changes: 24 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -795,6 +795,30 @@ def worker_shutdown_timeout(timeout)
@options[:worker_shutdown_timeout] = Integer(timeout)
end

# Set the strategy for worker culling.
#
# There are two possible values:
#
# 1. **:youngest** - the youngest workers (i.e. the workers that were
# the most recently started) will be culled.
# 2. **:oldest** - the oldest workers (i.e. the workers that were started
# the longest time ago) will be culled.
#
# @note Cluster mode only.
# @example
# worker_culling_strategy :oldest
# @see Puma::Cluster#cull_workers
#
def worker_culling_strategy(strategy)
stategy = strategy.to_sym

if ![:youngest, :oldest].include?(strategy)
raise "Invalid value for worker_culling_strategy - #{stategy}"
end

@options[:worker_culling_strategy] = strategy
end

# When set to true (the default), workers accept all requests
# and queue them before passing them to the handlers.
# When set to false, each worker process accepts exactly as
Expand Down
50 changes: 50 additions & 0 deletions test/test_integration_cluster.rb
Expand Up @@ -351,6 +351,56 @@ def test_warning_message_not_outputted_when_single_worker_silenced
refute_match(/WARNING: Detected running cluster mode with 1 worker/, output.join)
end

def test_signal_ttin
cli_server "-w 2 test/rackup/hello.ru"
get_worker_pids # to consume server logs

Process.kill :TTIN, @pid

line = @server.gets
assert_match(/Worker 2 \(PID: \d+\) booted in/, line)
end

def test_signal_ttou
cli_server "-w 2 test/rackup/hello.ru"
get_worker_pids # to consume server logs

Process.kill :TTOU, @pid

line = @server.gets
assert_match(/Worker 1 \(PID: \d+\) terminating/, line)
end

def test_culling_strategy_youngest
cli_server "-w 2 test/rackup/hello.ru", config: "worker_culling_strategy :youngest"
get_worker_pids # to consume server logs

Process.kill :TTIN, @pid

line = @server.gets
assert_match(/Worker 2 \(PID: \d+\) booted in/, line)

Process.kill :TTOU, @pid

line = @server.gets
assert_match(/Worker 2 \(PID: \d+\) terminating/, line)
end

def test_culling_strategy_oldest
cli_server "-w 2 test/rackup/hello.ru", config: "worker_culling_strategy :oldest"
get_worker_pids # to consume server logs

Process.kill :TTIN, @pid

line = @server.gets
assert_match(/Worker 2 \(PID: \d+\) booted in/, line)

Process.kill :TTOU, @pid

line = @server.gets
assert_match(/Worker 0 \(PID: \d+\) terminating/, line)
end

private

def worker_timeout(timeout, iterations, details, config)
Expand Down