Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Cluster worker shutdown/restart #1908

Merged
merged 2 commits into from Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 30 additions & 35 deletions lib/puma/cluster.rb
Expand Up @@ -35,34 +35,10 @@ def stop_workers
@workers.each { |x| x.term }

begin
if RUBY_VERSION < '2.6'
@workers.each do |w|
begin
Process.waitpid(w.pid)
rescue Errno::ECHILD
# child is already terminated
end
end
else
# below code is for a bug in Ruby 2.6+, above waitpid call hangs
t_st = Process.clock_gettime(Process::CLOCK_MONOTONIC)
pids = @workers.map(&:pid)
loop do
pids.reject! do |w_pid|
begin
if Process.waitpid(w_pid, Process::WNOHANG)
log " worker status: #{$?}"
true
end
rescue Errno::ECHILD
true # child is already terminated
end
end
break if pids.empty?
sleep 0.5
end
t_end = Process.clock_gettime(Process::CLOCK_MONOTONIC)
log format(" worker shutdown time: %6.2f", t_end - t_st)
loop do
wait_workers
break if @workers.empty?
sleep 0.2
end
rescue Interrupt
log "! Cancelled waiting for workers"
Expand Down Expand Up @@ -99,6 +75,7 @@ def initialize(idx, pid, phase, options)
@last_checkin = Time.now
@last_status = '{}'
@dead = false
@term = false
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
Expand All @@ -120,6 +97,10 @@ def dead!
@dead = true
end

# Reports the @term flag. The flag is set to false in #initialize and is
# switched to true by #term in the same branch that records
# @first_term_sent.
def term?
@term
end

def ping!(status)
@last_checkin = Time.now
@last_status = status
Expand All @@ -134,9 +115,9 @@ def term
if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout]
@signal = "KILL"
else
@term ||= true
@first_term_sent ||= Time.now
end

Process.kill @signal, @pid
rescue Errno::ESRCH
end
Expand Down Expand Up @@ -227,12 +208,7 @@ def check_workers(force=false)
# during this loop by giving the kernel time to kill them.
sleep 1 if any

pids = []
while pid = Process.waitpid(-1, Process::WNOHANG) do
pids << pid
end
@workers.reject! { |w| w.dead? || pids.include?(w.pid) }

wait_workers
cull_workers
spawn_workers

Expand Down Expand Up @@ -554,5 +530,24 @@ def run
@wakeup.close
end
end

private

# Non-blocking reap of the worker list. A worker is dropped from @workers
# when its process could be waited on, or when Process.wait reports that
# there is no such child any more. A worker that is still running and
# already has its term flag set gets `#term` called again.
def wait_workers
  @workers.reject! do |worker|
    begin
      exited = Process.wait(worker.pid, Process::WNOHANG)
      # still running: repeat the term request if one was already made
      worker.term if !exited && worker.term?
      exited ? true : nil
    rescue Errno::ECHILD
      true # child is already terminated
    end
  end
end
end
end
137 changes: 131 additions & 6 deletions test/test_integration.rb
Expand Up @@ -9,13 +9,13 @@
# TODO: remove stdout logging, get everything out of my rainbow dots

class TestIntegration < Minitest::Test
HOST = "127.0.0.1"
TOKEN = "xxyyzz"

def setup
unique = UniquePort.call
@state_path = "test/test_#{unique}_puma.state"
@bind_path = "test/test_#{unique}_server.sock"
@control_path = "test/test_#{unique}_control.sock"
@state_path = "test/test_#{name}_puma.state"
@bind_path = "test/test_#{name}_server.sock"
@control_path = "test/test_#{name}_control.sock"

@server = nil

Expand All @@ -26,8 +26,8 @@ def setup
end

def teardown
File.unlink @state_path rescue nil
File.unlink @bind_path rescue nil
File.unlink @state_path rescue nil
File.unlink @bind_path rescue nil
File.unlink @control_path rescue nil

@wait.close
Expand Down Expand Up @@ -399,4 +399,129 @@ def test_no_zombie_children
end.compact
assert_empty zombies, "Process ids #{zombies} became zombies"
end

# Runs the shared respawn scenario, sending SIGTERM to every old worker pid
# from outside the launcher.
def test_worker_spawn_external_term
  worker_respawn do |_launcher, phase0_pids|
    phase0_pids.each { |worker_pid| Process.kill :TERM, worker_pid }
  end
