diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 952e8c5d33..006dc19f53 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -35,34 +35,10 @@ def stop_workers @workers.each { |x| x.term } begin - if RUBY_VERSION < '2.6' - @workers.each do |w| - begin - Process.waitpid(w.pid) - rescue Errno::ECHILD - # child is already terminated - end - end - else - # below code is for a bug in Ruby 2.6+, above waitpid call hangs - t_st = Process.clock_gettime(Process::CLOCK_MONOTONIC) - pids = @workers.map(&:pid) - loop do - pids.reject! do |w_pid| - begin - if Process.waitpid(w_pid, Process::WNOHANG) - log " worker status: #{$?}" - true - end - rescue Errno::ECHILD - true # child is already terminated - end - end - break if pids.empty? - sleep 0.5 - end - t_end = Process.clock_gettime(Process::CLOCK_MONOTONIC) - log format(" worker shutdown time: %6.2f", t_end - t_st) + loop do + wait_workers + break if @workers.empty? + sleep 0.2 end rescue Interrupt log "! Cancelled waiting for workers" @@ -99,6 +75,7 @@ def initialize(idx, pid, phase, options) @last_checkin = Time.now @last_status = '{}' @dead = false + @term = false end attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at @@ -120,6 +97,10 @@ def dead! @dead = true end + def term? + @term + end + def ping!(status) @last_checkin = Time.now @last_status = status @@ -134,9 +115,9 @@ def term if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] @signal = "KILL" else + @term ||= true @first_term_sent ||= Time.now end - Process.kill @signal, @pid rescue Errno::ESRCH end @@ -227,12 +208,7 @@ def check_workers(force=false) # during this loop by giving the kernel time to kill them. sleep 1 if any - pids = [] - while pid = Process.waitpid(-1, Process::WNOHANG) do - pids << pid - end - @workers.reject! { |w| w.dead? || pids.include?(w.pid) } - + wait_workers cull_workers spawn_workers @@ -554,5 +530,24 @@ def run @wakeup.close end end + + private + + # loops thru @workers, removing workers that exited, and calling + # `#term` if needed + def wait_workers + @workers.reject! do |w| + begin + if Process.wait(w.pid, Process::WNOHANG) + true + else + w.term if w.term? + nil + end + rescue Errno::ECHILD + true # child is already terminated + end + end + end end end diff --git a/test/test_integration.rb b/test/test_integration.rb index 70555a90fa..08cf265b55 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -9,13 +9,13 @@ # TODO: remove stdout logging, get everything out of my rainbow dots class TestIntegration < Minitest::Test + HOST = "127.0.0.1" TOKEN = "xxyyzz" def setup - unique = UniquePort.call - @state_path = "test/test_#{unique}_puma.state" - @bind_path = "test/test_#{unique}_server.sock" - @control_path = "test/test_#{unique}_control.sock" + @state_path = "test/test_#{name}_puma.state" + @bind_path = "test/test_#{name}_server.sock" + @control_path = "test/test_#{name}_control.sock" @server = nil @@ -26,8 +26,8 @@ def setup end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil File.unlink @control_path rescue nil @wait.close @@ -399,4 +399,129 @@ def test_no_zombie_children end.compact assert_empty zombies, "Process ids #{zombies} became zombies" end + + def test_worker_spawn_external_term + worker_respawn { |l, old_pids| + old_pids.each { |p| Process.kill :TERM, p } + } + end + + def test_worker_phased_restart + worker_respawn { |l, old_pids| l.phased_restart } + end + + private + + def worker_respawn + skip NO_FORK_MSG unless HAS_FORK + port = UniquePort.call + workers_booted = 0 + + conf = Puma::Configuration.new do |c| + c.bind "tcp://#{HOST}:#{port}" + c.threads 1, 1 + c.workers 2 + c.worker_shutdown_timeout 2 + c.app TestApps::SLEEP + c.after_worker_fork { |idx| workers_booted += 1 } + end + + # start Puma via launcher + thr, launcher, _e = run_launcher conf + + # make sure two workers have booted + time = 0 + until workers_booted >= 2 || time >= 10 + sleep 2 + time += 2 + end + + cluster = launcher.instance_variable_get :@runner + + http0 = Net::HTTP.new HOST, port + http1 = Net::HTTP.new HOST, port + body0 = nil + body1 = nil + + worker0 = Thread.new do + begin + req0 = Net::HTTP::Get.new "/sleep35", {} + http0.start.request(req0) { |rep0| body0 = rep0.body } + rescue + end + end + + worker1 = Thread.new do + begin + req1 = Net::HTTP::Get.new "/sleep40", {} + http1.start.request(req1) { |rep1| body1 = rep1.body } + rescue + end + end + + old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + start_time = Time.now.to_f + + # below should 'cancel' the phase 0 workers, either via phased_restart or + # externally SIGTERM'ing them + yield launcher, old_pids + + # make sure four workers have booted + time = 0 + until workers_booted >= 4 || time >= 45 + sleep 2 + time += 2 + end + + new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + + # should be empty if all old workers removed + old_waited = old_pids.map { |pid| + begin + Process.wait(pid, Process::WNOHANG) + pid + rescue Errno::ECHILD + nil # child is already terminated + end + }.compact + + Thread.kill worker0 + Thread.kill worker1 + + launcher.stop + assert_kind_of Thread, thr.join, "server didn't stop" + + refute_equal 'Slept 35', body0 + refute_equal 'Slept 40', body1 + + # Since 35 is the shorter of the two requests, server should restart + # and cancel both requests + assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + + msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" + assert_equal 2, new_pids.length, msg + assert_equal 2, old_pids.length, msg + assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty old_waited, msg + end + + def run_launcher(conf) + # below for future PR + #@wait, @ready = IO.pipe + # @ios_to_close << @wait << @ready + #@events = Puma::Events.strings + #@events.on_booted { @ready << "!" } + + launcher = Puma::Launcher.new conf, :events => @events + + thr = Thread.new do + launcher.run + end + + # wait for boot from #@events.on_booted + @wait.sysread 1 + + [thr, launcher, @events] + end end