From 4c8d4d6921a1b9b9155ab83b09a632c377401398 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Thu, 19 Sep 2019 12:37:53 -0500 Subject: [PATCH] Update test_integration files per PR #1956 (#1965) * Update test_integration files per PR #1956 test_integration_cluster.rb Request handling during server TERM - two tests `#test_term_closes_listeners_tcp` `#test_term_closes_listeners_unix` using `#term_closes_listeners` Send requests 10 per second. Send 10, then :TERM server, then send another 30. No more than 10 should throw Errno::ECONNRESET. Request handling during phased restart - two tests `#test_usr1_all_respond_tcp` `#test_usr1_all_respond_unix` using `#usr1_all_respond` Send requests 1 per second. Send 1, then :USR1 server, then send another 24. All should be responded to, and at least three workers should be used Stuck worker tests - two tests `#test_stuck_external_term_spawn` Tests whether externally TERM'd 'stuck' workers are proper re-spawned. `#test_stuck_phased_restart` Tests whether 'stuck' workers are properly shutdown during phased-restart. helper files/methods changes 1. helper file changes to allow binding to TCP or UNIX, see kwarg unix: 2. Skip on Windows for signal TERM * Misc updates, debug output, cleanup * Add comments * fix test_int_signal_with_background_thread_in_jruby per review * TestIntegrationCluster#term_closes_listeners - add interleaved assert * cluster.rb - remove duplicate Worker#term? method --- lib/puma/cluster.rb | 4 - test/config/worker_shutdown_timeout_2.rb | 1 + test/helper.rb | 24 +- test/helpers/integration.rb | 54 ++-- test/rackup/sleep_pid.ru | 8 + test/rackup/sleep_step.ru | 10 + test/test_cli.rb | 37 ++- test/test_integration_cluster.rb | 363 ++++++++++++++--------- test/test_integration_pumactl.rb | 50 ++-- test/test_integration_single.rb | 37 +-- 10 files changed, 378 insertions(+), 210 deletions(-) create mode 100644 test/config/worker_shutdown_timeout_2.rb create mode 100644 test/rackup/sleep_pid.ru create mode 100644 test/rackup/sleep_step.ru diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 86c1b3ec0e..5b84f66853 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -92,10 +92,6 @@ def term? @term end - def term? - @term - end - def ping!(status) @last_checkin = Time.now @last_status = status diff --git a/test/config/worker_shutdown_timeout_2.rb b/test/config/worker_shutdown_timeout_2.rb new file mode 100644 index 0000000000..ddb8c6453d --- /dev/null +++ b/test/config/worker_shutdown_timeout_2.rb @@ -0,0 +1 @@ +worker_shutdown_timeout 2 diff --git a/test/helper.rb b/test/helper.rb index 0a127f0091..5dbce76fa1 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -21,6 +21,9 @@ $LOAD_PATH << File.expand_path("../../lib", __FILE__) Thread.abort_on_exception = true +$debugging_info = ''.dup +$debugging_hold = false # needed for TestCLI#test_control_clustered + require "puma" require "puma/events" require "puma/detect" @@ -84,10 +87,12 @@ module TestSkips UNIX_SKT_EXIST = Object.const_defined? :UNIXSocket UNIX_SKT_MSG = "UnixSockets aren't available on the #{RUBY_PLATFORM} platform" + SIGNAL_LIST = Signal.list.keys.map(&:to_sym) - (Puma.windows? ? [:INT, :TERM] : []) + # usage: skip_unless_signal_exist? :USR2 def skip_unless_signal_exist?(sig, bt: caller) - signal = sig.to_s - unless Signal.list.key? signal + signal = sig.to_s.sub(/\ASIG/, '').to_sym + unless SIGNAL_LIST.include? signal skip "Signal #{signal} isn't available on the #{RUBY_PLATFORM} platform", bt end end @@ -130,4 +135,19 @@ def self.run(reporter, options = {}) # :nodoc: prove_it! super end + + def full_name + "#{self.class.name}##{name}" + end +end + +Minitest.after_run do + # needed for TestCLI#test_control_clustered + unless $debugging_hold + out = $debugging_info.strip + unless out.empty? + puts "", " Debugging Info".rjust(75, '-'), + out, '-' * 75, "" + end + end end diff --git a/test/helpers/integration.rb b/test/helpers/integration.rb index 31608ef880..7cd6b74e3d 100644 --- a/test/helpers/integration.rb +++ b/test/helpers/integration.rb @@ -15,60 +15,68 @@ class TestIntegration < Minitest::Test def setup @ios_to_close = [] + @bind_path = "test/#{name}_server.sock" end def teardown if defined?(@server) && @server - begin - Process.kill "INT", @server.pid - rescue - Errno::ESRCH - end - begin - Process.wait @server.pid - rescue Errno::ECHILD - end - @server.close unless @server.closed? - @server = nil + stop_server @pid, signal: :INT end @ios_to_close.each do |io| io.close if io.is_a?(IO) && !io.closed? io = nil end + refute File.exist?(@bind_path), "Bind path must be removed after stop" + File.unlink(@bind_path) rescue nil + + # wait until the end for OS buffering? + if defined?(@server) && @server + @server.close unless @server.closed? + @server = nil + end end private - def cli_server(argv, bind = nil) - if bind - cmd = "#{BASE} bin/puma -b #{bind} #{argv}" + def cli_server(argv, unix: false) + if unix + cmd = "#{BASE} bin/puma -b unix://#{@bind_path} #{argv}" else @tcp_port = UniquePort.call cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" end @server = IO.popen(cmd, "r") wait_for_server_to_boot + @pid = @server.pid @server end - def send_term_to_server(pid) - Process.kill(:TERM, pid) - sleep 1 - Process.wait2(pid) + # rescue statements are just in case method is called with a server + # that is already stopped/killed, especially since Process.wait2 is + # blocking + def stop_server(pid = @pid, signal: :TERM) + begin + Process.kill signal, pid + rescue Errno::ESRCH + end + begin + Process.wait2 pid + rescue Errno::ECHILD + end end def restart_server_and_listen(argv) - cli_server(argv) + cli_server argv connection = connect initial_reply = read_body(connection) - restart_server(connection) + restart_server connection [initial_reply, read_body(connect)] end # reuses an existing connection to make sure that works def restart_server(connection) - Process.kill :USR2, @server.pid + Process.kill :USR2, @pid connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request wait_for_server_to_boot end @@ -77,8 +85,8 @@ def wait_for_server_to_boot true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted end - def connect(path = nil) - s = TCPSocket.new HOST, @tcp_port + def connect(path = nil, unix: false) + s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port) @ios_to_close << s s << "GET /#{path} HTTP/1.1\r\n\r\n" true until s.gets == "\r\n" diff --git a/test/rackup/sleep_pid.ru b/test/rackup/sleep_pid.ru new file mode 100644 index 0000000000..0f1811c8c7 --- /dev/null +++ b/test/rackup/sleep_pid.ru @@ -0,0 +1,8 @@ +# call with "GET /sleep HTTP/1.1\r\n\r\n", where is the number of +# seconds to sleep, returns process pid + +run lambda { |env| + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] +} diff --git a/test/rackup/sleep_step.ru b/test/rackup/sleep_step.ru new file mode 100644 index 0000000000..e5633cd183 --- /dev/null +++ b/test/rackup/sleep_step.ru @@ -0,0 +1,10 @@ +# call with "GET /sleep- HTTP/1.1\r\n\r\n", where is the number of +# seconds to sleep and is the step + +run lambda { |env| + p = env['REQUEST_PATH'] + dly = (p[/\/sleep(\d+)/,1] || '0').to_i + step = p[/(\d+)\z/,1].to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{step}"]] +} diff --git a/test/test_cli.rb b/test/test_cli.rb index 2e0ef4d411..ac3ed4e23a 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -81,6 +81,9 @@ def test_control_clustered "--control-token", "", "test/rackup/lobster.ru"], @events + # without this, Minitest.after_run will trigger on this test ? + $debugging_hold = true + t = Thread.new { cli.run } wait_booted @@ -88,6 +91,7 @@ def test_control_clustered s = UNIXSocket.new @tmp_path s << "GET /stats HTTP/1.0\r\n\r\n" body = s.read + s.close require 'json' status = JSON.parse(body.split("\n").last) @@ -99,10 +103,23 @@ def test_control_clustered s = UNIXSocket.new @tmp_path s << "GET /stats HTTP/1.0\r\n\r\n" body = s.read - assert_match(/\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "workers": 2, "phase": 0, "booted_workers": 2, "old_workers": 0, "worker_status": \[\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "pid": \d+, "index": 0, "phase": 0, "booted": true, "last_checkin": "[^"]+", "last_status": \{ "backlog":0, "running":2, "pool_capacity":2, "max_threads": 2 \} \},\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "pid": \d+, "index": 1, "phase": 0, "booted": true, "last_checkin": "[^"]+", "last_status": \{ "backlog":0, "running":2, "pool_capacity":2, "max_threads": 2 \} \}\] \}/, body.split("\r\n").last) + s.close - cli.launcher.stop - t.join + assert_match(/\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "workers": 2, "phase": 0, "booted_workers": 2, "old_workers": 0, "worker_status": \[\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "pid": \d+, "index": 0, "phase": 0, "booted": true, "last_checkin": "[^"]+", "last_status": \{ "backlog":0, "running":2, "pool_capacity":2, "max_threads": 2 \} \},\{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "pid": \d+, "index": 1, "phase": 0, "booted": true, "last_checkin": "[^"]+", "last_status": \{ "backlog":0, "running":2, "pool_capacity":2, "max_threads": 2 \} \}\] \}/, body.split("\r\n").last) + ensure + if UNIX_SKT_EXIST && HAS_FORK + cli.launcher.stop + + done = nil + until done + @events.stdout.rewind + log = @events.stdout.readlines.join '' + done = log[/ - Goodbye!/] + end + + t.join + $debugging_hold = false + end end def test_control @@ -121,11 +138,14 @@ def test_control s = UNIXSocket.new @tmp_path s << "GET /stats HTTP/1.0\r\n\r\n" body = s.read + s.close assert_match(/{ "started_at": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "backlog": 0, "running": 0, "pool_capacity": 16, "max_threads": 16 }/, body.split("\r\n").last) - - cli.launcher.stop - t.join + ensure + if UNIX_SKT_EXIST + cli.launcher.stop + t.join + end end def test_control_stop @@ -144,10 +164,11 @@ def test_control_stop s = UNIXSocket.new @tmp_path s << "GET /stop HTTP/1.0\r\n\r\n" body = s.read + s.close assert_equal '{ "status": "ok" }', body.split("\r\n").last - - t.join + ensure + t.join if UNIX_SKT_EXIST end def control_gc_stats(uri, cntl) diff --git a/test/test_integration_cluster.rb b/test/test_integration_cluster.rb index 9fb9a89698..f6634c3b6a 100644 --- a/test/test_integration_cluster.rb +++ b/test/test_integration_cluster.rb @@ -2,21 +2,28 @@ require_relative "helpers/integration" class TestIntegrationCluster < TestIntegration + parallelize_me! + + DARWIN = !!RUBY_PLATFORM[/darwin/] + def setup + skip NO_FORK_MSG unless HAS_FORK super + end - skip NO_FORK_MSG unless HAS_FORK + def teardown + super if HAS_FORK end def test_siginfo_thread_print skip_unless_signal_exist? :INFO - cli_server("-w #{WORKERS} -q test/rackup/hello.ru") + cli_server "-w #{WORKERS} -q test/rackup/hello.ru" worker_pids = get_worker_pids output = [] t = Thread.new { output << @server.readlines } - Process.kill(:INFO, worker_pids.first) - Process.kill(:INT, @server.pid) + Process.kill :INFO, worker_pids.first + Process.kill :INT , @pid t.join assert_match "Thread TID", output.join @@ -27,202 +34,252 @@ def test_usr2_restart assert_equal "Hello World", new_reply end - def test_term_closes_listeners - pid = cli_server("-w #{WORKERS} -q test/rackup/sleep.ru").pid - threads = [] - initial_reply = nil - next_replies = [] - condition_variable = ConditionVariable.new - mutex = Mutex.new - - threads << Thread.new do - s = connect "sleep1" - mutex.synchronize { condition_variable.broadcast } - initial_reply = read_body(s) - end - - threads << Thread.new do - mutex.synchronize { - condition_variable.wait(mutex, 1) - Process.kill("SIGTERM", pid) - } - end - - 10.times.each do |i| - threads << Thread.new do - mutex.synchronize { condition_variable.wait(mutex, 1.5) } + # Next two tests, one tcp, one unix + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. - begin - s = connect "sleep1" - read_body(s) - next_replies << :success - rescue Errno::ECONNRESET - # connection was accepted but then closed - # client would see an empty response - next_replies << :connection_reset - rescue Errno::ECONNREFUSED - # connection was was never accepted - # it can therefore be re-tried before the - # client receives an empty response - next_replies << :connection_refused - end - end - end + def test_term_closes_listeners_tcp + skip_unless_signal_exist? :TERM + term_closes_listeners unix: false + end - threads.each(&:join) + def test_term_closes_listeners_unix + skip_unless_signal_exist? :TERM + term_closes_listeners unix: true + end - assert_equal "Slept 1", initial_reply + # Next two tests, one tcp, one unix + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used - assert_includes next_replies, :connection_refused + def test_usr1_all_respond_tcp + skip_unless_signal_exist? :USR1 + usr1_all_respond unix: false + end - refute_includes next_replies, :connection_reset + def test_usr1_all_respond_unix + skip_unless_signal_exist? :USR1 + usr1_all_respond unix: true end def test_term_exit_code - pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid - _, status = send_term_to_server(pid) + cli_server "-w #{WORKERS} test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end def test_term_suppress - cli_server("-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru") + cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru" - Process.kill(:TERM, @server.pid) - begin - Process.wait @server.pid - rescue Errno::ECHILD - end - status = $?.exitstatus + _, status = stop_server assert_equal 0, status - @server.close unless @server.closed? - @server = nil # prevent `#teardown` from killing already killed server end def test_term_worker_clean_exit skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - pid = cli_server("-w #{WORKERS} test/rackup/hello.ru").pid + cli_server "-w #{WORKERS} test/rackup/hello.ru" # Get the PIDs of the child workers. worker_pids = get_worker_pids # Signal the workers to terminate, and wait for them to die. - Process.kill :TERM, pid - Process.wait pid + Process.kill :TERM, @pid + Process.wait @pid - zombies = clean_exit_pids worker_pids + zombies = bad_exit_pids worker_pids assert_empty zombies, "Process ids #{zombies} became zombies" end - # mimicking stuck workers, test respawn with external SIGTERM + # mimicking stuck workers, test respawn with external TERM def test_stuck_external_term_spawn - worker_respawn { |l, phase0_worker_pids| - phase0_worker_pids.each { |p| Process.kill :TERM, p } - } + skip_unless_signal_exist? :TERM + + worker_respawn(0) do |phase0_worker_pids| + last = phase0_worker_pids.last + # test is tricky if only one worker is TERM'd, so kill all but + # spread out, so all aren't killed at once + phase0_worker_pids.each do |pid| + Process.kill :TERM, pid + sleep 4 unless pid == last + end + end end # mimicking stuck workers, test restart def test_stuck_phased_restart - worker_respawn { |l, phase0_worker_pids| l.phased_restart } + skip_unless_signal_exist? :USR1 + worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid } end private - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers WORKERS - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } - end + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def term_closes_listeners(unix: false) + skip_unless_signal_exist? :TERM - # start Puma via launcher - thr, launcher, _e = run_launcher conf + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + div = 10 - # make sure two workers have booted - time = 0 - until workers_booted >= WORKERS || time >= 10 - sleep 2 - time += 2 + refused = thread_run_refused unix: unix + + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies[i] = :term_sent } + end + else + threads << Thread.new do + thread_run_step replies, i.to_f/div, 1, i, mutex, refused, unix: unix + end + end end - cluster = launcher.instance_variable_get :@runner + threads.each(&:join) - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil + failures = replies.count(:failure) + successes = replies.count(:success) + resets = replies.count(:reset) + refused = replies.count(:refused) - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end + r_success = replies.rindex(:success) + l_reset = replies.index(:reset) + r_reset = replies.rindex(:reset) + l_refused = replies.index(:refused) + + msg = "#{successes} successes, #{resets} resets, #{refused} refused, failures #{failures}" + + assert_equal 0, failures, msg + + assert_operator 9, :<=, successes, msg + + assert_operator 10, :>=, resets , msg + + assert_operator 20, :<=, refused , msg + + # Interleaved asserts + # UNIX binders do not generate :reset items + if l_reset + assert_operator r_success, :<, l_reset , "Interleaved success and reset" + assert_operator r_reset , :<, l_refused, "Interleaved reset and refused" + else + assert_operator r_success, :<, l_refused, "Interleaved success and refused" end - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + ensure + if passed? + $debugging_info << "#{full_name}\n #{msg}\n" + else + $debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n" + end + end + + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def usr1_all_respond(unix: false) + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + + s = connect "sleep1", unix: unix + replies << read_body(s) + + Process.kill :USR1, @pid + + refused = thread_run_refused unix: unix + + 24.times do |delay| + threads << Thread.new do + thread_run_pid replies, delay, 1, mutex, refused, unix: unix end end - phase0_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) + threads.each(&:join) + + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } - start_time = Time.now.to_f + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, phase0_worker_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 2 * WORKERS || time >= 45 - sleep 2 - time += 2 + msg = "#{responses} responses, #{qty_pids} uniq pids" + + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg + + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + refute_includes replies, :refused, msg + + refute_includes replies, :reset , msg + ensure + unless passed? + $debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n" end + end - phase1_worker_pids = cluster.instance_variable_get(:@workers).map(&:pid) + def worker_respawn(phase = 1, size = WORKERS) + threads = [] - # should be empty if all phase 0 workers cleanly exited - phase0_exited = clean_exit_pids phase0_worker_pids + cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru" + + # make sure two workers have booted + phase0_worker_pids = get_worker_pids - Thread.kill worker0 - Thread.kill worker1 + [35, 40].each do |sleep_time| + threads << Thread.new do + begin + connect "sleep#{sleep_time}" + # stuck connections will raise IOError or Errno::ECONNRESET + # when shutdown + rescue IOError, Errno::ECONNRESET + end + end + end - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" + @start_time = Time.now.to_f - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 + # below should 'cancel' the phase 0 workers, either via phased_restart or + # externally TERM'ing them + yield phase0_worker_pids + + # wait for new workers to boot + phase1_worker_pids = get_worker_pids phase + + # should be empty if all phase 0 workers cleanly exited + phase0_exited = bad_exit_pids phase0_worker_pids # Since 35 is the shorter of the two requests, server should restart # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + assert_operator (Time.now.to_f - @start_time).round(2), :<, 35 msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" + assert_empty phase0_exited, msg + + threads.each { |th| Thread.kill th } end # Returns an array of pids still in the process table, so it should # be empty for a clean exit. # Process.kill should raise the Errno::ESRCH exception, indicating the # process is dead and has been reaped. - def clean_exit_pids(pids) + def bad_exit_pids(pids) pids.map do |pid| begin pid if Process.kill 0, pid @@ -232,19 +289,51 @@ def clean_exit_pids(pids) end.compact end - def run_launcher(conf) - wait, ready = IO.pipe - @ios_to_close << wait << ready - events = Puma::Events.strings - events.on_booted { ready << "!" } + # used with thread_run to define correct 'refused' errors + def thread_run_refused(unix: false) + if unix + DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT] + else + DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] : + [Errno::ECONNREFUSED] + end + end - launcher = Puma::Launcher.new conf, :events => events + # used in loop to create several 'requests' + def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false) + begin + sleep delay + s = connect "sleep#{sleep_time}", unix: unix + body = read_body(s) + mutex.synchronize { replies << body } + rescue Errno::ECONNRESET + # connection was accepted but then closed + # client would see an empty response + mutex.synchronize { replies << :reset } + rescue *refused + mutex.synchronize { replies << :refused } + end + end - thr = Thread.new { launcher.run } + # used in loop to create several 'requests' + def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: false) + begin + sleep delay + s = connect "sleep#{sleep_time}-#{step}", unix: unix + body = read_body(s) + if body[/\ASlept /] + mutex.synchronize { replies[step] = :success } + else + mutex.synchronize { replies[step] = :failure } + end + rescue Errno::ECONNRESET + # connection was accepted but then closed + # client would see an empty response + mutex.synchronize { replies[step] = :reset } + rescue *refused + mutex.synchronize { replies[step] = :refused } + end + end - # wait for boot from `events.on_booted` - wait.sysread 1 - [thr, launcher, events] - end end diff --git a/test/test_integration_pumactl.rb b/test/test_integration_pumactl.rb index 2e8b67963c..b7ae5f715f 100644 --- a/test/test_integration_pumactl.rb +++ b/test/test_integration_pumactl.rb @@ -2,40 +2,49 @@ require_relative "helpers/integration" class TestIntegrationPumactl < TestIntegration + parallelize_me! + def setup super @state_path = "test/#{name}_puma.state" - @bind_path = "test/#{name}_server.sock" @control_path = "test/#{name}_control.sock" end def teardown super - begin - # refute File.exist?(@bind_path), "Bind path must be removed after stop" - ensure - [@bind_path, @state_path, @control_path].each { |p| File.unlink(p) rescue nil } - end + [@state_path, @control_path].each { |p| File.unlink(p) rescue nil } + end + + def test_stop_tcp + @control_tcp_port = UniquePort.call + cli_server "-q test/rackup/sleep.ru --control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN} -S #{@state_path}" + + cli_pumactl "stop" + + _, status = Process.wait2(@pid) + assert_equal 0, status + + @server = nil end - def test_pumactl_stop + def test_stop_unix skip UNIX_SKT_MSG unless UNIX_SKT_EXIST - cli_server("-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}") + cli_server "-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" + cli_pumactl "stop", unix: true - _, status = Process.wait2(@server.pid) + _, status = Process.wait2(@pid) assert_equal 0, status @server = nil end - def test_pumactl_phased_restart_cluster + def test_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", "unix://#{@bind_path}" + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true s = UNIXSocket.new @bind_path @ios_to_close << s @@ -46,7 +55,7 @@ def test_pumactl_phased_restart_cluster assert File.exist? @bind_path # Phased restart - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} phased-restart" + cli_pumactl "phased-restart", unix: true # Get the PIDs of the phase 1 workers. phase1_worker_pids = get_worker_pids 1 @@ -58,16 +67,15 @@ def test_pumactl_phased_restart_cluster assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" assert File.exist?(@bind_path), "Bind path must exist after phased restart" - # Stop - cli_pumactl "-C unix://#{@control_path} -T #{TOKEN} stop" + cli_pumactl "stop", unix: true - _, status = Process.wait2(@server.pid) + _, status = Process.wait2(@pid) assert_equal 0, status @server = nil end - def test_pumactl_kill_unknown + def test_kill_unknown skip_on :jruby # we run ls to get a 'safe' pid to pass off as puma in cli stop @@ -89,8 +97,12 @@ def test_pumactl_kill_unknown private - def cli_pumactl(argv) - pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") + def cli_pumactl(argv, unix: false) + if unix + pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r") + else + pumactl = IO.popen("#{BASE} bin/pumactl -C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}", "r") + end @ios_to_close << pumactl Process.wait pumactl.pid pumactl diff --git a/test/test_integration_single.rb b/test/test_integration_single.rb index 65e6cbfec5..8325231f50 100644 --- a/test/test_integration_single.rb +++ b/test/test_integration_single.rb @@ -2,6 +2,8 @@ require_relative "helpers/integration" class TestIntegrationSingle < TestIntegration + parallelize_me! + def test_usr2_restart skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") @@ -23,33 +25,34 @@ def test_usr2_restart_restores_environment end def test_term_exit_code - skip_on :windows # no SIGTERM + skip_unless_signal_exist? :TERM skip_on :jruby # JVM does not return correct exit code for TERM - pid = cli_server("test/rackup/hello.ru").pid - _, status = send_term_to_server(pid) + cli_server "test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end def test_term_suppress - skip_on :windows # no SIGTERM + skip_unless_signal_exist? :TERM - pid = cli_server("-C test/config/suppress_exception.rb test/rackup/hello.ru").pid - _, status = send_term_to_server(pid) + cli_server "-C test/config/suppress_exception.rb test/rackup/hello.ru" + _, status = stop_server assert_equal 0, status end def test_term_not_accepts_new_connections - skip_on :jruby, :windows + skip_unless_signal_exist? :TERM + skip_on :jruby - cli_server('test/rackup/sleep.ru') + cli_server 'test/rackup/sleep.ru' _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request - Process.kill(:TERM, @server.pid) + Process.kill :TERM, @pid true while @server.gets !~ /Gracefully stopping/ # wait for server to begin graceful shutdown # Invoke a request which must be rejected @@ -69,10 +72,10 @@ def test_term_not_accepts_new_connections @server = nil # prevent `#teardown` from killing already killed server end - def test_int_signal_with_background_thread_in_jruby - skip_unless :jruby + def test_int_refuse + skip_unless_signal_exist? :INT - cli_server('test/rackup/hello.ru') + cli_server 'test/rackup/hello.ru' begin sock = TCPSocket.new(HOST, @tcp_port) sock.close @@ -80,8 +83,8 @@ def test_int_signal_with_background_thread_in_jruby fail("Port didn't open properly: #{ex.message}") end - Process.kill :INT, @server.pid - Process.wait @server.pid + Process.kill :INT, @pid + Process.wait @pid assert_raises(Errno::ECONNREFUSED) { TCPSocket.new(HOST, @tcp_port) } end @@ -89,11 +92,11 @@ def test_int_signal_with_background_thread_in_jruby def test_siginfo_thread_print skip_unless_signal_exist? :INFO - cli_server("test/rackup/hello.ru") + cli_server 'test/rackup/hello.ru' output = [] t = Thread.new { output << @server.readlines } - Process.kill(:INFO, @server.pid) - Process.kill(:INT, @server.pid) + Process.kill :INFO, @pid + Process.kill :INT , @pid t.join assert_match "Thread TID", output.join