diff --git a/test/test_integration.rb b/test/test_integration.rb index 08cf265b55..7d3e67c8f4 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -1,207 +1,96 @@ # frozen_string_literal: true require_relative "helper" -require "puma/cli" require "puma/control_cli" require "open3" -# TODO: Remove over-utilization of @instance variables -# TODO: remove stdout logging, get everything out of my rainbow dots - class TestIntegration < Minitest::Test HOST = "127.0.0.1" TOKEN = "xxyyzz" - def setup - @state_path = "test/test_#{name}_puma.state" - @bind_path = "test/test_#{name}_server.sock" - @control_path = "test/test_#{name}_control.sock" - - @server = nil + BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" : + "#{Gem.ruby} -Ilib" - @wait, @ready = IO.pipe + WORKERS = 2 - @events = Puma::Events.strings - @events.on_booted { @ready << "!" } + def setup + @ios_to_close = [] + @state_path = "test/#{name}_puma.state" + @bind_path = "test/#{name}_server.sock" + @control_path = "test/#{name}_control.sock" end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil - File.unlink @control_path rescue nil - - @wait.close - @ready.close - - if @server - Process.kill "INT", @server.pid + if defined?(@server) && @server + begin + Process.kill "INT", @server.pid + rescue + Errno::ESRCH + end begin Process.wait @server.pid rescue Errno::ECHILD end - - @server.close + @server.close unless @server.closed? + @server = nil end - end - - def server_cmd(argv) - @tcp_port = UniquePort.call - base = "#{Gem.ruby} -Ilib bin/puma" - base = "bundle exec #{base}" if defined?(Bundler) - "#{base} -b tcp://127.0.0.1:#{@tcp_port} #{argv}" - end - - def server(argv) - @server = IO.popen(server_cmd(argv), "r") - - wait_for_server_to_boot(@server) - @server - end - - def start_forked_server(argv) - servercmd = server_cmd(argv) - pid = fork do - exec servercmd + @ios_to_close.each do |io| + io.close if io.is_a?(IO) && !io.closed? + io = nil end - sleep 5 - pid - end - - def stop_forked_server(pid) - Process.kill(:TERM, pid) - sleep 1 - Process.wait2(pid) - end - - def restart_server_and_listen(argv) - server(argv) - connection = connect - initial_reply = read_body(connection) - restart_server(@server, connection) - [initial_reply, read_body(connect)] - end - - def wait_booted - @wait.sysread 1 - end - - # reuses an existing connection to make sure that works - def restart_server(server, connection) - Process.kill :USR2, @server.pid - - connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request - - wait_for_server_to_boot(server) - end - - def connect(path = nil) - s = TCPSocket.new "localhost", @tcp_port - s << "GET /#{path} HTTP/1.1\r\n\r\n" - true until s.gets == "\r\n" - s - end - - def wait_for_server_to_boot(server) - true while server.gets !~ /Ctrl-C/ # wait for server to say it booted - end - - def read_body(connection) - Timeout.timeout(10) do - loop do - response = connection.readpartial(1024) - body = response.split("\r\n\r\n", 2).last - return body if body && !body.empty? - sleep 0.01 - end - end + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil + File.unlink @control_path rescue nil end - def test_stop_via_pumactl + def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST + cli_server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.rackup "test/rackup/hello.ru" - end - - l = Puma::Launcher.new conf, :events => @events - - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end - - wait_booted - - s = UNIXSocket.new @bind_path - s << "GET / HTTP/1.0\r\n\r\n" - assert_equal "Hello World", read_body(s) - - sout = StringIO.new + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" - ccli = Puma::ControlCLI.new %W!-S #{@state_path} stop!, sout - - ccli.run + _, status = Process.wait2(@server.pid) + assert_equal 0, status - assert_kind_of Thread, t.join, "server didn't stop" + @server = nil end - def test_phased_restart_via_pumactl + def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - delay = 40 + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.workers 2 - c.worker_shutdown_timeout 2 - c.rackup "test/rackup/sleep.ru" - end + s = UNIXSocket.new @bind_path + @ios_to_close << s + s << "GET /sleep5 HTTP/1.0\r\n\r\n" - l = Puma::Launcher.new conf, :events => @events + # Get the PIDs of the phase 0 workers. + phase0_worker_pids = get_worker_pids 0 - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end + # Phased restart + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} phased-restart" - wait_booted + # Get the PIDs of the phase 1 workers. + phase1_worker_pids = get_worker_pids 1 - s = UNIXSocket.new @bind_path - s << "GET /sleep#{delay} HTTP/1.0\r\n\r\n" + msg = "phase 0 pids #{phase0_worker_pids.inspect} phase 1 pids #{phase1_worker_pids.inspect}" + + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" - sout = StringIO.new - # Phased restart - ccli = Puma::ControlCLI.new ["-S", @state_path, "phased-restart"], sout - ccli.run - - done = false - until done - @events.stdout.rewind - log = @events.stdout.readlines.join("") - if log =~ /- Worker \d \(pid: \d+\) booted, phase: 1/ - assert_match(/TERM sent/, log) - assert_match(/- Worker \d \(pid: \d+\) booted, phase: 1/, log) - done = true - end - end # Stop - ccli = Puma::ControlCLI.new ["-S", @state_path, "stop"], sout - ccli.run + cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" - assert_kind_of Thread, t.join, "server didn't stop" - assert File.exist? @bind_path + _, status = Process.wait2(@server.pid) + assert_equal 0, status + + @server = nil end - def test_kill_unknown_via_pumactl + def test_pumactl_kill_unknown skip_on :jruby # we run ls to get a 'safe' pid to pass off as puma in cli stop @@ -213,8 +102,7 @@ def test_kill_unknown_via_pumactl sout = StringIO.new e = assert_raises SystemExit do - ccli = Puma::ControlCLI.new %W!-p #{safe_pid} stop!, sout - ccli.run + Puma::ControlCLI.new(%W!-p #{safe_pid} stop!, sout).run end sout.rewind # windows bad URI(is not URI?) @@ -222,21 +110,35 @@ def test_kill_unknown_via_pumactl assert_equal(1, e.status) end - def test_restart_closes_keepalive_sockets + def test_usr2_restart_single skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_restart_closes_keepalive_sockets_workers + def test_usr2_restart_cluster skip NO_FORK_MSG unless HAS_FORK - _, new_reply = restart_server_and_listen("-q -w 2 test/rackup/hello.ru") + _, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_sigterm_closes_listeners_on_forked_servers + # It does not share environments between multiple generations, which would break Dotenv + def test_usr2_restart_restores_environment + # jruby has a bug where setting `nil` into the ENV or `delete` do not change the + # next workers ENV + skip_on :jruby + skip_unless_signal_exist? :USR2 + + initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") + + assert_includes initial_reply, "Hello RAND" + assert_includes new_reply, "Hello RAND" + refute_equal initial_reply, new_reply + end + + def test_term_closes_listeners_cluster skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 -q test/rackup/sleep.ru") + pid = cli_server("-w #{WORKERS} -q test/rackup/sleep.ru").pid threads = [] initial_reply = nil next_replies = [] @@ -277,7 +179,7 @@ def test_sigterm_closes_listeners_on_forked_servers end end - threads.map(&:join) + threads.each(&:join) assert_equal "Slept 1", initial_reply @@ -286,51 +188,37 @@ def test_sigterm_closes_listeners_on_forked_servers refute_includes next_replies, :connection_reset end - # It does not share environments between multiple generations, which would break Dotenv - def test_restart_restores_environment - # jruby has a bug where setting `nil` into the ENV or `delete` do not change the - # next workers ENV - skip_on :jruby - skip_unless_signal_exist? :USR2 + def test_term_exit_code_single + skip_on :windows # no SIGTERM - initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") - - assert_includes initial_reply, "Hello RAND" - assert_includes new_reply, "Hello RAND" - refute_equal initial_reply, new_reply - end - - def test_term_signal_exit_code_in_single_mode - skip NO_FORK_MSG unless HAS_FORK - - pid = start_forked_server("test/rackup/hello.ru") + pid = cli_server("test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status end - def test_term_signal_exit_code_in_clustered_mode + def test_term_exit_code_cluster skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 test/rackup/hello.ru") + pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 15, status end - def test_term_signal_suppress_in_single_mode - skip NO_FORK_MSG unless HAS_FORK + def test_term_suppress_single + skip_on :windows # no SIGTERM - pid = start_forked_server("-C test/config/suppress_exception.rb test/rackup/hello.ru") + pid = cli_server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid _, status = stop_forked_server(pid) assert_equal 0, status end - def test_term_signal_suppress_in_clustered_mode + def test_term_suppress_cluster skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/suppress_exception.rb test/rackup/hello.ru") + cli_server("-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru") Process.kill(:TERM, @server.pid) begin @@ -340,22 +228,23 @@ def test_term_signal_suppress_in_clustered_mode status = $?.exitstatus assert_equal 0, status + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end - def test_not_accepts_new_connections_after_term_signal + def test_term_not_accepts_new_connections skip_on :jruby, :windows - server('test/rackup/sleep.ru') + cli_server('test/rackup/sleep.ru') - _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://127.0.0.1:#{@tcp_port}/sleep10") + _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request Process.kill(:TERM, @server.pid) true while @server.gets !~ /Gracefully stopping/ # wait for server to begin graceful shutdown # Invoke a request which must be rejected - _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl 127.0.0.1:#{@tcp_port}") + _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl #{HOST}:#{@tcp_port}") assert nil != Process.getpgid(@server.pid) # ensure server is still running assert nil != Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress @@ -367,51 +256,121 @@ def test_not_accepts_new_connections_after_term_signal assert_match(/Connection refused/, rejected_curl_stderr.read) Process.wait(@server.pid) + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end - def test_no_zombie_children + def test_term_worker_clean_exit_cluster skip NO_FORK_MSG unless HAS_FORK skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - worker_pids = [] - server = server("-w 2 test/rackup/hello.ru") + pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid + # Get the PIDs of the child workers. - while worker_pids.size < 2 - next unless line = server.gets.match(/pid: (\d+)/) - worker_pids << line.captures.first.to_i - end + worker_pids = get_worker_pids 0 # Signal the workers to terminate, and wait for them to die. - Process.kill :TERM, @server.pid - Process.wait @server.pid - @server = nil # prevent `#teardown` from killing already killed server + Process.kill :TERM, pid + Process.wait pid + + zombies = clean_exit_pids worker_pids - # Check if the worker processes remain in the process table. - # Process.kill should raise the Errno::ESRCH exception, - # indicating the process is dead and has been reaped. - zombies = worker_pids.map do |pid| - begin - pid if Process.kill 0, pid - rescue Errno::ESRCH - nil - end - end.compact assert_empty zombies, "Process ids #{zombies} became zombies" end - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } + # mimicking stuck workers, test respawn with external SIGTERM + def test_stuck_external_term_spawn_cluster + worker_respawn { |l, phase0_worker_pids| + phase0_worker_pids.each { |p| Process.kill :TERM, p } } end - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } + # mimicking stuck workers, test restart + def test_stuck_phased_restart_cluster + worker_respawn { |l, phase0_worker_pids| l.phased_restart } end private + def cli_server(argv, bind = nil) + if bind + cmd = "#{BASE} bin/puma -b #{bind} #{argv}" + else + @tcp_port = UniquePort.call + cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" + end + @server = IO.popen(cmd, "r") + wait_for_server_to_boot + @server + end + + def stop_forked_server(pid) + Process.kill(:TERM, pid) + sleep 1 + Process.wait2(pid) + end + + def restart_server_and_listen(argv) + cli_server(argv) + connection = connect + initial_reply = read_body(connection) + restart_server(connection) + [initial_reply, read_body(connect)] + end + + # reuses an existing connection to make sure that works + def restart_server(connection) + Process.kill :USR2, @server.pid + connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request + wait_for_server_to_boot + end + + def wait_for_server_to_boot + true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted + end + + def connect(path = nil) + s = TCPSocket.new HOST, @tcp_port + @ios_to_close << s + s << "GET /#{path} HTTP/1.1\r\n\r\n" + true until s.gets == "\r\n" + s + end + + def read_body(connection) + Timeout.timeout(10) do + loop do + response = connection.readpartial(1024) + body = response.split("\r\n\r\n", 2).last + return body if body && !body.empty? + sleep 0.01 + end + end + end + + def cli_pumactl(argv) + pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") + @ios_to_close << pumactl + Process.wait pumactl.pid + pumactl + end + + def run_launcher(conf) + wait, ready = IO.pipe + @ios_to_close << wait << ready + events = Puma::Events.strings + events.on_booted { ready << "!" } + + launcher = Puma::Launcher.new conf, :events => events + + thr = Thread.new { launcher.run } + + # wait for boot from `events.on_booted` + wait.sysread 1 + + [thr, launcher, events] + end + def worker_respawn skip NO_FORK_MSG unless HAS_FORK port = UniquePort.call @@ -420,7 +379,7 @@ def worker_respawn conf = Puma::Configuration.new do |c| c.bind "tcp://#{HOST}:#{port}" c.threads 1, 1 - c.workers 2 + c.workers WORKERS c.worker_shutdown_timeout 2 c.app TestApps::SLEEP c.after_worker_fork { |idx| workers_booted += 1 } @@ -431,7 +390,7 @@ def worker_respawn # make sure two workers have booted time = 0 - until workers_booted >= 2 || time >= 10 + until workers_booted >= WORKERS || time >= 10 sleep 2 time += 2 end @@ -459,32 +418,25 @@ def worker_respawn end end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + phase0_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) start_time = Time.now.to_f # below should 'cancel' the phase 0 workers, either via phased_restart or # externally SIGTERM'ing them - yield launcher, old_pids + yield launcher, phase0_worker_pids # make sure four workers have booted time = 0 - until workers_booted >= 4 || time >= 45 + until workers_booted >= 2 * WORKERS || time >= 45 sleep 2 time += 2 end - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + phase1_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated - end - }.compact + # should be empty if all phase 0 workers cleanly exited + phase0_exited = clean_exit_pids phase0_worker_pids Thread.kill worker0 Thread.kill worker1 @@ -499,29 +451,38 @@ def worker_respawn # and cancel both requests assert_operator (Time.now.to_f - start_time).round(2), :<, 35 - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg + msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty phase0_exited, msg end - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } - - launcher = Puma::Launcher.new conf, :events => @events - - thr = Thread.new do - launcher.run + # gets worker pids from @server output + def get_worker_pids(phase, size = WORKERS) + pids = [] + re = /pid: (\d+)\) booted, phase: #{phase}/ + while pids.size < size + if pid = @server.gets[re, 1] + pids << pid + else + sleep 2 + end end + pids.map(&:to_i) + end - # wait for boot from #@events.on_booted - @wait.sysread 1 - - [thr, launcher, @events] - end + # Returns an array of pids still in the process table, so it should + # be empty for a clean exit. + # Process.kill should raise the Errno::ESRCH exception, indicating the + # process is dead and has been reaped. + def clean_exit_pids(pids) + pids.map do |pid| + begin + pid if Process.kill 0, pid + rescue Errno::ESRCH + nil + end + end.compact + end end