end

# Runs the shared respawn scenario, asking the launcher itself for a phased
# restart.
def test_worker_phased_restart
  worker_respawn do |launcher, _old_pids|
    launcher.phased_restart
  end
end

private

# Shared body for the worker respawn tests. Boots a two-worker cluster, starts
# one slow request on each of two connections (/sleep35 and /sleep40), then
# yields the launcher and the pids of the current workers so the caller can
# shut those workers down. Afterwards it asserts that neither request returned
# its 'Slept N' body, that less than 35 seconds elapsed, and that @workers
# holds two pids, none of which is one of the old pids.
#
# Skipped when HAS_FORK is false.
def worker_respawn
skip NO_FORK_MSG unless HAS_FORK
port = UniquePort.call
# incremented by the after_worker_fork hook configured below
workers_booted = 0

conf = Puma::Configuration.new do |c|
c.bind "tcp://#{HOST}:#{port}"
c.threads 1, 1
c.workers 2
c.worker_shutdown_timeout 2
c.app TestApps::SLEEP
c.after_worker_fork { |idx| workers_booted += 1 }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new mechanism for determining if the process has finished booting? We already have two methods.

end

# start Puma via launcher
thr, launcher, _e = run_launcher conf

# make sure two workers have booted
# (polls in 2 second steps and gives up after 10 seconds)
time = 0
until workers_booted >= 2 || time >= 10
sleep 2
time += 2
end

# @workers is read from this object further down
cluster = launcher.instance_variable_get :@runner

http0 = Net::HTTP.new HOST, port
http1 = Net::HTTP.new HOST, port
body0 = nil
body1 = nil

worker0 = Thread.new do
begin
req0 = Net::HTTP::Get.new "/sleep35", {}
http0.start.request(req0) { |rep0| body0 = rep0.body }
rescue
# errors raised by the request are swallowed
end
end

worker1 = Thread.new do
begin
req1 = Net::HTTP::Get.new "/sleep40", {}
http1.start.request(req1) { |rep1| body1 = rep1.body }
rescue
# errors raised by the request are swallowed
end
end

old_pids = cluster.instance_variable_get(:@workers).map(&:pid)

start_time = Time.now.to_f

# below should 'cancel' the phase 0 workers, either via phased_restart or
# externally SIGTERM'ing them
yield launcher, old_pids

# make sure four workers have booted
# (polls in 2 second steps and gives up after 45 seconds)
time = 0
until workers_booted >= 4 || time >= 45
sleep 2
time += 2
end

new_pids = cluster.instance_variable_get(:@workers).map(&:pid)

# should be empty if all old workers removed
# (a pid stays in the list whenever Process.wait does not raise ECHILD)
old_waited = old_pids.map { |pid|
begin
Process.wait(pid, Process::WNOHANG)
pid
rescue Errno::ECHILD
nil # child is already terminated
end
}.compact

Thread.kill worker0
Thread.kill worker1

launcher.stop
assert_kind_of Thread, thr.join, "server didn't stop"

refute_equal 'Slept 35', body0
refute_equal 'Slept 40', body1

# Since 35 is the shorter of the two requests, server should restart
# and cancel both requests
assert_operator (Time.now.to_f - start_time).round(2), :<, 35

msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}"
assert_equal 2, new_pids.length, msg
assert_equal 2, old_pids.length, msg
assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new"
assert_empty old_waited, msg
end

# Builds a Puma::Launcher for +conf+ (passing @events along), runs it on a
# new thread, and blocks until one byte can be read from @wait.
# NOTE(review): @wait and @events are not assigned here; presumably the test
# setup prepares them - confirm.
#
# Returns [thread, launcher, @events].
def run_launcher(conf)
  # below for future PR
  #@wait, @ready = IO.pipe
  # @ios_to_close << @wait << @ready
  #@events = Puma::Events.strings
  #@events.on_booted { @ready << "!" }

  launcher = Puma::Launcher.new(conf, events: @events)
  runner_thread = Thread.new { launcher.run }

  # wait for boot from #@events.on_booted
  @wait.sysread 1

  [runner_thread, launcher, @events]
end
end