From f759017e35574f4ad7d20c543aa544fea28c7da1 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Fri, 23 Aug 2019 02:07:28 -0500 Subject: [PATCH] Fixes Cluster worker shutdown/restart (#1908) * cluster.rb - fixup worker wait in Cluster, add Worker#term? 1. Cluster - fix wait in #check_workers, #stop_workers 2. Cluster - add private wait_workers method for use in above 2. Worker - add #term? method for use in above * Adds two tests for worker SIGTERM/respawn and phased-restart test_worker_spawn_external_term - sends SIGTERM to workers, checks respawn, etc test_worker_phased_restart - checking worker handling during phased-restart --- lib/puma/cluster.rb | 8 +++ test/test_integration.rb | 129 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index d3da53d6a8..69eb059da4 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -74,6 +74,10 @@ def initialize(idx, pid, phase, options) @started_at = Time.now @last_checkin = Time.now @last_status = '{}' +<<<<<<< HEAD +======= + @dead = false +>>>>>>> Fixes Cluster worker shutdown/restart (#1908) @term = false end @@ -92,6 +96,10 @@ def term? @term end + def term? + @term + end + def ping!(status) @last_checkin = Time.now @last_status = status diff --git a/test/test_integration.rb b/test/test_integration.rb index 066c43aca5..5beb10a9f8 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -9,8 +9,12 @@ # TODO: remove stdout logging, get everything out of my rainbow dots class TestIntegration < Minitest::Test +<<<<<<< HEAD HOST = "127.0.0.1" TOKEN = "xxyyzz" +======= + HOST = '127.0.0.1' +>>>>>>> Fixes Cluster worker shutdown/restart (#1908) def setup @state_path = "test/test_#{name}_puma.state" @@ -533,4 +537,129 @@ def run_launcher(conf) [thr, launcher, @events] end + + def test_worker_spawn_external_term + worker_respawn { |l, old_pids| + old_pids.each { |p| Process.kill :TERM, p } + } + end + + def test_worker_phased_restart + worker_respawn { |l, old_pids| l.phased_restart } + end + + private + + def worker_respawn + skip NO_FORK_MSG unless HAS_FORK + port = UniquePort.call + workers_booted = 0 + + conf = Puma::Configuration.new do |c| + c.bind "tcp://#{HOST}:#{port}" + c.threads 1, 1 + c.workers 2 + c.worker_shutdown_timeout 2 + c.app TestApps::SLEEP + c.after_worker_fork { |idx| workers_booted += 1 } + end + + # start Puma via launcher + thr, launcher, _e = run_launcher conf + + # make sure two workers have booted + time = 0 + until workers_booted >= 2 || time >= 10 + sleep 2 + time += 2 + end + + cluster = launcher.instance_variable_get :@runner + + http0 = Net::HTTP.new HOST, port + http1 = Net::HTTP.new HOST, port + body0 = nil + body1 = nil + + worker0 = Thread.new do + begin + req0 = Net::HTTP::Get.new "/sleep35", {} + http0.start.request(req0) { |rep0| body0 = rep0.body } + rescue + end + end + + worker1 = Thread.new do + begin + req1 = Net::HTTP::Get.new "/sleep40", {} + http1.start.request(req1) { |rep1| body1 = rep1.body } + rescue + end + end + + old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + start_time = Time.now.to_f + + # below should 'cancel' the phase 0 workers, either via phased_restart or + # externally SIGTERM'ing them + yield launcher, old_pids + + # make sure four workers have booted + time = 0 + until workers_booted >= 4 || time >= 45 + sleep 2 + time += 2 + end + + new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + # should be empty if all old workers removed + old_waited = old_pids.map { |pid| + begin + Process.wait(pid, Process::WNOHANG) + pid + rescue Errno::ECHILD + nil # child is already terminated + end + }.compact + + Thread.kill worker0 + Thread.kill worker1 + + launcher.stop + assert_kind_of Thread, thr.join, "server didn't stop" + + refute_equal 'Slept 35', body0 + refute_equal 'Slept 40', body1 + + # Since 35 is the shorter of the two requests, server should restart + # and cancel both requests + assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + + msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" + assert_equal 2, new_pids.length, msg + assert_equal 2, old_pids.length, msg + assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty old_waited, msg + end + + def run_launcher(conf) + # below for future PR + #@wait, @ready = IO.pipe + # @ios_to_close << @wait << @ready + #@events = Puma::Events.strings + #@events.on_booted { @ready << "!" } + + launcher = Puma::Launcher.new conf, :events => @events + + thr = Thread.new do + launcher.run + end + + # wait for boot from #@events.on_booted + @wait.sysread 1 + + [thr, launcher, @events] + end end