Skip to content

Commit

Permalink
Fixes Cluster worker shutdown/restart (#1908)
Browse files Browse the repository at this point in the history
* cluster.rb - fixup worker wait in Cluster, add Worker#term?

1. Cluster - fix wait in #check_workers, #stop_workers
2. Cluster - add private wait_workers method for use in above
2. Worker  - add #term? method for use in above

* Adds two tests for worker SIGTERM/respawn and phased-restart

test_worker_spawn_external_term - sends SIGTERM to workers, checks respawn, etc

test_worker_phased_restart - checking worker handling during phased-restart
  • Loading branch information
MSP-Greg authored and nateberkopec committed Sep 5, 2019
1 parent 736117c commit c9a584c
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 38 deletions.
65 changes: 30 additions & 35 deletions lib/puma/cluster.rb
Expand Up @@ -35,34 +35,10 @@ def stop_workers
@workers.each { |x| x.term }

begin
if RUBY_VERSION < '2.6'
@workers.each do |w|
begin
Process.waitpid(w.pid)
rescue Errno::ECHILD
# child is already terminated
end
end
else
# below code is for a bug in Ruby 2.6+, above waitpid call hangs
t_st = Process.clock_gettime(Process::CLOCK_MONOTONIC)
pids = @workers.map(&:pid)
loop do
pids.reject! do |w_pid|
begin
if Process.waitpid(w_pid, Process::WNOHANG)
log " worker status: #{$?}"
true
end
rescue Errno::ECHILD
true # child is already terminated
end
end
break if pids.empty?
sleep 0.5
end
t_end = Process.clock_gettime(Process::CLOCK_MONOTONIC)
log format(" worker shutdown time: %6.2f", t_end - t_st)
loop do
wait_workers
break if @workers.empty?
sleep 0.2
end
rescue Interrupt
log "! Cancelled waiting for workers"
Expand Down Expand Up @@ -99,6 +75,7 @@ def initialize(idx, pid, phase, options)
@last_checkin = Time.now
@last_status = '{}'
@dead = false
@term = false
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
Expand All @@ -120,6 +97,10 @@ def dead!
@dead = true
end

def term?
@term
end

def ping!(status)
@last_checkin = Time.now
@last_status = status
Expand All @@ -134,9 +115,9 @@ def term
if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout]
@signal = "KILL"
else
@term ||= true
@first_term_sent ||= Time.now
end

Process.kill @signal, @pid
rescue Errno::ESRCH
end
Expand Down Expand Up @@ -227,12 +208,7 @@ def check_workers(force=false)
# during this loop by giving the kernel time to kill them.
sleep 1 if any

pids = []
while pid = Process.waitpid(-1, Process::WNOHANG) do
pids << pid
end
@workers.reject! { |w| w.dead? || pids.include?(w.pid) }

wait_workers
cull_workers
spawn_workers

Expand Down Expand Up @@ -554,5 +530,24 @@ def run
@wakeup.close
end
end

private

# loops thru @workers, removing workers that exited, and calling
# `#term` if needed
def wait_workers
@workers.reject! do |w|
begin
if Process.wait(w.pid, Process::WNOHANG)
true
else
w.term if w.term?
nil
end
rescue Errno::ECHILD
true # child is already terminated
end
end
end
end
end
132 changes: 129 additions & 3 deletions test/test_integration.rb
Expand Up @@ -10,7 +10,8 @@
# TODO: remove stdout logging, get everything out of my rainbow dots

class TestIntegration < Minitest::Test

HOST = '127.0.0.1'

def setup
@state_path = "test/test_puma.state"
@bind_path = "test/test_server.sock"
Expand All @@ -26,8 +27,8 @@ def setup
end

def teardown
File.unlink @state_path rescue nil
File.unlink @bind_path rescue nil
File.unlink @state_path rescue nil
File.unlink @bind_path rescue nil
File.unlink @control_path rescue nil

@wait.close
Expand Down Expand Up @@ -392,4 +393,129 @@ def test_no_zombie_children
# Should return nil if Puma has correctly cleaned up
assert_nil Process.waitpid(-1, Process::WNOHANG)
end

def test_worker_spawn_external_term
worker_respawn { |l, old_pids|
old_pids.each { |p| Process.kill :TERM, p }
}
end

def test_worker_phased_restart
worker_respawn { |l, old_pids| l.phased_restart }
end

private

def worker_respawn
skip NO_FORK_MSG unless HAS_FORK
port = UniquePort.call
workers_booted = 0

conf = Puma::Configuration.new do |c|
c.bind "tcp://#{HOST}:#{port}"
c.threads 1, 1
c.workers 2
c.worker_shutdown_timeout 2
c.app TestApps::SLEEP
c.after_worker_fork { |idx| workers_booted += 1 }
end

# start Puma via launcher
thr, launcher, _e = run_launcher conf

# make sure two workers have booted
time = 0
until workers_booted >= 2 || time >= 10
sleep 2
time += 2
end

cluster = launcher.instance_variable_get :@runner

http0 = Net::HTTP.new HOST, port
http1 = Net::HTTP.new HOST, port
body0 = nil
body1 = nil

worker0 = Thread.new do
begin
req0 = Net::HTTP::Get.new "/sleep35", {}
http0.start.request(req0) { |rep0| body0 = rep0.body }
rescue
end
end

worker1 = Thread.new do
begin
req1 = Net::HTTP::Get.new "/sleep40", {}
http1.start.request(req1) { |rep1| body1 = rep1.body }
rescue
end
end

old_pids = cluster.instance_variable_get(:@workers).map(&:pid)

start_time = Time.now.to_f

# below should 'cancel' the phase 0 workers, either via phased_restart or
# externally SIGTERM'ing them
yield launcher, old_pids

# make sure four workers have booted
time = 0
until workers_booted >= 4 || time >= 45
sleep 2
time += 2
end

new_pids = cluster.instance_variable_get(:@workers).map(&:pid)

# should be empty if all old workers removed
old_waited = old_pids.map { |pid|
begin
Process.wait(pid, Process::WNOHANG)
pid
rescue Errno::ECHILD
nil # child is already terminated
end
}.compact

Thread.kill worker0
Thread.kill worker1

launcher.stop
assert_kind_of Thread, thr.join, "server didn't stop"

refute_equal 'Slept 35', body0
refute_equal 'Slept 40', body1

# Since 35 is the shorter of the two requests, server should restart
# and cancel both requests
assert_operator (Time.now.to_f - start_time).round(2), :<, 35

msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}"
assert_equal 2, new_pids.length, msg
assert_equal 2, old_pids.length, msg
assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new"
assert_empty old_waited, msg
end

def run_launcher(conf)
# below for future PR
#@wait, @ready = IO.pipe
# @ios_to_close << @wait << @ready
#@events = Puma::Events.strings
#@events.on_booted { @ready << "!" }

launcher = Puma::Launcher.new conf, :events => @events

thr = Thread.new do
launcher.run
end

# wait for boot from #@events.on_booted
@wait.sysread 1

[thr, launcher, @events]
end
end

0 comments on commit c9a584c

Please sign in to comment.