From 4471054cca24d80fdffa0d0dd3b684f74daeb46d Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Mon, 9 Sep 2019 14:05:56 -0500 Subject: [PATCH 1/5] Updates binder.rb, cluster.rb, launcher.rb 1. binder.rb Add PR #1886 2. cluster.rb Add PR #1952, additional minor changes 3. launcher.rb #close_binder_listeners close io's as fast as possible --- lib/puma/binder.rb | 8 -------- lib/puma/cluster.rb | 19 ++++++++++++------- lib/puma/launcher.rb | 7 ++++--- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/lib/puma/binder.rb b/lib/puma/binder.rb index f2165f7474..d0b122e782 100644 --- a/lib/puma/binder.rb +++ b/lib/puma/binder.rb @@ -50,14 +50,6 @@ def env(sock) def close @ios.each { |i| i.close } - @unix_paths.each do |i| - # Errno::ENOENT is intermittently raised - begin - unix_socket = UNIXSocket.new i - unix_socket.close - rescue Errno::ENOENT - end - end end def import_from_env diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 24f4ea5f85..5bc22d83f3 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -68,7 +68,7 @@ def initialize(idx, pid, phase, options) @pid = pid @phase = phase @stage = :started - @signal = "TERM" + @signal = :TERM @options = options @first_term_sent = nil @started_at = Time.now @@ -108,7 +108,7 @@ def ping_timeout?(which) def term begin if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] - @signal = "KILL" + @signal = :KILL else @term ||= true @first_term_sent ||= Time.now @@ -119,12 +119,12 @@ def term end def kill - Process.kill "KILL", @pid + Process.kill :KILL, @pid rescue Errno::ESRCH end def hup - Process.kill "HUP", @pid + Process.kill :HUP, @pid rescue Errno::ESRCH end end @@ -220,8 +220,10 @@ def check_workers(force=false) log "- Stopping #{w.pid} for phased upgrade..." end - w.term - log "- #{w.signal} sent to #{w.pid}..." + unless w.term? + w.term + log "- #{w.signal} sent to #{w.pid}..." + end end end end @@ -270,6 +272,7 @@ def worker(index, master) server = start_server Signal.trap "SIGTERM" do + @worker_write << "e#{Process.pid}\n" rescue nil server.stop end @@ -501,8 +504,10 @@ def run w.boot! log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}" force_check = true + when "e" + w.instance_variable_set :@term, true when "t" - w.term + w.term unless w.term? force_check = true when "p" w.ping!(result.sub(/^\d+/,'').chomp) diff --git a/lib/puma/launcher.rb b/lib/puma/launcher.rb index da9bce2573..ee21a0761f 100644 --- a/lib/puma/launcher.rb +++ b/lib/puma/launcher.rb @@ -217,11 +217,12 @@ def restart_args end def close_binder_listeners - @binder.listeners.each do |l, io| - io.close + # close binder listeners as fast as possible, so separate loop + @binder.listeners.each { |_, io| io.close } + @binder.listeners.each do |l, _| uri = URI.parse(l) next unless uri.scheme == 'unix' - File.unlink("#{uri.host}#{uri.path}") + File.unlink "#{uri.host}#{uri.path}" end end From 8d1d3941d9f328c6bc499c0b51cf324d7f8f26c3 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Mon, 9 Sep 2019 14:30:13 -0500 Subject: [PATCH 2/5] Add GitHub Actions workflow.yml --- .github/workflows/workflow.yml | 63 ++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 .github/workflows/workflow.yml diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml new file mode 100644 index 0000000000..24ba759f61 --- /dev/null +++ b/.github/workflows/workflow.yml @@ -0,0 +1,63 @@ +name: Puma + +on: [push] + +jobs: + build: + name: >- + OS: ${{ matrix.os }} Ruby: ${{ matrix.ruby }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ 'ubuntu-16.04', 'ubuntu-18.04', 'macos', 'windows-latest' ] + ruby: [ '2.3.x', '2.4.x', '2.5.x', '2.6.x' ] + exclude: + - os: ubuntu-16.04 + ruby: 2.4.x + - os: ubuntu-16.04 + ruby: 2.5.x + - os: ubuntu-16.04 + ruby: 2.6.x + - os: ubuntu-18.04 + ruby: 2.3.x + - os: macos + ruby: 2.3.x + - os: windows-latest + ruby: 2.3.x + steps: + - name: repo checkout + uses: actions/checkout@v1 + with: + fetch-depth: 10 + - name: load ruby + uses: actions/setup-ruby@v1 + with: + ruby-version: ${{ matrix.ruby }} + + - name: Install ragel + if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') + run: | + if [ "${{ matrix.os }}" == "macos" ]; then + brew install ragel + else + sudo apt-get install ragel + fi + - name: Windows MSYS2 + if: startsWith(matrix.os, 'windows') + uses: MSP-Greg/msys2-action@master + with: + base: update + mingw: openssl ragel + + - name: RubyGems, Bundler Update + run: gem update --system --no-document --conservative + - name: bundle install + run: bundle install --jobs 4 --retry 3 + - name: compile + run: bundle exec rake compile + - name: test + run: bundle exec rake + env: + CI: true + TESTOPTS: -v From e907a0e0ba6d10a50ba29f8592d6e486c75dfb0a Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Mon, 9 Sep 2019 14:33:53 -0500 Subject: [PATCH 3/5] Update tests 1. test_integration.rb - Rewrite so all tests create a server via IO.popen. 2. Update some tests for parallelize_me! 3. Add config/worker_shutdown_timeout_2.rb, rackup/sleep_pid.ru --- test/config/worker_shutdown_timeout_2.rb | 1 + test/helper.rb | 38 +- test/helpers/apps.rb | 8 + test/rackup/sleep_pid.ru | 8 + test/test_integration.rb | 783 +++++++++-------------- test/test_persistent.rb | 22 +- test/test_puma_server.rb | 3 +- test/test_puma_server_ssl.rb | 2 + test/test_web_server.rb | 2 + 9 files changed, 388 insertions(+), 479 deletions(-) create mode 100644 test/config/worker_shutdown_timeout_2.rb create mode 100644 test/rackup/sleep_pid.ru diff --git a/test/config/worker_shutdown_timeout_2.rb b/test/config/worker_shutdown_timeout_2.rb new file mode 100644 index 0000000000..ddb8c6453d --- /dev/null +++ b/test/config/worker_shutdown_timeout_2.rb @@ -0,0 +1 @@ +worker_shutdown_timeout 2 diff --git a/test/helper.rb b/test/helper.rb index e7afdbe721..142b8db5af 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -47,7 +47,11 @@ module UniquePort @mutex = Mutex.new def self.call - @mutex.synchronize { @port += 1 } + @mutex.synchronize { + @port += 1 + @port = 3307 if @port == 3306 # MySQL on Actions + @port + } end end @@ -70,6 +74,34 @@ def run(*) module TestSkips + # finds bad signals, value is an array of symbols + BAD_SIGNAL_LIST = begin + code = <<-HEREDOC + data = (%w[HUP INT TTIN TTOU USR1 USR2] - Signal.list.keys).join(' ') + + begin + Signal.trap('TERM') { } + rescue ArgumentError + data << 'TERM' + end + + puts data + HEREDOC + pipe = IO.popen RbConfig.ruby, 'r+' + pid = pipe.pid + pipe.puts code + pipe.close_write + + begin + Process.kill :TERM, pid + pipe.read.strip.split(' ').sort.map(&:to_sym) + rescue Errno::EINVAL + "#{pipe.read.strip} TERM".split(' ').sort.map(&:to_sym) + ensure + Process.wait pid + end + end + # usage: skip NO_FORK_MSG unless HAS_FORK # windows >= 2.6 fork is not defined, < 2.6 fork raises NotImplementedError HAS_FORK = ::Process.respond_to? :fork @@ -82,8 +114,8 @@ module TestSkips # usage: skip_unless_signal_exist? :USR2 def skip_unless_signal_exist?(sig, bt: caller) - signal = sig.to_s - unless Signal.list.key? signal + signal = sig.to_s.sub(/\ASIG/i, '').to_sym + if BAD_SIGNAL_LIST.include? signal skip "Signal #{signal} isn't available on the #{RUBY_PLATFORM} platform", bt end end diff --git a/test/helpers/apps.rb b/test/helpers/apps.rb index f74e3eb09a..3be3b3d1b5 100644 --- a/test/helpers/apps.rb +++ b/test/helpers/apps.rb @@ -9,4 +9,12 @@ module TestApps [200, {"Content-Type" => "text/plain"}, ["Slept #{dly}"]] end + # call with "GET /sleep HTTP/1.1\r\n\r\n", where is the number of + # seconds to sleep + # same as rackup/sleep_pid.ru + SLEEP_PID = -> (env) do + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] + end end diff --git a/test/rackup/sleep_pid.ru b/test/rackup/sleep_pid.ru new file mode 100644 index 0000000000..0f1811c8c7 --- /dev/null +++ b/test/rackup/sleep_pid.ru @@ -0,0 +1,8 @@ +# call with "GET /sleep HTTP/1.1\r\n\r\n", where is the number of +# seconds to sleep, returns process pid + +run lambda { |env| + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] +} diff --git a/test/test_integration.rb b/test/test_integration.rb index faefb402c1..164a78ea00 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -1,207 +1,97 @@ # frozen_string_literal: true require_relative "helper" -require "puma/cli" require "puma/control_cli" require "open3" -# TODO: Remove over-utilization of @instance variables -# TODO: remove stdout logging, get everything out of my rainbow dots - class TestIntegration < Minitest::Test + parallelize_me! + HOST = "127.0.0.1" TOKEN = "xxyyzz" - def setup - @state_path = "test/test_#{name}_puma.state" - @bind_path = "test/test_#{name}_server.sock" - @control_path = "test/test_#{name}_control.sock" - - @server = nil + BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" : + "#{Gem.ruby} -Ilib" - @wait, @ready = IO.pipe + WORKERS = 2 - @events = Puma::Events.strings - @events.on_booted { @ready << "!" } + def setup + @ios_to_close = [] + @state_path = "test/#{name}_puma.state" + @bind_path = "test/#{name}_server.sock" + @control_path = "test/#{name}_control.sock" end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil - File.unlink @control_path rescue nil - - @wait.close - @ready.close - - if @server - Process.kill "INT", @server.pid + if defined?(@server) && @server + begin + Process.kill :INT, @server.pid + rescue Errno::ESRCH + end begin - Process.wait @server.pid + Process.wait @pid rescue Errno::ECHILD end - - @server.close + @server.close unless @server.closed? + @server = nil end - end - def server_cmd(argv) - @tcp_port = UniquePort.call - base = "#{Gem.ruby} -Ilib bin/puma" - base = "bundle exec #{base}" if defined?(Bundler) - "#{base} -b tcp://127.0.0.1:#{@tcp_port} #{argv}" - end - - def server(argv) - @server = IO.popen(server_cmd(argv), "r") - - wait_for_server_to_boot(@server) - - @server - end - - def start_forked_server(argv) - servercmd = server_cmd(argv) - pid = fork do - exec servercmd + @ios_to_close.each do |io| + io.close if io.is_a?(IO) && !io.closed? + io = nil end - sleep 5 - pid - end - - def stop_forked_server(pid) - Process.kill(:TERM, pid) - sleep 1 - Process.wait2(pid) - end - - def restart_server_and_listen(argv) - server(argv) - connection = connect - initial_reply = read_body(connection) - restart_server(@server, connection) - [initial_reply, read_body(connect)] - end - - def wait_booted - @wait.sysread 1 - end - - # reuses an existing connection to make sure that works - def restart_server(server, connection) - Process.kill :USR2, @server.pid - - connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request - - wait_for_server_to_boot(server) - end - - def connect(path = nil) - s = TCPSocket.new "localhost", @tcp_port - s << "GET /#{path} HTTP/1.1\r\n\r\n" - true until s.gets == "\r\n" - s - end - - def wait_for_server_to_boot(server) - true while server.gets !~ /Ctrl-C/ # wait for server to say it booted - end - - def read_body(connection) - Timeout.timeout(10) do - loop do - response = connection.readpartial(1024) - body = response.split("\r\n\r\n", 2).last - return body if body && !body.empty? - sleep 0.01 - end - end + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil + File.unlink @control_path rescue nil end - def test_stop_via_pumactl + def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST + cli_server "-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}" - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.rackup "test/rackup/hello.ru" - end - - l = Puma::Launcher.new conf, :events => @events - - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end + cli_pumactl "stop", unix: true - wait_booted - - s = UNIXSocket.new @bind_path - s << "GET / HTTP/1.0\r\n\r\n" - assert_equal "Hello World", read_body(s) - - sout = StringIO.new - - ccli = Puma::ControlCLI.new %W!-S #{@state_path} stop!, sout - - ccli.run + _, status = Process.wait2 @pid + assert_equal 0, status - assert_kind_of Thread, t.join, "server didn't stop" + @server = nil end - def test_phased_restart_via_pumactl + def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - delay = 40 + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.workers 2 - c.worker_shutdown_timeout 2 - c.rackup "test/rackup/sleep.ru" - end + s = UNIXSocket.new @bind_path + @ios_to_close << s + s << "GET /sleep5 HTTP/1.0\r\n\r\n" - l = Puma::Launcher.new conf, :events => @events + # Get the PIDs of the phase 0 workers. + phase0_worker_pids = get_worker_pids 0 - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end + # Phased restart + cli_pumactl "phased-restart", unix: true - wait_booted + # Get the PIDs of the phase 1 workers. + phase1_worker_pids = get_worker_pids 1 - s = UNIXSocket.new @bind_path - s << "GET /sleep#{delay} HTTP/1.0\r\n\r\n" + msg = "phase 0 pids #{phase0_worker_pids.inspect} phase 1 pids #{phase1_worker_pids.inspect}" + + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" - sout = StringIO.new - # Phased restart - ccli = Puma::ControlCLI.new ["-S", @state_path, "phased-restart"], sout - ccli.run - - done = false - until done - @events.stdout.rewind - log = @events.stdout.readlines.join("") - if log =~ /- Worker \d \(pid: \d+\) booted, phase: 1/ - assert_match(/TERM sent/, log) - assert_match(/- Worker \d \(pid: \d+\) booted, phase: 1/, log) - done = true - end - end # Stop - ccli = Puma::ControlCLI.new ["-S", @state_path, "stop"], sout - ccli.run + cli_pumactl "stop", unix: true - assert_kind_of Thread, t.join, "server didn't stop" - assert File.exist? @bind_path + _, status = Process.wait2 @pid + assert_equal 0, status + + @server = nil end - def test_kill_unknown_via_pumactl + def test_pumactl_kill_unknown skip_on :jruby # we run ls to get a 'safe' pid to pass off as puma in cli stop @@ -213,8 +103,7 @@ def test_kill_unknown_via_pumactl sout = StringIO.new e = assert_raises SystemExit do - ccli = Puma::ControlCLI.new %W!-p #{safe_pid} stop!, sout - ccli.run + Puma::ControlCLI.new(%W!-p #{safe_pid} stop!, sout).run end sout.rewind # windows bad URI(is not URI?) @@ -222,152 +111,184 @@ def test_kill_unknown_via_pumactl assert_equal(1, e.status) end - def test_restart_closes_keepalive_sockets + def test_usr2_restart_single skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_restart_closes_keepalive_sockets_workers + def test_usr2_restart_cluster skip NO_FORK_MSG unless HAS_FORK - _, new_reply = restart_server_and_listen("-q -w 2 test/rackup/hello.ru") + _, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_sigterm_closes_listeners_on_forked_servers + # It does not share environments between multiple generations, which would break Dotenv + def test_usr2_restart_restores_environment + # jruby has a bug where setting `nil` into the ENV or `delete` do not change the + # next workers ENV + skip_on :jruby + skip_unless_signal_exist? :USR2 + + initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") + + assert_includes initial_reply, "Hello RAND" + assert_includes new_reply, "Hello RAND" + refute_equal initial_reply, new_reply + end + + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def test_term_closes_listeners_cluster skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 -q test/rackup/sleep.ru") + skip_unless_signal_exist? :TERM + + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru" threads = [] - initial_reply = nil - next_replies = [] - condition_variable = ConditionVariable.new + replies = [] mutex = Mutex.new - - threads << Thread.new do - s = connect "sleep1" - mutex.synchronize { condition_variable.broadcast } - initial_reply = read_body(s) + div = 10 + + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies << :term_sent } + end + else + threads << Thread.new do + thread_run replies, i.to_f/div, 1, mutex + end + end end - threads << Thread.new do - mutex.synchronize { - condition_variable.wait(mutex, 1) - Process.kill("SIGTERM", pid) - } - end + threads.each(&:join) - 10.times.each do |i| - threads << Thread.new do - mutex.synchronize { condition_variable.wait(mutex, 1.5) } + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + msg = "#{responses} responses, #{resets} resets, #{refused} refused" - begin - s = connect "sleep1" - read_body(s) - next_replies << :success - rescue Errno::ECONNRESET - # connection was accepted but then closed - # client would see an empty response - next_replies << :connection_reset - rescue Errno::ECONNREFUSED - # connection was was never accepted - # it can therefore be re-tried before the - # client receives an empty response - next_replies << :connection_refused - end + assert_operator 9, :<=, responses, msg + + assert_operator 10, :>=, resets , msg + + assert_operator 20, :<=, refused , msg + end + + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def test_usr1_all_respond_cluster + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :USR1 + + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru" + threads = [] + replies = [] + mutex = Mutex.new + + s = connect "sleep1" + replies << read_body(s) + Process.kill :USR1, @pid + + 24.times do |delay| + threads << Thread.new do + thread_run replies, delay, 1, mutex end end - threads.map(&:join) + threads.each(&:join) - assert_equal "Slept 1", initial_reply + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } - assert_includes next_replies, :connection_refused + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - refute_includes next_replies, :connection_reset - end + msg = "#{responses} responses, #{qty_pids} uniq pids" - # It does not share environments between multiple generations, which would break Dotenv - def test_restart_restores_environment - # jruby has a bug where setting `nil` into the ENV or `delete` do not change the - # next workers ENV - skip_on :jruby - skip_unless_signal_exist? :USR2 + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg - initial_reply, new_reply = restart_server_and_listen("-q test/rackup/hello-env.ru") + msg = "#{responses} responses, #{resets} resets, #{refused} refused" - assert_includes initial_reply, "Hello RAND" - assert_includes new_reply, "Hello RAND" - refute_equal initial_reply, new_reply + refute_includes replies, :refused, msg + + refute_includes replies, :reset , msg end - def test_term_signal_exit_code_in_single_mode - skip NO_FORK_MSG unless HAS_FORK + def test_term_exit_code_single + skip_unless_signal_exist? :TERM - pid = start_forked_server("test/rackup/hello.ru") - _, status = stop_forked_server(pid) + cli_server "test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end - def test_term_signal_exit_code_in_clustered_mode + def test_term_exit_code_cluster skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 test/rackup/hello.ru") - _, status = stop_forked_server(pid) + cli_server "-w #{WORKERS} test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end - def test_term_signal_suppress_in_single_mode - skip NO_FORK_MSG unless HAS_FORK + def test_term_suppress_single + skip_unless_signal_exist? :TERM - pid = start_forked_server("-C test/config/suppress_exception.rb test/rackup/hello.ru") - _, status = stop_forked_server(pid) + cli_server "-C test/config/suppress_exception.rb test/rackup/hello.ru" + _, status = stop_server assert_equal 0, status end - def test_term_signal_suppress_in_clustered_mode + def test_term_suppress_cluster skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/suppress_exception.rb test/rackup/hello.ru") + cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru" - Process.kill(:TERM, @server.pid) + Process.kill :TERM, @pid begin - Process.wait @server.pid + Process.wait @pid rescue Errno::ECHILD end status = $?.exitstatus assert_equal 0, status + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end def test_load_path_includes_extra_deps skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru") + cli_server "-w 2 -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru" last_load_path = read_body(connect) assert_match(%r{gems/rdoc-[\d.]+/lib$}, last_load_path) end - def test_not_accepts_new_connections_after_term_signal - skip_on :jruby, :windows + def test_term_not_accepts_new_connections + skip_unless_signal_exist? :TERM + skip_on :jruby - server('test/rackup/sleep.ru') + cli_server 'test/rackup/sleep.ru' - _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://127.0.0.1:#{@tcp_port}/sleep10") + _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request - Process.kill(:TERM, @server.pid) + Process.kill :TERM, @pid true while @server.gets !~ /Gracefully stopping/ # wait for server to begin graceful shutdown # Invoke a request which must be rejected - _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl 127.0.0.1:#{@tcp_port}") + _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl #{HOST}:#{@tcp_port}") - assert nil != Process.getpgid(@server.pid) # ensure server is still running - assert nil != Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress + refute_nil Process.getpgid(@pid) # ensure server is still running + refute_nil Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress curl_wait_thread.join rejected_curl_wait_thread.join @@ -375,287 +296,217 @@ def test_not_accepts_new_connections_after_term_signal assert_match(/Slept 10/, curl_stdout.read) assert_match(/Connection refused/, rejected_curl_stderr.read) - Process.wait(@server.pid) + Process.wait @pid + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end - def test_no_zombie_children + def test_term_worker_clean_exit_cluster skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - worker_pids = [] - server = server("-w 2 test/rackup/hello.ru") + cli_server "-w #{WORKERS} test/rackup/hello.ru" + # Get the PIDs of the child workers. - while worker_pids.size < 2 - next unless line = server.gets.match(/pid: (\d+)/) - worker_pids << line.captures.first.to_i - end + worker_pids = get_worker_pids 0 # Signal the workers to terminate, and wait for them to die. - Process.kill :TERM, @server.pid - Process.wait @server.pid - @server = nil # prevent `#teardown` from killing already killed server + Process.kill :TERM, @pid + Process.wait @pid - # Check if the worker processes remain in the process table. - # Process.kill should raise the Errno::ESRCH exception, - # indicating the process is dead and has been reaped. - zombies = worker_pids.map do |pid| - begin - pid if Process.kill 0, pid - rescue Errno::ESRCH - nil - end - end.compact - assert_empty zombies, "Process ids #{zombies} became zombies" - end + zombies = bad_exit_pids worker_pids - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } - } - end - - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } + assert_empty zombies, "Process ids #{zombies} became zombies" end - private - - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers 2 - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } - end - - # start Puma via launcher - thr, launcher, _e = run_launcher conf + # mimicking stuck workers, test respawn with external TERM + def test_stuck_external_term_spawn_cluster + skip_unless_signal_exist? :TERM - # make sure two workers have booted - time = 0 - until workers_booted >= 2 || time >= 10 - sleep 2 - time += 2 - end - - cluster = launcher.instance_variable_get :@runner - - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil - - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end - end - - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + worker_respawn(0) do |phase0_worker_pids| + last = phase0_worker_pids.last + phase0_worker_pids.each do |pid| + Process.kill :TERM, pid + sleep 4 unless pid == last end end + end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + # mimicking stuck workers, test restart + def test_stuck_phased_restart_cluster + skip_unless_signal_exist? :USR1 + worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid } + end - start_time = Time.now.to_f + private - # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, old_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 4 || time >= 45 - sleep 2 - time += 2 + def cli_server(argv, unix: false) + if unix + cmd = "#{BASE} bin/puma -b unix://#{@bind_path} #{argv}" + else + @tcp_port = UniquePort.call + cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" end - - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) - - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated - end - }.compact - - Thread.kill worker0 - Thread.kill worker1 - - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" - - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 - - # Since 35 is the shorter of the two requests, server should restart - # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 - - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg + @server = IO.popen(cmd, "r") + wait_for_server_to_boot + @pid = @server.pid + @server end - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } - - launcher = Puma::Launcher.new conf, :events => @events - - thr = Thread.new do - launcher.run + def stop_server(pid = @pid, signal: :TERM) + Process.kill signal, pid + sleep 1 + begin + Process.wait2 pid + rescue Errno::ECHILD end + end - # wait for boot from #@events.on_booted - @wait.sysread 1 - - [thr, launcher, @events] + def restart_server_and_listen(argv) + cli_server argv + connection = connect + initial_reply = read_body(connection) + restart_server(connection) + [initial_reply, read_body(connect)] end - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } - } + # reuses an existing connection to make sure that works + def restart_server(connection) + Process.kill :USR2, @pid + connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request + wait_for_server_to_boot end - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } + def wait_for_server_to_boot + true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted end - private + def connect(path = nil, unix: false) + s = unix ? UNIXSocket.new("unix://#{@bind_path}") : TCPSocket.new(HOST, @tcp_port) + @ios_to_close << s + s << "GET /#{path} HTTP/1.1\r\n\r\n" + true until s.gets == "\r\n" + s + end - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers 2 - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } + def read_body(connection) + Timeout.timeout(10) do + loop do + response = connection.readpartial(1024) + body = response.split("\r\n\r\n", 2).last + return body if body && !body.empty? + sleep 0.01 + end end + end - # start Puma via launcher - thr, launcher, _e = run_launcher conf - - # make sure two workers have booted - time = 0 - until workers_booted >= 2 || time >= 10 - sleep 2 - time += 2 + def cli_pumactl(argv, unix: false) + if unix + pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r") + else + pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") end + @ios_to_close << pumactl + Process.wait pumactl.pid + pumactl + end - cluster = launcher.instance_variable_get :@runner + def worker_respawn(phase = 1, size = WORKERS) + skip NO_FORK_MSG unless HAS_FORK + threads = [] - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil + cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru" - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end - end + # make sure two workers have booted + phase0_worker_pids = get_worker_pids 0 - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + [35, 40].each do |sleep_time| + threads << Thread.new do + begin + connect "sleep#{sleep_time}" + # stuck connections will raise IOError or Errno::ECONNRESET + # when shutdown + rescue IOError, Errno::ECONNRESET + end end end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) - - start_time = Time.now.to_f + @start_time = Time.now.to_f # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, old_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 4 || time >= 45 - sleep 2 - time += 2 - end - - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) - - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated - end - }.compact + # externally TERM'ing them + yield phase0_worker_pids - Thread.kill worker0 - Thread.kill worker1 + # wait for new workers to boot + phase1_worker_pids = get_worker_pids phase - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" - - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 + # should be empty if all phase 0 workers cleanly exited + phase0_exited = bad_exit_pids phase0_worker_pids # Since 35 is the shorter of the two requests, server should restart # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + assert_operator (Time.now.to_f - @start_time).round(2), :<, 35 - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg - end + msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" + assert_equal WORKERS, phase0_worker_pids.length, msg - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" - launcher = Puma::Launcher.new conf, :events => @events + assert_empty phase0_exited, msg + + threads.each { |th| Thread.kill th } + stop_server signal: :KILL + end - thr = Thread.new do - launcher.run + # gets worker pids from @server output + # new workers created from 'phased-restart' increment phase + # new workers created from externally shutdown pids maintain same phase + def get_worker_pids(phase = 1, size = WORKERS) + pids = [] + re = /pid: (\d+)\) booted, phase: #{phase}/ + while pids.size < size + line = @server.gets # line variable left for debugging + if pid = line[re, 1] + pids << pid + else + sleep 2 + end end + pids.map(&:to_i) + end - # wait for boot from #@events.on_booted - @wait.sysread 1 + # Returns an array of pids still in the process table, so it should + # be empty for a clean exit. + # Process.kill should raise the Errno::ESRCH exception, indicating the + # process is dead and has been reaped. + def bad_exit_pids(pids) + pids.map do |pid| + begin + pid if Process.kill 0, pid + rescue Errno::ESRCH + nil + end + end.compact + end - [thr, launcher, @events] + def thread_run(replies, delay, sleep_time, mutex, unix: false) + begin + sleep delay + s = connect "sleep#{sleep_time}", unix: unix + body = read_body(s) + mutex.synchronize { replies << body } + rescue Errno::ECONNRESET + # connection was accepted but then closed + # client would see an empty response + mutex.synchronize { replies << :reset } + rescue Errno::ECONNREFUSED, Errno::EPIPE + # TCP - Errno::ECONNREFUSED, Errno::EPIPE + # TODO UNIX + # connection was never accepted it can therefore be + # re-tried before the client receives an empty response + mutex.synchronize { replies << :refused } + end end end diff --git a/test/test_persistent.rb b/test/test_persistent.rb index 7144f72fc8..2e4c77e9ec 100644 --- a/test/test_persistent.rb +++ b/test/test_persistent.rb @@ -1,14 +1,17 @@ require_relative "helper" class TestPersistent < Minitest::Test + + HOST = "127.0.0.1" + def setup - @valid_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" - @close_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n" + @valid_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" + @close_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n" @http10_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" - @keep_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: Keep-Alive\r\n\r\n" + @keep_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: Keep-Alive\r\n\r\n" - @valid_post = "POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhello" - @valid_no_body = "GET / HTTP/1.1\r\nHost: test.com\r\nX-Status: 204\r\nContent-Type: text/plain\r\n\r\n" + @valid_post = "POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhello" + @valid_no_body = "GET / HTTP/1.1\r\nHost: test.com\r\nX-Status: 204\r\nContent-Type: text/plain\r\n\r\n" @headers = { "X-Header" => "Works" } @body = ["Hello"] @@ -20,15 +23,14 @@ def setup [status, @headers, @body] end - @host = "127.0.0.1" @port = UniquePort.call @server = Puma::Server.new @simple - @server.add_tcp_listener "127.0.0.1", @port + @server.add_tcp_listener HOST, @port @server.max_threads = 1 @server.run - @client = TCPSocket.new "127.0.0.1", @port + @client = TCPSocket.new HOST, @port end def teardown @@ -230,7 +232,7 @@ def test_keepalive_doesnt_starve_clients @client << @valid_request - c2 = TCPSocket.new @host, @port + c2 = TCPSocket.new HOST, @port c2 << @valid_request out = IO.select([c2], nil, nil, 1) @@ -240,6 +242,8 @@ def test_keepalive_doesnt_starve_clients assert_equal "HTTP/1.1 200 OK\r\nX-Header: Works\r\nContent-Length: #{sz}\r\n\r\n", lines(4, c2) assert_equal "Hello", c2.read(5) + ensure + c2.close end end diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index cb5a99767d..39e987a509 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -1,9 +1,10 @@ require_relative "helper" class TestPumaServer < Minitest::Test + parallelize_me! def setup - @port = 0 + @port = UniquePort.call @host = "127.0.0.1" @app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } diff --git a/test/test_puma_server_ssl.rb b/test/test_puma_server_ssl.rb index 9e2a3c02cb..4b60ecfbc0 100644 --- a/test/test_puma_server_ssl.rb +++ b/test/test_puma_server_ssl.rb @@ -32,6 +32,7 @@ def ssl_error(server, peeraddr, peercert, error) end class TestPumaServerSSL < Minitest::Test + parallelize_me! def setup @http = nil @@ -203,6 +204,7 @@ def test_tls_v1_1_rejection # client-side TLS authentication tests class TestPumaServerSSLClient < Minitest::Test + parallelize_me! def assert_ssl_client_error_match(error, subject=nil, &blk) host = "127.0.0.1" diff --git a/test/test_web_server.rb b/test/test_web_server.rb index a433ed13dd..c8073b0681 100644 --- a/test/test_web_server.rb +++ b/test/test_web_server.rb @@ -17,6 +17,8 @@ def call(env) end class WebServerTest < Minitest::Test + parallelize_me! + VALID_REQUEST = "GET / HTTP/1.1\r\nHost: www.zedshaw.com\r\nContent-Type: text/plain\r\n\r\n" def setup From 259d565b89a0db72b8f322f4517e5c929df96406 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Mon, 9 Sep 2019 20:06:46 -0500 Subject: [PATCH 4/5] Travis macOS - Ruby 2.4.7 not available yet, revert to 2.4.6 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 5345ce74ad..de8e5cc351 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,7 +45,7 @@ matrix: env: OS="Bionic 18.04 OpenSSL 1.1.1" - rvm: ruby-head env: jit=yes - - rvm: 2.4.7 + - rvm: 2.4.6 os: osx osx_image: xcode11 env: OS="osx xcode11" From ea2c264f2dbf002dcc7ef6b76c20e19a7b05b093 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Tue, 10 Sep 2019 11:04:59 -0500 Subject: [PATCH 5/5] test_integration.rb - add two tests using UNIX binders cluster.rb - clean up duplicate term? method workflow.yml - better step descriptions, add PR's --- .github/workflows/workflow.yml | 12 +- lib/puma/cluster.rb | 4 - test/test_integration.rb | 196 ++++++++++++++++++++------------- 3 files changed, 130 insertions(+), 82 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 24ba759f61..8e4945e003 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,6 +1,12 @@ name: Puma -on: [push] +on: + push: + branches: + - master + pull_request: + branches: + - '*' jobs: build: @@ -35,7 +41,7 @@ jobs: with: ruby-version: ${{ matrix.ruby }} - - name: Install ragel + - name: ubuntu & macos - install ragel if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') run: | if [ "${{ matrix.os }}" == "macos" ]; then @@ -43,7 +49,7 @@ jobs: else sudo apt-get install ragel fi - - name: Windows MSYS2 + - name: windows - update MSYS2, openssl, ragel if: startsWith(matrix.os, 'windows') uses: MSP-Greg/msys2-action@master with: diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 5bc22d83f3..2cc88d9f7c 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -92,10 +92,6 @@ def term? @term end - def term? - @term - end - def ping!(status) @last_checkin = Time.now @last_status = status diff --git a/test/test_integration.rb b/test/test_integration.rb index 164a78ea00..6b59966e82 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -15,6 +15,8 @@ class TestIntegration < Minitest::Test WORKERS = 2 + DARWIN = !!RUBY_PLATFORM[/darwin/] + def setup @ios_to_close = [] @state_path = "test/#{name}_puma.state" @@ -139,84 +141,36 @@ def test_usr2_restart_restores_environment # Send requests 10 per second. Send 10, then :TERM server, then send another 30. # No more than 10 should throw Errno::ECONNRESET. - def test_term_closes_listeners_cluster + def test_term_closes_listeners_cluster_tcp skip NO_FORK_MSG unless HAS_FORK skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: false + end - cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru" - threads = [] - replies = [] - mutex = Mutex.new - div = 10 - - 41.times.each do |i| - if i == 10 - threads << Thread.new do - sleep i.to_f/div - Process.kill :TERM, @pid - mutex.synchronize { replies << :term_sent } - end - else - threads << Thread.new do - thread_run replies, i.to_f/div, 1, mutex - end - end - end - - threads.each(&:join) - - responses = replies.count { |r| r[/\ASlept 1/] } - resets = replies.count { |r| r == :reset } - refused = replies.count { |r| r == :refused } - msg = "#{responses} responses, #{resets} resets, #{refused} refused" - - assert_operator 9, :<=, responses, msg - - assert_operator 10, :>=, resets , msg - - assert_operator 20, :<=, refused , msg + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def test_term_closes_listeners_cluster_unix + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: true end + # Use TCPSocket for bind # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. # All should be responded to, and at least three workers should be used - def test_usr1_all_respond_cluster + def test_usr1_all_respond_cluster_tcp skip NO_FORK_MSG unless HAS_FORK skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: false + end - cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru" - threads = [] - replies = [] - mutex = Mutex.new - - s = connect "sleep1" - replies << read_body(s) - Process.kill :USR1, @pid - - 24.times do |delay| - threads << Thread.new do - thread_run replies, delay, 1, mutex - end - end - - threads.each(&:join) - - responses = replies.count { |r| r[/\ASlept 1/] } - resets = replies.count { |r| r == :reset } - refused = replies.count { |r| r == :refused } - - # get pids from replies, generate uniq array - qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - - msg = "#{responses} responses, #{qty_pids} uniq pids" - - assert_equal 25, responses, msg - assert_operator qty_pids, :>, 2, msg - - msg = "#{responses} responses, #{resets} resets, #{refused} refused" - - refute_includes replies, :refused, msg - - refute_includes replies, :reset , msg + # Use UNIXSocket for bind + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def test_usr1_all_respond_cluster_unix + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: true end def test_term_exit_code_single @@ -383,7 +337,7 @@ def wait_for_server_to_boot end def connect(path = nil, unix: false) - s = unix ? UNIXSocket.new("unix://#{@bind_path}") : TCPSocket.new(HOST, @tcp_port) + s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port) @ios_to_close << s s << "GET /#{path} HTTP/1.1\r\n\r\n" true until s.gets == "\r\n" @@ -412,6 +366,91 @@ def cli_pumactl(argv, unix: false) pumactl end + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + # Run by both tcp and unix test versions + def term_closes_listeners_cluster(unix: false) + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + div = 10 + + refused = thread_run_refused unix: unix + + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies << :term_sent } + end + else + threads << Thread.new do + thread_run replies, i.to_f/div, 1, mutex, refused, unix: unix + end + end + end + + threads.each(&:join) + + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + assert_operator 9, :<=, responses, msg + + assert_operator 10, :>=, resets , msg + + assert_operator 20, :<=, refused , msg + end + + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + # Run by both tcp and unix test versions + def usr1_all_respond_cluster(unix: false) + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + + s = connect "sleep1", unix: unix + replies << read_body(s) + Process.kill :USR1, @pid + + refused = thread_run_refused unix: unix + + 24.times do |delay| + threads << Thread.new do + thread_run replies, delay, 1, mutex, refused, unix: unix + end + end + + threads.each(&:join) + + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length + + msg = "#{responses} responses, #{qty_pids} uniq pids" + + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg + + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + refute_includes replies, :refused, msg + + refute_includes replies, :reset , msg + end + def worker_respawn(phase = 1, size = WORKERS) skip NO_FORK_MSG unless HAS_FORK threads = [] @@ -491,7 +530,18 @@ def bad_exit_pids(pids) end.compact end - def thread_run(replies, delay, sleep_time, mutex, unix: false) + # used with thread_run to define correct 'refused' errors + def thread_run_refused(unix: false) + if unix + DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT] + else + DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] : + [Errno::ECONNREFUSED] + end + end + + # used in loop to create several 'requests' + def thread_run(replies, delay, sleep_time, mutex, refused, unix: false) begin sleep delay s = connect "sleep#{sleep_time}", unix: unix @@ -501,11 +551,7 @@ def thread_run(replies, delay, sleep_time, mutex, unix: false) # connection was accepted but then closed # client would see an empty response mutex.synchronize { replies << :reset } - rescue Errno::ECONNREFUSED, Errno::EPIPE - # TCP - Errno::ECONNREFUSED, Errno::EPIPE - # TODO UNIX - # connection was never accepted it can therefore be - # re-tried before the client receives an empty response + rescue *refused mutex.synchronize { replies << :refused } end end