From 5e962360cee64c314d9201ebde072270c500ac9e Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Fri, 23 Aug 2019 02:07:28 -0500 Subject: [PATCH] Fixes Cluster worker shutdown/restart (#1908) * cluster.rb - fixup worker wait in Cluster, add Worker#term? 1. Cluster - fix wait in #check_workers, #stop_workers 2. Cluster - add private wait_workers method for use in above 3. Worker - add #term? method for use in above * Adds two tests for worker SIGTERM/respawn and phased-restart test_worker_spawn_external_term - sends SIGTERM to workers, checks respawn, etc test_worker_phased_restart - checking worker handling during phased-restart --- lib/puma/cluster.rb | 65 +++++++++----------- test/test_integration.rb | 130 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 158 insertions(+), 37 deletions(-) diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 952e8c5d33..006dc19f53 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -35,34 +35,10 @@ def stop_workers @workers.each { |x| x.term } begin - if RUBY_VERSION < '2.6' - @workers.each do |w| - begin - Process.waitpid(w.pid) - rescue Errno::ECHILD - # child is already terminated - end - end - else - # below code is for a bug in Ruby 2.6+, above waitpid call hangs - t_st = Process.clock_gettime(Process::CLOCK_MONOTONIC) - pids = @workers.map(&:pid) - loop do - pids.reject! do |w_pid| - begin - if Process.waitpid(w_pid, Process::WNOHANG) - log " worker status: #{$?}" - true - end - rescue Errno::ECHILD - true # child is already terminated - end - end - break if pids.empty? - sleep 0.5 - end - t_end = Process.clock_gettime(Process::CLOCK_MONOTONIC) - log format(" worker shutdown time: %6.2f", t_end - t_st) + loop do + wait_workers + break if @workers.empty? + sleep 0.2 end rescue Interrupt log "! 
Cancelled waiting for workers" @@ -99,6 +75,7 @@ def initialize(idx, pid, phase, options) @last_checkin = Time.now @last_status = '{}' @dead = false + @term = false end attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at @@ -120,6 +97,10 @@ def dead! @dead = true end + def term? + @term + end + def ping!(status) @last_checkin = Time.now @last_status = status @@ -134,9 +115,9 @@ def term if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] @signal = "KILL" else + @term ||= true @first_term_sent ||= Time.now end - Process.kill @signal, @pid rescue Errno::ESRCH end @@ -227,12 +208,7 @@ def check_workers(force=false) # during this loop by giving the kernel time to kill them. sleep 1 if any - pids = [] - while pid = Process.waitpid(-1, Process::WNOHANG) do - pids << pid - end - @workers.reject! { |w| w.dead? || pids.include?(w.pid) } - + wait_workers cull_workers spawn_workers @@ -554,5 +530,24 @@ def run @wakeup.close end end + + private + + # loops thru @workers, removing workers that exited, and calling + # `#term` if needed + def wait_workers + @workers.reject! do |w| + begin + if Process.wait(w.pid, Process::WNOHANG) + true + else + w.term if w.term? 
+ nil + end + rescue Errno::ECHILD + true # child is already terminated + end + end + end end end diff --git a/test/test_integration.rb b/test/test_integration.rb index d886c323a1..c29fe44812 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -10,6 +10,7 @@ # TODO: remove stdout logging, get everything out of my rainbow dots class TestIntegration < Minitest::Test + HOST = '127.0.0.1' def setup @state_path = "test/test_puma.state" @@ -26,8 +27,8 @@ def setup end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil File.unlink @control_path rescue nil @wait.close @@ -392,4 +393,129 @@ def test_no_zombie_children # Should return nil if Puma has correctly cleaned up assert_nil Process.waitpid(-1, Process::WNOHANG) end + + def test_worker_spawn_external_term + worker_respawn { |l, old_pids| + old_pids.each { |p| Process.kill :TERM, p } + } + end + + def test_worker_phased_restart + worker_respawn { |l, old_pids| l.phased_restart } + end + + private + + def worker_respawn + skip NO_FORK_MSG unless HAS_FORK + port = UniquePort.call + workers_booted = 0 + + conf = Puma::Configuration.new do |c| + c.bind "tcp://#{HOST}:#{port}" + c.threads 1, 1 + c.workers 2 + c.worker_shutdown_timeout 2 + c.app TestApps::SLEEP + c.after_worker_fork { |idx| workers_booted += 1 } + end + + # start Puma via launcher + thr, launcher, _e = run_launcher conf + + # make sure two workers have booted + time = 0 + until workers_booted >= 2 || time >= 10 + sleep 2 + time += 2 + end + + cluster = launcher.instance_variable_get :@runner + + http0 = Net::HTTP.new HOST, port + http1 = Net::HTTP.new HOST, port + body0 = nil + body1 = nil + + worker0 = Thread.new do + begin + req0 = Net::HTTP::Get.new "/sleep35", {} + http0.start.request(req0) { |rep0| body0 = rep0.body } + rescue + end + end + + worker1 = Thread.new do + begin + req1 = Net::HTTP::Get.new "/sleep40", {} + 
http1.start.request(req1) { |rep1| body1 = rep1.body } + rescue + end + end + + old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + start_time = Time.now.to_f + + # below should 'cancel' the phase 0 workers, either via phased_restart or + # externally SIGTERM'ing them + yield launcher, old_pids + + # make sure four workers have booted + time = 0 + until workers_booted >= 4 || time >= 45 + sleep 2 + time += 2 + end + + new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + # should be empty if all old workers removed + old_waited = old_pids.map { |pid| + begin + Process.wait(pid, Process::WNOHANG) + pid + rescue Errno::ECHILD + nil # child is already terminated + end + }.compact + + Thread.kill worker0 + Thread.kill worker1 + + launcher.stop + assert_kind_of Thread, thr.join, "server didn't stop" + + refute_equal 'Slept 35', body0 + refute_equal 'Slept 40', body1 + + # Since 35 is the shorter of the two requests, server should restart + # and cancel both requests + assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + + msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" + assert_equal 2, new_pids.length, msg + assert_equal 2, old_pids.length, msg + assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty old_waited, msg + end + + def run_launcher(conf) + # below for future PR + #@wait, @ready = IO.pipe + # @ios_to_close << @wait << @ready + #@events = Puma::Events.strings + #@events.on_booted { @ready << "!" } + + launcher = Puma::Launcher.new conf, :events => @events + + thr = Thread.new do + launcher.run + end + + # wait for boot from #@events.on_booted + @wait.sysread 1 + + [thr, launcher, @events] + end end