From d6c017b05b5af20e50aef119919837100b3004f4 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Fri, 23 Aug 2019 09:21:31 -0500 Subject: [PATCH 1/3] test_integration.rb - clean up, use command line run when possible test_phased_restart_via_pumactl - added timer to verify correct worker shutdown timing --- test/test_integration.rb | 296 +++++++++++++++++---------------------- 1 file changed, 128 insertions(+), 168 deletions(-) diff --git a/test/test_integration.rb b/test/test_integration.rb index 08cf265b55..feb9fa6d17 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -5,150 +5,55 @@ require "puma/control_cli" require "open3" -# TODO: Remove over-utilization of @instance variables -# TODO: remove stdout logging, get everything out of my rainbow dots - class TestIntegration < Minitest::Test HOST = "127.0.0.1" TOKEN = "xxyyzz" - def setup - @state_path = "test/test_#{name}_puma.state" - @bind_path = "test/test_#{name}_server.sock" - @control_path = "test/test_#{name}_control.sock" - - @server = nil - - @wait, @ready = IO.pipe + BASE = defined?(Bundler) ? + "bundle exec #{Gem.ruby} -Ilib bin/puma" : + "#{Gem.ruby} -Ilib bin/puma" - @events = Puma::Events.strings - @events.on_booted { @ready << "!" } + def setup + @ios_to_close = [] + @state_path = "test/#{name}_puma.state" + @bind_path = "test/#{name}_server.sock" + @control_path = "test/#{name}_control.sock" end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil - File.unlink @control_path rescue nil - - @wait.close - @ready.close - - if @server - Process.kill "INT", @server.pid + if defined?(@server) && @server + begin + Process.kill "INT", @server.pid + rescue + Errno::ESRCH + end begin Process.wait @server.pid rescue Errno::ECHILD end - - @server.close + @server.close unless @server.closed? end - end - - def server_cmd(argv) - @tcp_port = UniquePort.call - base = "#{Gem.ruby} -Ilib bin/puma" - base = "bundle exec #{base}" if defined?(Bundler) - "#{base} -b tcp://127.0.0.1:#{@tcp_port} #{argv}" - end - - def server(argv) - @server = IO.popen(server_cmd(argv), "r") - wait_for_server_to_boot(@server) - - @server - end - - def start_forked_server(argv) - servercmd = server_cmd(argv) - pid = fork do - exec servercmd + @ios_to_close.each do |io| + io.close if io.is_a?(IO) && !io.closed? + io = nil end - sleep 5 - pid - end - - def stop_forked_server(pid) - Process.kill(:TERM, pid) - sleep 1 - Process.wait2(pid) - end - - def restart_server_and_listen(argv) - server(argv) - connection = connect - initial_reply = read_body(connection) - restart_server(@server, connection) - [initial_reply, read_body(connect)] - end - - def wait_booted - @wait.sysread 1 - end - - # reuses an existing connection to make sure that works - def restart_server(server, connection) - Process.kill :USR2, @server.pid - - connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request - - wait_for_server_to_boot(server) - end - - def connect(path = nil) - s = TCPSocket.new "localhost", @tcp_port - s << "GET /#{path} HTTP/1.1\r\n\r\n" - true until s.gets == "\r\n" - s - end - - def wait_for_server_to_boot(server) - true while server.gets !~ /Ctrl-C/ # wait for server to say it booted - end - - def read_body(connection) - Timeout.timeout(10) do - loop do - response = connection.readpartial(1024) - body = response.split("\r\n\r\n", 2).last - return body if body && !body.empty? - sleep 0.01 - end - end + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil + File.unlink @control_path rescue nil end def test_stop_via_pumactl skip UNIX_SKT_MSG unless UNIX_SKT_EXIST + server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.rackup "test/rackup/hello.ru" - end + Puma::ControlCLI.new(%W!-S #{@state_path} stop!, StringIO.new).run - l = Puma::Launcher.new conf, :events => @events - - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end - - wait_booted - - s = UNIXSocket.new @bind_path - s << "GET / HTTP/1.0\r\n\r\n" - assert_equal "Hello World", read_body(s) - - sout = StringIO.new - - ccli = Puma::ControlCLI.new %W!-S #{@state_path} stop!, sout - - ccli.run + _, status = Process.wait2(@server.pid) + assert_equal 0, status - assert_kind_of Thread, t.join, "server didn't stop" + @server = nil end def test_phased_restart_via_pumactl @@ -166,38 +71,36 @@ def test_phased_restart_via_pumactl c.rackup "test/rackup/sleep.ru" end - l = Puma::Launcher.new conf, :events => @events + thr, _l, events = run_launcher conf - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end - - wait_booted + start_time = Time.now.to_f s = UNIXSocket.new @bind_path + @ios_to_close << s s << "GET /sleep#{delay} HTTP/1.0\r\n\r\n" - sout = StringIO.new # Phased restart - ccli = Puma::ControlCLI.new ["-S", @state_path, "phased-restart"], sout - ccli.run + Puma::ControlCLI.new(%W!-S #{@state_path} phased-restart!, events.stdout).run done = false until done - @events.stdout.rewind - log = @events.stdout.readlines.join("") + events.stdout.rewind + log = events.stdout.readlines.join("") if log =~ /- Worker \d \(pid: \d+\) booted, phase: 1/ assert_match(/TERM sent/, log) assert_match(/- Worker \d \(pid: \d+\) booted, phase: 1/, log) done = true end end + + # if worker_shutdown_timeout is functioning, request shouldn't be allowed + # to complete, hence, restart should take less than request time + assert_operator (Time.now.to_f - start_time).round(2), :<, 30 + # Stop - ccli = Puma::ControlCLI.new ["-S", @state_path, "stop"], sout - ccli.run + Puma::ControlCLI.new(%W!-S #{@state_path} stop!, events.stdout).run - assert_kind_of Thread, t.join, "server didn't stop" + assert_kind_of Thread, thr.join, "server didn't stop" assert File.exist? @bind_path end @@ -213,8 +116,7 @@ def test_kill_unknown_via_pumactl sout = StringIO.new e = assert_raises SystemExit do - ccli = Puma::ControlCLI.new %W!-p #{safe_pid} stop!, sout - ccli.run + Puma::ControlCLI.new(%W!-p #{safe_pid} stop!, sout).run end sout.rewind # windows bad URI(is not URI?) @@ -222,13 +124,13 @@ def test_kill_unknown_via_pumactl assert_equal(1, e.status) end - def test_restart_closes_keepalive_sockets + def test_restart_with_usr2_works skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_restart_closes_keepalive_sockets_workers + def test_restart_with_usr2_works_workers skip NO_FORK_MSG unless HAS_FORK _, new_reply = restart_server_and_listen("-q -w 2 test/rackup/hello.ru") assert_equal "Hello World", new_reply @@ -236,7 +138,7 @@ def test_restart_closes_keepalive_sockets_workers def test_sigterm_closes_listeners_on_forked_servers skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 -q test/rackup/sleep.ru") + pid = server("-w 2 -q test/rackup/sleep.ru").pid threads = [] initial_reply = nil next_replies = [] @@ -277,7 +179,7 @@ def test_sigterm_closes_listeners_on_forked_servers end end - threads.map(&:join) + threads.each(&:join) assert_equal "Slept 1", initial_reply @@ -301,9 +203,9 @@ def test_restart_restores_environment end def test_term_signal_exit_code_in_single_mode - skip NO_FORK_MSG unless HAS_FORK + skip_on :windows # no SIGTERM - pid = start_forked_server("test/rackup/hello.ru") + pid = server("test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status @@ -312,16 +214,16 @@ def test_term_signal_exit_code_in_single_mode def test_term_signal_exit_code_in_clustered_mode skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 test/rackup/hello.ru") + pid = server("-w 2 test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status end def test_term_signal_suppress_in_single_mode - skip NO_FORK_MSG unless HAS_FORK + skip_on :windows # no SIGTERM - pid = start_forked_server("-C test/config/suppress_exception.rb test/rackup/hello.ru") + pid = server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 0, status @@ -340,6 +242,7 @@ def test_term_signal_suppress_in_clustered_mode status = $?.exitstatus assert_equal 0, status + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end @@ -348,14 +251,14 @@ def test_not_accepts_new_connections_after_term_signal server('test/rackup/sleep.ru') - _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://127.0.0.1:#{@tcp_port}/sleep10") + _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request Process.kill(:TERM, @server.pid) true while @server.gets !~ /Gracefully stopping/ # wait for server to begin graceful shutdown # Invoke a request which must be rejected - _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl 127.0.0.1:#{@tcp_port}") + _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl #{HOST}:#{@tcp_port}") assert nil != Process.getpgid(@server.pid) # ensure server is still running assert nil != Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress @@ -367,6 +270,7 @@ def test_not_accepts_new_connections_after_term_signal assert_match(/Connection refused/, rejected_curl_stderr.read) Process.wait(@server.pid) + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end @@ -385,6 +289,7 @@ def test_no_zombie_children # Signal the workers to terminate, and wait for them to die. Process.kill :TERM, @server.pid Process.wait @server.pid + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server # Check if the worker processes remain in the process table. @@ -400,18 +305,92 @@ def test_no_zombie_children assert_empty zombies, "Process ids #{zombies} became zombies" end + # mimicking stuck workers, test respawn with external SIGTERM def test_worker_spawn_external_term worker_respawn { |l, old_pids| old_pids.each { |p| Process.kill :TERM, p } } end + # mimicking stuck workers, test restart def test_worker_phased_restart worker_respawn { |l, old_pids| l.phased_restart } end private + def server(argv, bind = nil) + if bind + cmd = "#{BASE} -b #{bind} #{argv}" + else + @tcp_port = UniquePort.call + cmd = "#{BASE} -b tcp://#{HOST}:#{@tcp_port} #{argv}" + end + @server = IO.popen(cmd, "r") + wait_for_server_to_boot + @server + end + + def stop_forked_server(pid) + Process.kill(:TERM, pid) + sleep 1 + Process.wait2(pid) + end + + def restart_server_and_listen(argv) + server(argv) + connection = connect + initial_reply = read_body(connection) + restart_server(connection) + [initial_reply, read_body(connect)] + end + + # reuses an existing connection to make sure that works + def restart_server(connection) + Process.kill :USR2, @server.pid + connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request + wait_for_server_to_boot + end + + def wait_for_server_to_boot + true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted + end + + def connect(path = nil) + s = TCPSocket.new HOST, @tcp_port + @ios_to_close << s + s << "GET /#{path} HTTP/1.1\r\n\r\n" + true until s.gets == "\r\n" + s + end + + def read_body(connection) + Timeout.timeout(10) do + loop do + response = connection.readpartial(1024) + body = response.split("\r\n\r\n", 2).last + return body if body && !body.empty? + sleep 0.01 + end + end + end + + def run_launcher(conf) + wait, ready = IO.pipe + @ios_to_close << wait << ready + events = Puma::Events.strings + events.on_booted { ready << "!" } + + launcher = Puma::Launcher.new conf, :events => events + + thr = Thread.new { launcher.run } + + # wait for boot from `events.on_booted` + wait.sysread 1 + + [thr, launcher, events] + end + def worker_respawn skip NO_FORK_MSG unless HAS_FORK port = UniquePort.call @@ -505,23 +484,4 @@ def worker_respawn assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" assert_empty old_waited, msg end - - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } - - launcher = Puma::Launcher.new conf, :events => @events - - thr = Thread.new do - launcher.run - end - - # wait for boot from #@events.on_booted - @wait.sysread 1 - - [thr, launcher, @events] - end end From b823060002bf9f7037961b5a1c95aa4e58e478fb Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Sat, 24 Aug 2019 21:02:48 -0500 Subject: [PATCH 2/3] test_integration.rb - update test_pumactl_phased_restart_cluster & more cleanup 1. test_pumactl_phased_restart_cluster uses CLI pumaclt, but no longer checks based on stuck workers 2. When appropriate, run ControlCLI from CLI/popen and account for bundler 3. Renamed several tests 4. Added two common methods, #cli_pumactl and #get_worker_pids 5. Set worker number as a constant, as GitHub Actions may allow more than 2 --- test/test_integration.rb | 200 ++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 99 deletions(-) diff --git a/test/test_integration.rb b/test/test_integration.rb index feb9fa6d17..fd7a3ba94f 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require_relative "helper" -require "puma/cli" require "puma/control_cli" require "open3" @@ -9,9 +8,10 @@ class TestIntegration < Minitest::Test HOST = "127.0.0.1" TOKEN = "xxyyzz" - BASE = defined?(Bundler) ? - "bundle exec #{Gem.ruby} -Ilib bin/puma" : - "#{Gem.ruby} -Ilib bin/puma" + BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" : + "#{Gem.ruby} -Ilib" + + WORKERS = 2 def setup @ios_to_close = [] @@ -32,6 +32,7 @@ def teardown rescue Errno::ECHILD end @server.close unless @server.closed? + @server = nil end @ios_to_close.each do |io| @@ -44,11 +45,11 @@ def teardown File.unlink @control_path rescue nil end - def test_stop_via_pumactl + def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") - Puma::ControlCLI.new(%W!-S #{@state_path} stop!, StringIO.new).run + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" _, status = Process.wait2(@server.pid) assert_equal 0, status @@ -56,55 +57,40 @@ def test_stop_via_pumactl @server = nil end - def test_phased_restart_via_pumactl + def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - delay = 40 - - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.workers 2 - c.worker_shutdown_timeout 2 - c.rackup "test/rackup/sleep.ru" - end - - thr, _l, events = run_launcher conf - - start_time = Time.now.to_f + server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" s = UNIXSocket.new @bind_path @ios_to_close << s - s << "GET /sleep#{delay} HTTP/1.0\r\n\r\n" + s << "GET /sleep5 HTTP/1.0\r\n\r\n" + + # Get the PIDs of the phase 0 workers. + phase0_worker_pids = get_worker_pids 0 # Phased restart - Puma::ControlCLI.new(%W!-S #{@state_path} phased-restart!, events.stdout).run - - done = false - until done - events.stdout.rewind - log = events.stdout.readlines.join("") - if log =~ /- Worker \d \(pid: \d+\) booted, phase: 1/ - assert_match(/TERM sent/, log) - assert_match(/- Worker \d \(pid: \d+\) booted, phase: 1/, log) - done = true - end - end + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} phased-restart" - # if worker_shutdown_timeout is functioning, request shouldn't be allowed - # to complete, hence, restart should take less than request time - assert_operator (Time.now.to_f - start_time).round(2), :<, 30 + # Get the PIDs of the phase 1 workers. + phase1_worker_pids = get_worker_pids 1 + + msg = "phase 0 pids #{phase0_worker_pids.inspect} phase 1 pids #{phase1_worker_pids.inspect}" + + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" # Stop - Puma::ControlCLI.new(%W!-S #{@state_path} stop!, events.stdout).run + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" - assert_kind_of Thread, thr.join, "server didn't stop" - assert File.exist? @bind_path + _, status = Process.wait2(@server.pid) + assert_equal 0, status + + @server = nil end - def test_kill_unknown_via_pumactl + def test_pumactl_kill_unknown skip_on :jruby # we run ls to get a 'safe' pid to pass off as puma in cli stop @@ -124,21 +110,35 @@ def test_kill_unknown_via_pumactl assert_equal(1, e.status) end - def test_restart_with_usr2_works + def test_usr2_restart_single skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_restart_with_usr2_works_workers + def test_usr2_restart_cluster skip NO_FORK_MSG unless HAS_FORK - _, new_reply = restart_server_and_listen("-q -w 2 test/rackup/hello.ru") + _, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_sigterm_closes_listeners_on_forked_servers + # It does not share environments between multiple generations, which would break Dotenv + def test_usr2_restart_restores_environment + # jruby has a bug where setting `nil` into the ENV or `delete` do not change the + # next workers ENV + skip_on :jruby + skip_unless_signal_exist? :USR2 + + initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") + + assert_includes initial_reply, "Hello RAND" + assert_includes new_reply, "Hello RAND" + refute_equal initial_reply, new_reply + end + + def test_term_closes_listeners_cluster skip NO_FORK_MSG unless HAS_FORK - pid = server("-w 2 -q test/rackup/sleep.ru").pid + pid = server("-w #{WORKERS} -q test/rackup/sleep.ru").pid threads = [] initial_reply = nil next_replies = [] @@ -188,21 +188,7 @@ def test_sigterm_closes_listeners_on_forked_servers refute_includes next_replies, :connection_reset end - # It does not share environments between multiple generations, which would break Dotenv - def test_restart_restores_environment - # jruby has a bug where setting `nil` into the ENV or `delete` do not change the - # next workers ENV - skip_on :jruby - skip_unless_signal_exist? :USR2 - - initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") - - assert_includes initial_reply, "Hello RAND" - assert_includes new_reply, "Hello RAND" - refute_equal initial_reply, new_reply - end - - def test_term_signal_exit_code_in_single_mode + def test_term_exit_code_single skip_on :windows # no SIGTERM pid = server("test/rackup/hello.ru").pid @@ -211,16 +197,16 @@ def test_term_signal_exit_code_in_single_mode assert_equal 15, status end - def test_term_signal_exit_code_in_clustered_mode + def test_term_exit_code_cluster skip NO_FORK_MSG unless HAS_FORK - pid = server("-w 2 test/rackup/hello.ru").pid + pid = server("-w #{WORKERS} test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status end - def test_term_signal_suppress_in_single_mode + def test_term_suppress_single skip_on :windows # no SIGTERM pid = server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid @@ -229,10 +215,10 @@ def test_term_signal_suppress_in_single_mode assert_equal 0, status end - def test_term_signal_suppress_in_clustered_mode + def test_term_suppress_cluster skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/suppress_exception.rb test/rackup/hello.ru") + server("-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru") Process.kill(:TERM, @server.pid) begin @@ -246,7 +232,7 @@ def test_term_signal_suppress_in_clustered_mode @server = nil # prevent `#teardown` from killing already killed server end - def test_not_accepts_new_connections_after_term_signal + def test_term_not_accepts_new_connections skip_on :jruby, :windows server('test/rackup/sleep.ru') @@ -274,23 +260,18 @@ def test_not_accepts_new_connections_after_term_signal @server = nil # prevent `#teardown` from killing already killed server end - def test_no_zombie_children + def test_term_worker_clean_exit_cluster skip NO_FORK_MSG unless HAS_FORK skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - worker_pids = [] - server = server("-w 2 test/rackup/hello.ru") + pid = server("-w #{WORKERS} test/rackup/hello.ru").pid + # Get the PIDs of the child workers. - while worker_pids.size < 2 - next unless line = server.gets.match(/pid: (\d+)/) - worker_pids << line.captures.first.to_i - end + worker_pids = get_worker_pids 0 # Signal the workers to terminate, and wait for them to die. - Process.kill :TERM, @server.pid - Process.wait @server.pid - @server.close unless @server.closed? - @server = nil # prevent `#teardown` from killing already killed server + Process.kill :TERM, pid + Process.wait pid # Check if the worker processes remain in the process table. # Process.kill should raise the Errno::ESRCH exception, @@ -306,25 +287,25 @@ def test_no_zombie_children end # mimicking stuck workers, test respawn with external SIGTERM - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } + def test_stuck_external_term_spawn_cluster + worker_respawn { |l, phase0_worker_pids| + phase0_worker_pids.each { |p| Process.kill :TERM, p } } end # mimicking stuck workers, test restart - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } + def test_stuck_phased_restart_cluster + worker_respawn { |l, phase0_worker_pids| l.phased_restart } end private def server(argv, bind = nil) if bind - cmd = "#{BASE} -b #{bind} #{argv}" + cmd = "#{BASE} bin/puma -b #{bind} #{argv}" else @tcp_port = UniquePort.call - cmd = "#{BASE} -b tcp://#{HOST}:#{@tcp_port} #{argv}" + cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" end @server = IO.popen(cmd, "r") wait_for_server_to_boot @@ -375,6 +356,14 @@ def read_body(connection) end end + def cli_pumactl(argv) + cmd = "#{BASE} bin/pumactl #{argv}" + pumactl = IO.popen(cmd, "r") + @ios_to_close << pumactl + Process.wait pumactl.pid + pumactl + end + def run_launcher(conf) wait, ready = IO.pipe @ios_to_close << wait << ready @@ -399,7 +388,7 @@ def worker_respawn conf = Puma::Configuration.new do |c| c.bind "tcp://#{HOST}:#{port}" c.threads 1, 1 - c.workers 2 + c.workers WORKERS c.worker_shutdown_timeout 2 c.app TestApps::SLEEP c.after_worker_fork { |idx| workers_booted += 1 } @@ -410,7 +399,7 @@ def worker_respawn # make sure two workers have booted time = 0 - until workers_booted >= 2 || time >= 10 + until workers_booted >= WORKERS || time >= 10 sleep 2 time += 2 end @@ -438,25 +427,25 @@ def worker_respawn end end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + phase0_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) start_time = Time.now.to_f # below should 'cancel' the phase 0 workers, either via phased_restart or # externally SIGTERM'ing them - yield launcher, old_pids + yield launcher, phase0_worker_pids # make sure four workers have booted time = 0 - until workers_booted >= 4 || time >= 45 + until workers_booted >= 2 * WORKERS || time >= 45 sleep 2 time += 2 end - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + phase1_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| + # should be empty if all phase 0 workers cleanly exited + phase0_exited = phase0_worker_pids.map { |pid| begin Process.wait(pid, Process::WNOHANG) pid @@ -478,10 +467,23 @@ def worker_respawn # and cancel both requests assert_operator (Time.now.to_f - start_time).round(2), :<, 35 - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg + msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty phase0_exited, msg + end + + def get_worker_pids(phase, size = WORKERS) + pids = [] + re = /pid: (\d+)\) booted, phase: #{phase}/ + while pids.size < size + if pid = @server.gets[re, 1] + pids << pid + else + sleep 2 + end + end + pids.map(&:to_i) end end From 22cb4e8c1fd00b9915b7f867a9461a354e612c94 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Mon, 26 Aug 2019 09:00:07 -0500 Subject: [PATCH 3/3] test-integration.rb - add clean_exit_pids, rename server to cli_server --- test/test_integration.rb | 61 ++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/test/test_integration.rb b/test/test_integration.rb index fd7a3ba94f..7d3e67c8f4 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -47,7 +47,7 @@ def teardown def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST - server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") + cli_server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" @@ -60,7 +60,7 @@ def test_pumactl_stop def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" s = UNIXSocket.new @bind_path @ios_to_close << s @@ -138,7 +138,7 @@ def test_usr2_restart_restores_environment def test_term_closes_listeners_cluster skip NO_FORK_MSG unless HAS_FORK - pid = server("-w #{WORKERS} -q test/rackup/sleep.ru").pid + pid = cli_server("-w #{WORKERS} -q test/rackup/sleep.ru").pid threads = [] initial_reply = nil next_replies = [] @@ -191,7 +191,7 @@ def test_term_closes_listeners_cluster def test_term_exit_code_single skip_on :windows # no SIGTERM - pid = server("test/rackup/hello.ru").pid + pid = cli_server("test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status @@ -200,7 +200,7 @@ def test_term_exit_code_single def test_term_exit_code_cluster skip NO_FORK_MSG unless HAS_FORK - pid = server("-w #{WORKERS} test/rackup/hello.ru").pid + pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status @@ -209,7 +209,7 @@ def test_term_exit_code_cluster def test_term_suppress_single skip_on :windows # no SIGTERM - pid = server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid + pid = cli_server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 0, status @@ -218,7 +218,7 @@ def test_term_suppress_single def test_term_suppress_cluster skip NO_FORK_MSG unless HAS_FORK - server("-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru") + cli_server("-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru") Process.kill(:TERM, @server.pid) begin @@ -235,7 +235,7 @@ def test_term_suppress_cluster def test_term_not_accepts_new_connections skip_on :jruby, :windows - server('test/rackup/sleep.ru') + cli_server('test/rackup/sleep.ru') _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request @@ -264,7 +264,7 @@ def test_term_worker_clean_exit_cluster skip NO_FORK_MSG unless HAS_FORK skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - pid = server("-w #{WORKERS} test/rackup/hello.ru").pid + pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid # Get the PIDs of the child workers. worker_pids = get_worker_pids 0 @@ -273,16 +273,8 @@ def test_term_worker_clean_exit_cluster Process.kill :TERM, pid Process.wait pid - # Check if the worker processes remain in the process table. - # Process.kill should raise the Errno::ESRCH exception, - # indicating the process is dead and has been reaped. - zombies = worker_pids.map do |pid| - begin - pid if Process.kill 0, pid - rescue Errno::ESRCH - nil - end - end.compact + zombies = clean_exit_pids worker_pids + assert_empty zombies, "Process ids #{zombies} became zombies" end @@ -300,7 +292,7 @@ def test_stuck_phased_restart_cluster private - def server(argv, bind = nil) + def cli_server(argv, bind = nil) if bind cmd = "#{BASE} bin/puma -b #{bind} #{argv}" else @@ -319,7 +311,7 @@ def stop_forked_server(pid) end def restart_server_and_listen(argv) - server(argv) + cli_server(argv) connection = connect initial_reply = read_body(connection) restart_server(connection) @@ -357,8 +349,7 @@ def read_body(connection) end def cli_pumactl(argv) - cmd = "#{BASE} bin/pumactl #{argv}" - pumactl = IO.popen(cmd, "r") + pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") @ios_to_close << pumactl Process.wait pumactl.pid pumactl @@ -445,14 +436,7 @@ def worker_respawn phase1_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) # should be empty if all phase 0 workers cleanly exited - phase0_exited = phase0_worker_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated - end - }.compact + phase0_exited = clean_exit_pids phase0_worker_pids Thread.kill worker0 Thread.kill worker1 @@ -474,6 +458,7 @@ def worker_respawn assert_empty phase0_exited, msg end + # gets worker pids from @server output def get_worker_pids(phase, size = WORKERS) pids = [] re = /pid: (\d+)\) booted, phase: #{phase}/ @@ -486,4 +471,18 @@ def get_worker_pids(phase, size = WORKERS) end pids.map(&:to_i) end + + # Returns an array of pids still in the process table, so it should + # be empty for a clean exit. + # Process.kill should raise the Errno::ESRCH exception, indicating the + # process is dead and has been reaped. + def clean_exit_pids(pids) + pids.map do |pid| + begin + pid if Process.kill 0, pid + rescue Errno::ESRCH + nil + end + end.compact + end end