Skip to content

Commit

Permalink
Fixes Cluster worker shutdown/restart (#1908)
Browse files Browse the repository at this point in the history
* cluster.rb - fixup worker wait in Cluster, add Worker#term?

1. Cluster - fix wait in #check_workers, #stop_workers
2. Cluster - add private wait_workers method for use in above
2. Worker  - add #term? method for use in above

* Adds two tests for worker SIGTERM/respawn and phased-restart

test_worker_spawn_external_term - sends SIGTERM to workers, checks respawn, etc

test_worker_phased_restart - checking worker handling during phased-restart
  • Loading branch information
MSP-Greg authored and nateberkopec committed Sep 9, 2019
1 parent d5c394e commit f759017
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/puma/cluster.rb
Expand Up @@ -74,6 +74,10 @@ def initialize(idx, pid, phase, options)
@started_at = Time.now
@last_checkin = Time.now
@last_status = '{}'
<<<<<<< HEAD
=======
@dead = false
>>>>>>> Fixes Cluster worker shutdown/restart (#1908)
@term = false
end

Expand All @@ -92,6 +96,10 @@ def term?
@term
end

def term?
@term
end

def ping!(status)
@last_checkin = Time.now
@last_status = status
Expand Down
129 changes: 129 additions & 0 deletions test/test_integration.rb
Expand Up @@ -9,8 +9,12 @@
# TODO: remove stdout logging, get everything out of my rainbow dots

class TestIntegration < Minitest::Test
<<<<<<< HEAD
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
=======
HOST = '127.0.0.1'
>>>>>>> Fixes Cluster worker shutdown/restart (#1908)

def setup
@state_path = "test/test_#{name}_puma.state"
Expand Down Expand Up @@ -533,4 +537,129 @@ def run_launcher(conf)

[thr, launcher, @events]
end

def test_worker_spawn_external_term
worker_respawn { |l, old_pids|
old_pids.each { |p| Process.kill :TERM, p }
}
end

def test_worker_phased_restart
worker_respawn { |l, old_pids| l.phased_restart }
end

private

def worker_respawn
skip NO_FORK_MSG unless HAS_FORK
port = UniquePort.call
workers_booted = 0

conf = Puma::Configuration.new do |c|
c.bind "tcp://#{HOST}:#{port}"
c.threads 1, 1
c.workers 2
c.worker_shutdown_timeout 2
c.app TestApps::SLEEP
c.after_worker_fork { |idx| workers_booted += 1 }
end

# start Puma via launcher
thr, launcher, _e = run_launcher conf

# make sure two workers have booted
time = 0
until workers_booted >= 2 || time >= 10
sleep 2
time += 2
end

cluster = launcher.instance_variable_get :@runner

http0 = Net::HTTP.new HOST, port
http1 = Net::HTTP.new HOST, port
body0 = nil
body1 = nil

worker0 = Thread.new do
begin
req0 = Net::HTTP::Get.new "/sleep35", {}
http0.start.request(req0) { |rep0| body0 = rep0.body }
rescue
end
end

worker1 = Thread.new do
begin
req1 = Net::HTTP::Get.new "/sleep40", {}
http1.start.request(req1) { |rep1| body1 = rep1.body }
rescue
end
end

old_pids = cluster.instance_variable_get(:@workers).map(&:pid)

start_time = Time.now.to_f

# below should 'cancel' the phase 0 workers, either via phased_restart or
# externally SIGTERM'ing them
yield launcher, old_pids

# make sure four workers have booted
time = 0
until workers_booted >= 4 || time >= 45
sleep 2
time += 2
end

new_pids = cluster.instance_variable_get(:@workers).map(&:pid)

# should be empty if all old workers removed
old_waited = old_pids.map { |pid|
begin
Process.wait(pid, Process::WNOHANG)
pid
rescue Errno::ECHILD
nil # child is already terminated
end
}.compact

Thread.kill worker0
Thread.kill worker1

launcher.stop
assert_kind_of Thread, thr.join, "server didn't stop"

refute_equal 'Slept 35', body0
refute_equal 'Slept 40', body1

# Since 35 is the shorter of the two requests, server should restart
# and cancel both requests
assert_operator (Time.now.to_f - start_time).round(2), :<, 35

msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}"
assert_equal 2, new_pids.length, msg
assert_equal 2, old_pids.length, msg
assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new"
assert_empty old_waited, msg
end

def run_launcher(conf)
# below for future PR
#@wait, @ready = IO.pipe
# @ios_to_close << @wait << @ready
#@events = Puma::Events.strings
#@events.on_booted { @ready << "!" }

launcher = Puma::Launcher.new conf, :events => @events

thr = Thread.new do
launcher.run
end

# wait for boot from #@events.on_booted
@wait.sysread 1

[thr, launcher, @events]
end
end

0 comments on commit f759017

Please sign in to comment.