diff --git a/test/config/worker_shutdown_timeout_2.rb b/test/config/worker_shutdown_timeout_2.rb new file mode 100644 index 0000000000..ddb8c6453d --- /dev/null +++ b/test/config/worker_shutdown_timeout_2.rb @@ -0,0 +1 @@ +worker_shutdown_timeout 2 diff --git a/test/helper.rb b/test/helper.rb index 0a127f0091..b56fa93605 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -84,10 +84,12 @@ module TestSkips UNIX_SKT_EXIST = Object.const_defined? :UNIXSocket UNIX_SKT_MSG = "UnixSockets aren't available on the #{RUBY_PLATFORM} platform" + SIGNAL_LIST = Signal.list.keys.map(&:to_sym) - (Puma.windows? ? [:TERM] : []) + # usage: skip_unless_signal_exist? :USR2 def skip_unless_signal_exist?(sig, bt: caller) - signal = sig.to_s - unless Signal.list.key? signal + signal = sig.to_s.sub(/\ASIG/, '').to_sym + unless SIGNAL_LIST.include? signal skip "Signal #{signal} isn't available on the #{RUBY_PLATFORM} platform", bt end end diff --git a/test/helpers/integration.rb b/test/helpers/integration.rb index 31608ef880..8b222ea3b0 100644 --- a/test/helpers/integration.rb +++ b/test/helpers/integration.rb @@ -15,6 +15,7 @@ class TestIntegration < Minitest::Test def setup @ios_to_close = [] + @bind_path = "test/#{name}_server.sock" end def teardown @@ -36,26 +37,31 @@ def teardown io.close if io.is_a?(IO) && !io.closed? 
io = nil end + File.unlink(@bind_path) rescue nil end private - def cli_server(argv, bind = nil) - if bind - cmd = "#{BASE} bin/puma -b #{bind} #{argv}" + def cli_server(argv, unix: false) + if unix + cmd = "#{BASE} bin/puma -b unix://#{@bind_path} #{argv}" else @tcp_port = UniquePort.call cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" end @server = IO.popen(cmd, "r") wait_for_server_to_boot + @pid = @server.pid @server end - def send_term_to_server(pid) - Process.kill(:TERM, pid) + def stop_server(pid = @pid, signal: :TERM) + Process.kill signal, pid sleep 1 - Process.wait2(pid) + begin + Process.wait2 pid + rescue Errno::ECHILD + end end def restart_server_and_listen(argv) @@ -77,8 +83,8 @@ def wait_for_server_to_boot true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted end - def connect(path = nil) - s = TCPSocket.new HOST, @tcp_port + def connect(path = nil, unix: false) + s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port) @ios_to_close << s s << "GET /#{path} HTTP/1.1\r\n\r\n" true until s.gets == "\r\n" diff --git a/test/rackup/sleep_pid.ru b/test/rackup/sleep_pid.ru new file mode 100644 index 0000000000..0f1811c8c7 --- /dev/null +++ b/test/rackup/sleep_pid.ru @@ -0,0 +1,8 @@ +# call with "GET /sleep<d> HTTP/1.1\r\n\r\n", where <d> is the number of +# seconds to sleep, returns process pid + +run lambda { |env| + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] +} diff --git a/test/test_integration_cluster.rb b/test/test_integration_cluster.rb index 67e0bf3724..2208196d60 100644 --- a/test/test_integration_cluster.rb +++ b/test/test_integration_cluster.rb @@ -2,6 +2,8 @@ require_relative "helpers/integration" class TestIntegrationCluster < TestIntegration + DARWIN = !!RUBY_PLATFORM[/darwin/] + def setup skip NO_FORK_MSG unless HAS_FORK @@ -27,60 +29,37 @@ def test_usr2_restart assert_equal "Hello World", new_reply end - 
def test_term_closes_listeners - pid = cli_server("-w #{WORKERS} -q test/rackup/sleep.ru").pid - threads = [] - initial_reply = nil - next_replies = [] - condition_variable = ConditionVariable.new - mutex = Mutex.new - - threads << Thread.new do - s = connect "sleep1" - mutex.synchronize { condition_variable.broadcast } - initial_reply = read_body(s) - end - - threads << Thread.new do - mutex.synchronize { - condition_variable.wait(mutex, 1) - Process.kill("SIGTERM", pid) - } - end + # Next two tests, one tcp, one unix + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. - 10.times.each do |i| - threads << Thread.new do - mutex.synchronize { condition_variable.wait(mutex, 1.5) } - - begin - s = connect "sleep1" - read_body(s) - next_replies << :success - rescue Errno::ECONNRESET - # connection was accepted but then closed - # client would see an empty response - next_replies << :connection_reset - rescue Errno::ECONNREFUSED - # connection was was never accepted - # it can therefore be re-tried before the - # client receives an empty response - next_replies << :connection_refused - end - end - end + def test_term_closes_listeners_tcp + skip_unless_signal_exist? :TERM + term_closes_listeners unix: false + end - threads.each(&:join) + def test_term_closes_listeners_unix + skip_unless_signal_exist? :TERM + term_closes_listeners unix: true + end - assert_equal "Slept 1", initial_reply + # Next two tests, one tcp, one unix + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used - assert_includes next_replies, :connection_refused + def test_usr1_all_respond_tcp + skip_unless_signal_exist? :USR1 + usr1_all_respond unix: false + end - refute_includes next_replies, :connection_reset + def test_usr1_all_respond_unix + skip_unless_signal_exist? 
:USR1 + usr1_all_respond unix: true end def test_term_exit_code - pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid - _, status = send_term_to_server(pid) + cli_server "-w #{WORKERS} test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end @@ -112,117 +91,166 @@ def test_term_worker_clean_exit Process.kill :TERM, pid Process.wait pid - zombies = clean_exit_pids worker_pids + zombies = bad_exit_pids worker_pids assert_empty zombies, "Process ids #{zombies} became zombies" end - # mimicking stuck workers, test respawn with external SIGTERM + # mimicking stuck workers, test respawn with external TERM def test_stuck_external_term_spawn - worker_respawn { |l, phase0_worker_pids| - phase0_worker_pids.each { |p| Process.kill :TERM, p } - } + skip_unless_signal_exist? :TERM + + worker_respawn(0) do |phase0_worker_pids| + last = phase0_worker_pids.last + phase0_worker_pids.each do |pid| + Process.kill :TERM, pid + sleep 4 unless pid == last + end + end end # mimicking stuck workers, test restart def test_stuck_phased_restart - worker_respawn { |l, phase0_worker_pids| l.phased_restart } + skip_unless_signal_exist? :USR1 + worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid } end private - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers WORKERS - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } - end + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def term_closes_listeners(unix: false) + skip_unless_signal_exist? 
:TERM + + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + div = 10 - # start Puma via launcher - thr, launcher, _e = run_launcher conf + refused = thread_run_refused unix: unix - # make sure two workers have booted - time = 0 - until workers_booted >= WORKERS || time >= 10 - sleep 2 - time += 2 + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies << :term_sent } + end + else + threads << Thread.new do + thread_run replies, i.to_f/div, 1, mutex, refused, unix: unix + end + end end - cluster = launcher.instance_variable_get :@runner + threads.each(&:join) - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + msg = "#{responses} responses, #{resets} resets, #{refused} refused" - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end - end + assert_operator 9, :<=, responses, msg - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + assert_operator 10, :>=, resets , msg + + assert_operator 20, :<=, refused , msg + end + + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. 
+ # All should be responded to, and at least three workers should be used + def usr1_all_respond(unix: false) + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + + s = connect "sleep1", unix: unix + replies << read_body(s) + Process.kill :USR1, @pid + + refused = thread_run_refused unix: unix + + 24.times do |delay| + threads << Thread.new do + thread_run replies, delay, 1, mutex, refused, unix: unix end end - phase0_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) + threads.each(&:join) - start_time = Time.now.to_f + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } - # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, phase0_worker_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 2 * WORKERS || time >= 45 - sleep 2 - time += 2 - end + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - phase1_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) + msg = "#{responses} responses, #{qty_pids} uniq pids" - # should be empty if all phase 0 workers cleanly exited - phase0_exited = clean_exit_pids phase0_worker_pids + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg + + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + refute_includes replies, :refused, msg + + refute_includes replies, :reset , msg + end + + def worker_respawn(phase = 1, size = WORKERS) + threads = [] + + cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru" + + # make sure two workers have booted + phase0_worker_pids = get_worker_pids - Thread.kill worker0 - Thread.kill worker1 + [35, 40].each do |sleep_time| + threads << Thread.new do + begin + 
connect "sleep#{sleep_time}" + # stuck connections will raise IOError or Errno::ECONNRESET + # when shutdown + rescue IOError, Errno::ECONNRESET + end + end + end - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" + @start_time = Time.now.to_f - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 + # below should 'cancel' the phase 0 workers, either via phased_restart or + # externally TERM'ing them + yield phase0_worker_pids + + # wait for new workers to boot + phase1_worker_pids = get_worker_pids phase + + # should be empty if all phase 0 workers cleanly exited + phase0_exited = bad_exit_pids phase0_worker_pids # Since 35 is the shorter of the two requests, server should restart # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + assert_operator (Time.now.to_f - @start_time).round(2), :<, 35 msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty phase0_exited, msg + + threads.each { |th| Thread.kill th } + stop_server signal: :KILL end # Returns an array of pids still in the process table, so it should # be empty for a clean exit. # Process.kill should raise the Errno::ESRCH exception, indicating the # process is dead and has been reaped. - def clean_exit_pids(pids) + def bad_exit_pids(pids) pids.map do |pid| begin pid if Process.kill 0, pid @@ -232,19 +260,29 @@ def clean_exit_pids(pids) end.compact end - def run_launcher(conf) - wait, ready = IO.pipe - @ios_to_close << wait << ready - events = Puma::Events.strings - events.on_booted { ready << "!" 
} - - launcher = Puma::Launcher.new conf, :events => events - - thr = Thread.new { launcher.run } - - # wait for boot from `events.on_booted` - wait.sysread 1 + # used with thread_run to define correct 'refused' errors + def thread_run_refused(unix: false) + if unix + DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT] + else + DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] : + [Errno::ECONNREFUSED] + end + end - [thr, launcher, events] + # used in loop to create several 'requests' + def thread_run(replies, delay, sleep_time, mutex, refused, unix: false) + begin + sleep delay + s = connect "sleep#{sleep_time}", unix: unix + body = read_body(s) + mutex.synchronize { replies << body } + rescue Errno::ECONNRESET + # connection was accepted but then closed + # client would see an empty response + mutex.synchronize { replies << :reset } + rescue *refused + mutex.synchronize { replies << :refused } + end end end diff --git a/test/test_integration_pumactl.rb b/test/test_integration_pumactl.rb index 2e8b67963c..d35d59a6cc 100644 --- a/test/test_integration_pumactl.rb +++ b/test/test_integration_pumactl.rb @@ -6,7 +6,6 @@ def setup super @state_path = "test/#{name}_puma.state" - @bind_path = "test/#{name}_server.sock" @control_path = "test/#{name}_control.sock" end @@ -16,15 +15,15 @@ def teardown begin # refute File.exist?(@bind_path), "Bind path must be removed after stop" ensure - [@bind_path, @state_path, @control_path].each { |p| File.unlink(p) rescue nil } + [@state_path, @control_path].each { |p| File.unlink(p) rescue nil } end end def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST - cli_server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") + cli_server "-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}" - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" + cli_pumactl "stop", unix: true _, status = Process.wait2(@server.pid) 
assert_equal 0, status @@ -35,7 +34,7 @@ def test_pumactl_stop def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true s = UNIXSocket.new @bind_path @ios_to_close << s @@ -46,7 +45,7 @@ def test_pumactl_phased_restart_cluster assert File.exist? @bind_path # Phased restart - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} phased-restart" + cli_pumactl "phased-restart", unix: true # Get the PIDs of the phase 1 workers. phase1_worker_pids = get_worker_pids 1 @@ -58,8 +57,7 @@ def test_pumactl_phased_restart_cluster assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" assert File.exist?(@bind_path), "Bind path must exist after phased restart" - # Stop - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" + cli_pumactl "stop", unix: true _, status = Process.wait2(@server.pid) assert_equal 0, status @@ -89,8 +87,12 @@ def test_pumactl_kill_unknown private - def cli_pumactl(argv) - pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") + def cli_pumactl(argv, unix: false) + if unix + pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r") + else + pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") + end @ios_to_close << pumactl Process.wait pumactl.pid pumactl diff --git a/test/test_integration_single.rb b/test/test_integration_single.rb index 883974bb90..9532f1fe00 100644 --- a/test/test_integration_single.rb +++ b/test/test_integration_single.rb @@ -23,19 +23,19 @@ def test_usr2_restart_restores_environment end def test_term_exit_code - skip_on :windows # no SIGTERM + skip_on :windows # no TERM - pid = cli_server("test/rackup/hello.ru").pid - _, status 
= send_term_to_server(pid) + cli_server "test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end def test_term_suppress - skip_on :windows # no SIGTERM + skip_on :windows # no TERM - pid = cli_server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid - _, status = send_term_to_server(pid) + cli_server "-C test/config/suppress_exception.rb test/rackup/hello.ru" + _, status = stop_server assert_equal 0, status end