diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml new file mode 100644 index 0000000000..8e4945e003 --- /dev/null +++ b/.github/workflows/workflow.yml @@ -0,0 +1,69 @@ +name: Puma + +on: + push: + branches: + - master + pull_request: + branches: + - '*' + +jobs: + build: + name: >- + OS: ${{ matrix.os }} Ruby: ${{ matrix.ruby }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ 'ubuntu-16.04', 'ubuntu-18.04', 'macos', 'windows-latest' ] + ruby: [ '2.3.x', '2.4.x', '2.5.x', '2.6.x' ] + exclude: + - os: ubuntu-16.04 + ruby: 2.4.x + - os: ubuntu-16.04 + ruby: 2.5.x + - os: ubuntu-16.04 + ruby: 2.6.x + - os: ubuntu-18.04 + ruby: 2.3.x + - os: macos + ruby: 2.3.x + - os: windows-latest + ruby: 2.3.x + steps: + - name: repo checkout + uses: actions/checkout@v1 + with: + fetch-depth: 10 + - name: load ruby + uses: actions/setup-ruby@v1 + with: + ruby-version: ${{ matrix.ruby }} + + - name: ubuntu & macos - install ragel + if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') + run: | + if [ "${{ matrix.os }}" == "macos" ]; then + brew install ragel + else + sudo apt-get install ragel + fi + - name: windows - update MSYS2, openssl, ragel + if: startsWith(matrix.os, 'windows') + uses: MSP-Greg/msys2-action@master + with: + base: update + mingw: openssl ragel + + - name: RubyGems, Bundler Update + run: gem update --system --no-document --conservative + - name: bundle install + run: bundle install --jobs 4 --retry 3 + - name: compile + run: bundle exec rake compile + - name: test + run: bundle exec rake + env: + CI: true + TESTOPTS: -v diff --git a/.travis.yml b/.travis.yml index 5345ce74ad..de8e5cc351 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,7 +45,7 @@ matrix: env: OS="Bionic 18.04 OpenSSL 1.1.1" - rvm: ruby-head env: jit=yes - - rvm: 2.4.7 + - rvm: 2.4.6 os: osx osx_image: xcode11 env: OS="osx xcode11" diff --git a/lib/puma/binder.rb b/lib/puma/binder.rb index f2165f7474..d0b122e782 100644 --- a/lib/puma/binder.rb +++ b/lib/puma/binder.rb @@ -50,14 +50,6 @@ def env(sock) def close @ios.each { |i| i.close } - @unix_paths.each do |i| - # Errno::ENOENT is intermittently raised - begin - unix_socket = UNIXSocket.new i - unix_socket.close - rescue Errno::ENOENT - end - end end def import_from_env diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 24f4ea5f85..2cc88d9f7c 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -68,7 +68,7 @@ def initialize(idx, pid, phase, options) @pid = pid @phase = phase @stage = :started - @signal = "TERM" + @signal = :TERM @options = options @first_term_sent = nil @started_at = Time.now @@ -92,10 +92,6 @@ def term? @term end - def term? - @term - end - def ping!(status) @last_checkin = Time.now @last_status = status @@ -108,7 +104,7 @@ def ping_timeout?(which) def term begin if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] - @signal = "KILL" + @signal = :KILL else @term ||= true @first_term_sent ||= Time.now @@ -119,12 +115,12 @@ def term end def kill - Process.kill "KILL", @pid + Process.kill :KILL, @pid rescue Errno::ESRCH end def hup - Process.kill "HUP", @pid + Process.kill :HUP, @pid rescue Errno::ESRCH end end @@ -220,8 +216,10 @@ def check_workers(force=false) log "- Stopping #{w.pid} for phased upgrade..." end - w.term - log "- #{w.signal} sent to #{w.pid}..." + unless w.term? + w.term + log "- #{w.signal} sent to #{w.pid}..." + end end end end @@ -270,6 +268,7 @@ def worker(index, master) server = start_server Signal.trap "SIGTERM" do + @worker_write << "e#{Process.pid}\n" rescue nil server.stop end @@ -501,8 +500,10 @@ def run w.boot! log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}" force_check = true + when "e" + w.instance_variable_set :@term, true when "t" - w.term + w.term unless w.term? force_check = true when "p" w.ping!(result.sub(/^\d+/,'').chomp) diff --git a/lib/puma/launcher.rb b/lib/puma/launcher.rb index da9bce2573..ee21a0761f 100644 --- a/lib/puma/launcher.rb +++ b/lib/puma/launcher.rb @@ -217,11 +217,12 @@ def restart_args end def close_binder_listeners - @binder.listeners.each do |l, io| - io.close + # close binder listeners as fast as possible, so separate loop + @binder.listeners.each { |_, io| io.close } + @binder.listeners.each do |l, _| uri = URI.parse(l) next unless uri.scheme == 'unix' - File.unlink("#{uri.host}#{uri.path}") + File.unlink "#{uri.host}#{uri.path}" end end diff --git a/test/config/worker_shutdown_timeout_2.rb b/test/config/worker_shutdown_timeout_2.rb new file mode 100644 index 0000000000..ddb8c6453d --- /dev/null +++ b/test/config/worker_shutdown_timeout_2.rb @@ -0,0 +1 @@ +worker_shutdown_timeout 2 diff --git a/test/helper.rb b/test/helper.rb index e7afdbe721..142b8db5af 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -47,7 +47,11 @@ module UniquePort @mutex = Mutex.new def self.call - @mutex.synchronize { @port += 1 } + @mutex.synchronize { + @port += 1 + @port = 3307 if @port == 3306 # MySQL on Actions + @port + } end end @@ -70,6 +74,34 @@ def run(*) module TestSkips + # finds bad signals, value is an array of symbols + BAD_SIGNAL_LIST = begin + code = <<-HEREDOC + data = (%w[HUP INT TTIN TTOU USR1 USR2] - Signal.list.keys).join(' ') + + begin + Signal.trap('TERM') { } + rescue ArgumentError + data << 'TERM' + end + + puts data + HEREDOC + pipe = IO.popen RbConfig.ruby, 'r+' + pid = pipe.pid + pipe.puts code + pipe.close_write + + begin + Process.kill :TERM, pid + pipe.read.strip.split(' ').sort.map(&:to_sym) + rescue Errno::EINVAL + "#{pipe.read.strip} TERM".split(' ').sort.map(&:to_sym) + ensure + Process.wait pid + end + end + # usage: skip NO_FORK_MSG unless HAS_FORK # windows >= 2.6 fork is not defined, < 2.6 fork raises NotImplementedError HAS_FORK = ::Process.respond_to? :fork @@ -82,8 +114,8 @@ module TestSkips # usage: skip_unless_signal_exist? :USR2 def skip_unless_signal_exist?(sig, bt: caller) - signal = sig.to_s - unless Signal.list.key? signal + signal = sig.to_s.sub(/\ASIG/i, '').to_sym + if BAD_SIGNAL_LIST.include? signal skip "Signal #{signal} isn't available on the #{RUBY_PLATFORM} platform", bt end end diff --git a/test/helpers/apps.rb b/test/helpers/apps.rb index f74e3eb09a..3be3b3d1b5 100644 --- a/test/helpers/apps.rb +++ b/test/helpers/apps.rb @@ -9,4 +9,12 @@ module TestApps [200, {"Content-Type" => "text/plain"}, ["Slept #{dly}"]] end + # call with "GET /sleep HTTP/1.1\r\n\r\n", where is the number of + # seconds to sleep + # same as rackup/sleep_pid.ru + SLEEP_PID = -> (env) do + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] + end end diff --git a/test/rackup/sleep_pid.ru b/test/rackup/sleep_pid.ru new file mode 100644 index 0000000000..0f1811c8c7 --- /dev/null +++ b/test/rackup/sleep_pid.ru @@ -0,0 +1,8 @@ +# call with "GET /sleep HTTP/1.1\r\n\r\n", where is the number of +# seconds to sleep, returns process pid + +run lambda { |env| + dly = (env['REQUEST_PATH'][/\/sleep(\d+)/,1] || '0').to_i + sleep dly + [200, {"Content-Type" => "text/plain"}, ["Slept #{dly} #{Process.pid}"]] +} diff --git a/test/test_integration.rb b/test/test_integration.rb index faefb402c1..6b59966e82 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -1,207 +1,99 @@ # frozen_string_literal: true require_relative "helper" -require "puma/cli" require "puma/control_cli" require "open3" -# TODO: Remove over-utilization of @instance variables -# TODO: remove stdout logging, get everything out of my rainbow dots - class TestIntegration < Minitest::Test + parallelize_me! + HOST = "127.0.0.1" TOKEN = "xxyyzz" - def setup - @state_path = "test/test_#{name}_puma.state" - @bind_path = "test/test_#{name}_server.sock" - @control_path = "test/test_#{name}_control.sock" + BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" : + "#{Gem.ruby} -Ilib" - @server = nil + WORKERS = 2 - @wait, @ready = IO.pipe + DARWIN = !!RUBY_PLATFORM[/darwin/] - @events = Puma::Events.strings - @events.on_booted { @ready << "!" } + def setup + @ios_to_close = [] + @state_path = "test/#{name}_puma.state" + @bind_path = "test/#{name}_server.sock" + @control_path = "test/#{name}_control.sock" end def teardown - File.unlink @state_path rescue nil - File.unlink @bind_path rescue nil - File.unlink @control_path rescue nil - - @wait.close - @ready.close - - if @server - Process.kill "INT", @server.pid + if defined?(@server) && @server begin - Process.wait @server.pid + Process.kill :INT, @server.pid + rescue Errno::ESRCH + end + begin + Process.wait @pid rescue Errno::ECHILD end - - @server.close + @server.close unless @server.closed? + @server = nil end - end - def server_cmd(argv) - @tcp_port = UniquePort.call - base = "#{Gem.ruby} -Ilib bin/puma" - base = "bundle exec #{base}" if defined?(Bundler) - "#{base} -b tcp://127.0.0.1:#{@tcp_port} #{argv}" - end - - def server(argv) - @server = IO.popen(server_cmd(argv), "r") - - wait_for_server_to_boot(@server) - - @server - end - - def start_forked_server(argv) - servercmd = server_cmd(argv) - pid = fork do - exec servercmd + @ios_to_close.each do |io| + io.close if io.is_a?(IO) && !io.closed? + io = nil end - sleep 5 - pid - end - - def stop_forked_server(pid) - Process.kill(:TERM, pid) - sleep 1 - Process.wait2(pid) - end - - def restart_server_and_listen(argv) - server(argv) - connection = connect - initial_reply = read_body(connection) - restart_server(@server, connection) - [initial_reply, read_body(connect)] - end - - def wait_booted - @wait.sysread 1 - end - - # reuses an existing connection to make sure that works - def restart_server(server, connection) - Process.kill :USR2, @server.pid - - connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request - - wait_for_server_to_boot(server) - end - - def connect(path = nil) - s = TCPSocket.new "localhost", @tcp_port - s << "GET /#{path} HTTP/1.1\r\n\r\n" - true until s.gets == "\r\n" - s - end - - def wait_for_server_to_boot(server) - true while server.gets !~ /Ctrl-C/ # wait for server to say it booted - end - - def read_body(connection) - Timeout.timeout(10) do - loop do - response = connection.readpartial(1024) - body = response.split("\r\n\r\n", 2).last - return body if body && !body.empty? - sleep 0.01 - end - end + File.unlink @state_path rescue nil + File.unlink @bind_path rescue nil + File.unlink @control_path rescue nil end - def test_stop_via_pumactl + def test_pumactl_stop skip UNIX_SKT_MSG unless UNIX_SKT_EXIST + cli_server "-q test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}" - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.rackup "test/rackup/hello.ru" - end - - l = Puma::Launcher.new conf, :events => @events - - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end - - wait_booted + cli_pumactl "stop", unix: true - s = UNIXSocket.new @bind_path - s << "GET / HTTP/1.0\r\n\r\n" - assert_equal "Hello World", read_body(s) - - sout = StringIO.new - - ccli = Puma::ControlCLI.new %W!-S #{@state_path} stop!, sout - - ccli.run + _, status = Process.wait2 @pid + assert_equal 0, status - assert_kind_of Thread, t.join, "server didn't stop" + @server = nil end - def test_phased_restart_via_pumactl + def test_pumactl_phased_restart_cluster skip NO_FORK_MSG unless HAS_FORK - delay = 40 + cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true - conf = Puma::Configuration.new do |c| - c.quiet - c.state_path @state_path - c.bind "unix://#{@bind_path}" - c.activate_control_app "unix://#{@control_path}", :auth_token => TOKEN - c.workers 2 - c.worker_shutdown_timeout 2 - c.rackup "test/rackup/sleep.ru" - end + s = UNIXSocket.new @bind_path + @ios_to_close << s + s << "GET /sleep5 HTTP/1.0\r\n\r\n" - l = Puma::Launcher.new conf, :events => @events + # Get the PIDs of the phase 0 workers. + phase0_worker_pids = get_worker_pids 0 - t = Thread.new do - Thread.current.abort_on_exception = true - l.run - end + # Phased restart + cli_pumactl "phased-restart", unix: true - wait_booted + # Get the PIDs of the phase 1 workers. + phase1_worker_pids = get_worker_pids 1 - s = UNIXSocket.new @bind_path - s << "GET /sleep#{delay} HTTP/1.0\r\n\r\n" + msg = "phase 0 pids #{phase0_worker_pids.inspect} phase 1 pids #{phase1_worker_pids.inspect}" + + assert_equal WORKERS, phase0_worker_pids.length, msg + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" - sout = StringIO.new - # Phased restart - ccli = Puma::ControlCLI.new ["-S", @state_path, "phased-restart"], sout - ccli.run - - done = false - until done - @events.stdout.rewind - log = @events.stdout.readlines.join("") - if log =~ /- Worker \d \(pid: \d+\) booted, phase: 1/ - assert_match(/TERM sent/, log) - assert_match(/- Worker \d \(pid: \d+\) booted, phase: 1/, log) - done = true - end - end # Stop - ccli = Puma::ControlCLI.new ["-S", @state_path, "stop"], sout - ccli.run + cli_pumactl "stop", unix: true - assert_kind_of Thread, t.join, "server didn't stop" - assert File.exist? @bind_path + _, status = Process.wait2 @pid + assert_equal 0, status + + @server = nil end - def test_kill_unknown_via_pumactl + def test_pumactl_kill_unknown skip_on :jruby # we run ls to get a 'safe' pid to pass off as puma in cli stop @@ -213,8 +105,7 @@ def test_kill_unknown_via_pumactl sout = StringIO.new e = assert_raises SystemExit do - ccli = Puma::ControlCLI.new %W!-p #{safe_pid} stop!, sout - ccli.run + Puma::ControlCLI.new(%W!-p #{safe_pid} stop!, sout).run end sout.rewind # windows bad URI(is not URI?) @@ -222,72 +113,20 @@ def test_kill_unknown_via_pumactl assert_equal(1, e.status) end - def test_restart_closes_keepalive_sockets + def test_usr2_restart_single skip_unless_signal_exist? :USR2 _, new_reply = restart_server_and_listen("-q test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_restart_closes_keepalive_sockets_workers + def test_usr2_restart_cluster skip NO_FORK_MSG unless HAS_FORK - _, new_reply = restart_server_and_listen("-q -w 2 test/rackup/hello.ru") + _, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru") assert_equal "Hello World", new_reply end - def test_sigterm_closes_listeners_on_forked_servers - skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-w 2 -q test/rackup/sleep.ru") - threads = [] - initial_reply = nil - next_replies = [] - condition_variable = ConditionVariable.new - mutex = Mutex.new - - threads << Thread.new do - s = connect "sleep1" - mutex.synchronize { condition_variable.broadcast } - initial_reply = read_body(s) - end - - threads << Thread.new do - mutex.synchronize { - condition_variable.wait(mutex, 1) - Process.kill("SIGTERM", pid) - } - end - - 10.times.each do |i| - threads << Thread.new do - mutex.synchronize { condition_variable.wait(mutex, 1.5) } - - begin - s = connect "sleep1" - read_body(s) - next_replies << :success - rescue Errno::ECONNRESET - # connection was accepted but then closed - # client would see an empty response - next_replies << :connection_reset - rescue Errno::ECONNREFUSED - # connection was was never accepted - # it can therefore be re-tried before the - # client receives an empty response - next_replies << :connection_refused - end - end - end - - threads.map(&:join) - - assert_equal "Slept 1", initial_reply - - assert_includes next_replies, :connection_refused - - refute_includes next_replies, :connection_reset - end - # It does not share environments between multiple generations, which would break Dotenv - def test_restart_restores_environment + def test_usr2_restart_restores_environment # jruby has a bug where setting `nil` into the ENV or `delete` do not change the # next workers ENV skip_on :jruby @@ -300,74 +139,110 @@ def test_restart_restores_environment refute_equal initial_reply, new_reply end - def test_term_signal_exit_code_in_single_mode + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def test_term_closes_listeners_cluster_tcp skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: false + end - pid = start_forked_server("test/rackup/hello.ru") - _, status = stop_forked_server(pid) + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def test_term_closes_listeners_cluster_unix + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: true + end - assert_equal 15, status + # Use TCPSocket for bind + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def test_usr1_all_respond_cluster_tcp + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: false end - def test_term_signal_exit_code_in_clustered_mode + # Use UNIXSocket for bind + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def test_usr1_all_respond_cluster_unix skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: true + end - pid = start_forked_server("-w 2 test/rackup/hello.ru") - _, status = stop_forked_server(pid) + def test_term_exit_code_single + skip_unless_signal_exist? :TERM + + cli_server "test/rackup/hello.ru" + _, status = stop_server assert_equal 15, status end - def test_term_signal_suppress_in_single_mode + def test_term_exit_code_cluster skip NO_FORK_MSG unless HAS_FORK - pid = start_forked_server("-C test/config/suppress_exception.rb test/rackup/hello.ru") - _, status = stop_forked_server(pid) + cli_server "-w #{WORKERS} test/rackup/hello.ru" + _, status = stop_server + + assert_equal 15, status + end + + def test_term_suppress_single + skip_unless_signal_exist? :TERM + + cli_server "-C test/config/suppress_exception.rb test/rackup/hello.ru" + _, status = stop_server assert_equal 0, status end - def test_term_signal_suppress_in_clustered_mode + def test_term_suppress_cluster skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/suppress_exception.rb test/rackup/hello.ru") + cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru" - Process.kill(:TERM, @server.pid) + Process.kill :TERM, @pid begin - Process.wait @server.pid + Process.wait @pid rescue Errno::ECHILD end status = $?.exitstatus assert_equal 0, status + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end def test_load_path_includes_extra_deps skip NO_FORK_MSG unless HAS_FORK - server("-w 2 -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru") + cli_server "-w 2 -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru" last_load_path = read_body(connect) assert_match(%r{gems/rdoc-[\d.]+/lib$}, last_load_path) end - def test_not_accepts_new_connections_after_term_signal - skip_on :jruby, :windows + def test_term_not_accepts_new_connections + skip_unless_signal_exist? :TERM + skip_on :jruby - server('test/rackup/sleep.ru') + cli_server 'test/rackup/sleep.ru' - _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://127.0.0.1:#{@tcp_port}/sleep10") + _stdin, curl_stdout, _stderr, curl_wait_thread = Open3.popen3("curl http://#{HOST}:#{@tcp_port}/sleep10") sleep 1 # ensure curl send a request - Process.kill(:TERM, @server.pid) + Process.kill :TERM, @pid true while @server.gets !~ /Gracefully stopping/ # wait for server to begin graceful shutdown # Invoke a request which must be rejected - _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl 127.0.0.1:#{@tcp_port}") + _stdin, _stdout, rejected_curl_stderr, rejected_curl_wait_thread = Open3.popen3("curl #{HOST}:#{@tcp_port}") - assert nil != Process.getpgid(@server.pid) # ensure server is still running - assert nil != Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress + refute_nil Process.getpgid(@pid) # ensure server is still running + refute_nil Process.getpgid(rejected_curl_wait_thread[:pid]) # ensure first curl invokation still in progress curl_wait_thread.join rejected_curl_wait_thread.join @@ -375,287 +250,309 @@ def test_not_accepts_new_connections_after_term_signal assert_match(/Slept 10/, curl_stdout.read) assert_match(/Connection refused/, rejected_curl_stderr.read) - Process.wait(@server.pid) + Process.wait @pid + @server.close unless @server.closed? @server = nil # prevent `#teardown` from killing already killed server end - def test_no_zombie_children + def test_term_worker_clean_exit_cluster skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' - worker_pids = [] - server = server("-w 2 test/rackup/hello.ru") + cli_server "-w #{WORKERS} test/rackup/hello.ru" + # Get the PIDs of the child workers. - while worker_pids.size < 2 - next unless line = server.gets.match(/pid: (\d+)/) - worker_pids << line.captures.first.to_i - end + worker_pids = get_worker_pids 0 # Signal the workers to terminate, and wait for them to die. - Process.kill :TERM, @server.pid - Process.wait @server.pid - @server = nil # prevent `#teardown` from killing already killed server + Process.kill :TERM, @pid + Process.wait @pid + + zombies = bad_exit_pids worker_pids - # Check if the worker processes remain in the process table. - # Process.kill should raise the Errno::ESRCH exception, - # indicating the process is dead and has been reaped. - zombies = worker_pids.map do |pid| - begin - pid if Process.kill 0, pid - rescue Errno::ESRCH - nil - end - end.compact assert_empty zombies, "Process ids #{zombies} became zombies" end - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } - } + # mimicking stuck workers, test respawn with external TERM + def test_stuck_external_term_spawn_cluster + skip_unless_signal_exist? :TERM + + worker_respawn(0) do |phase0_worker_pids| + last = phase0_worker_pids.last + phase0_worker_pids.each do |pid| + Process.kill :TERM, pid + sleep 4 unless pid == last + end + end end - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } + # mimicking stuck workers, test restart + def test_stuck_phased_restart_cluster + skip_unless_signal_exist? :USR1 + worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid } end private - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers 2 - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } + def cli_server(argv, unix: false) + if unix + cmd = "#{BASE} bin/puma -b unix://#{@bind_path} #{argv}" + else + @tcp_port = UniquePort.call + cmd = "#{BASE} bin/puma -b tcp://#{HOST}:#{@tcp_port} #{argv}" end + @server = IO.popen(cmd, "r") + wait_for_server_to_boot + @pid = @server.pid + @server + end - # start Puma via launcher - thr, launcher, _e = run_launcher conf - - # make sure two workers have booted - time = 0 - until workers_booted >= 2 || time >= 10 - sleep 2 - time += 2 + def stop_server(pid = @pid, signal: :TERM) + Process.kill signal, pid + sleep 1 + begin + Process.wait2 pid + rescue Errno::ECHILD end + end - cluster = launcher.instance_variable_get :@runner + def restart_server_and_listen(argv) + cli_server argv + connection = connect + initial_reply = read_body(connection) + restart_server(connection) + [initial_reply, read_body(connect)] + end - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil + # reuses an existing connection to make sure that works + def restart_server(connection) + Process.kill :USR2, @pid + connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request + wait_for_server_to_boot + end - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end - end + def wait_for_server_to_boot + true while @server.gets !~ /Ctrl-C/ # wait for server to say it booted + end - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + def connect(path = nil, unix: false) + s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port) + @ios_to_close << s + s << "GET /#{path} HTTP/1.1\r\n\r\n" + true until s.gets == "\r\n" + s + end + + def read_body(connection) + Timeout.timeout(10) do + loop do + response = connection.readpartial(1024) + body = response.split("\r\n\r\n", 2).last + return body if body && !body.empty? + sleep 0.01 end end + end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) + def cli_pumactl(argv, unix: false) + if unix + pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r") + else + pumactl = IO.popen("#{BASE} bin/pumactl #{argv}", "r") + end + @ios_to_close << pumactl + Process.wait pumactl.pid + pumactl + end - start_time = Time.now.to_f + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + # Run by both tcp and unix test versions + def term_closes_listeners_cluster(unix: false) + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM - # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, old_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 4 || time >= 45 - sleep 2 - time += 2 - end + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + div = 10 - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + refused = thread_run_refused unix: unix - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies << :term_sent } + end + else + threads << Thread.new do + thread_run replies, i.to_f/div, 1, mutex, refused, unix: unix + end end - }.compact + end - Thread.kill worker0 - Thread.kill worker1 + threads.each(&:join) - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + msg = "#{responses} responses, #{resets} resets, #{refused} refused" - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 + assert_operator 9, :<=, responses, msg - # Since 35 is the shorter of the two requests, server should restart - # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + assert_operator 10, :>=, resets , msg - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg + assert_operator 20, :<=, refused , msg end - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + # Run by both tcp and unix test versions + def usr1_all_respond_cluster(unix: false) + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new - launcher = Puma::Launcher.new conf, :events => @events + s = connect "sleep1", unix: unix + replies << read_body(s) + Process.kill :USR1, @pid - thr = Thread.new do - launcher.run + refused = thread_run_refused unix: unix + + 24.times do |delay| + threads << Thread.new do + thread_run replies, delay, 1, mutex, refused, unix: unix + end end - # wait for boot from #@events.on_booted - @wait.sysread 1 + threads.each(&:join) - [thr, launcher, @events] - end + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } - def test_worker_spawn_external_term - worker_respawn { |l, old_pids| - old_pids.each { |p| Process.kill :TERM, p } - } - end + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - def test_worker_phased_restart - worker_respawn { |l, old_pids| l.phased_restart } - end + msg = "#{responses} responses, #{qty_pids} uniq pids" - private + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg - def worker_respawn - skip NO_FORK_MSG unless HAS_FORK - port = UniquePort.call - workers_booted = 0 - - conf = Puma::Configuration.new do |c| - c.bind "tcp://#{HOST}:#{port}" - c.threads 1, 1 - c.workers 2 - c.worker_shutdown_timeout 2 - c.app TestApps::SLEEP - c.after_worker_fork { |idx| workers_booted += 1 } - end + msg = "#{responses} responses, #{resets} resets, #{refused} refused" - # start Puma via launcher - thr, launcher, _e = run_launcher conf + refute_includes replies, :refused, msg - # make sure two workers have booted - time = 0 - until workers_booted >= 2 || time >= 10 - sleep 2 - time += 2 - end + refute_includes replies, :reset , msg + end - cluster = launcher.instance_variable_get :@runner + def worker_respawn(phase = 1, size = WORKERS) + skip NO_FORK_MSG unless HAS_FORK + threads = [] - http0 = Net::HTTP.new HOST, port - http1 = Net::HTTP.new HOST, port - body0 = nil - body1 = nil + cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru" - worker0 = Thread.new do - begin - req0 = Net::HTTP::Get.new "/sleep35", {} - http0.start.request(req0) { |rep0| body0 = rep0.body } - rescue - end - end + # make sure two workers have booted + phase0_worker_pids = get_worker_pids 0 - worker1 = Thread.new do - begin - req1 = Net::HTTP::Get.new "/sleep40", {} - http1.start.request(req1) { |rep1| body1 = rep1.body } - rescue + [35, 40].each do |sleep_time| + threads << Thread.new do + begin + connect "sleep#{sleep_time}" + # stuck connections will raise IOError or Errno::ECONNRESET + # when shutdown + rescue IOError, Errno::ECONNRESET + end end end - old_pids = cluster.instance_variable_get(:@workers).map(&:pid) - - start_time = Time.now.to_f + @start_time = Time.now.to_f # below should 'cancel' the phase 0 workers, either via phased_restart or - # externally SIGTERM'ing them - yield launcher, old_pids - - # make sure four workers have booted - time = 0 - until workers_booted >= 4 || time >= 45 - sleep 2 - time += 2 - end + # externally TERM'ing them + yield phase0_worker_pids - new_pids = cluster.instance_variable_get(:@workers).map(&:pid) + # wait for new workers to boot + phase1_worker_pids = get_worker_pids phase - # should be empty if all old workers removed - old_waited = old_pids.map { |pid| - begin - Process.wait(pid, Process::WNOHANG) - pid - rescue Errno::ECHILD - nil # child is already terminated - end - }.compact + # should be empty if all phase 0 workers cleanly exited + phase0_exited = bad_exit_pids phase0_worker_pids - Thread.kill worker0 - Thread.kill worker1 + # Since 35 is the shorter of the two requests, server should restart + # and cancel both requests + assert_operator (Time.now.to_f - @start_time).round(2), :<, 35 - launcher.stop - assert_kind_of Thread, thr.join, "server didn't stop" + msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" + assert_equal WORKERS, phase0_worker_pids.length, msg - refute_equal 'Slept 35', body0 - refute_equal 'Slept 40', body1 + assert_equal WORKERS, phase1_worker_pids.length, msg + assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" - # Since 35 is the shorter of the two requests, server should restart - # and cancel both requests - assert_operator (Time.now.to_f - start_time).round(2), :<, 35 + assert_empty phase0_exited, msg - msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}" - assert_equal 2, new_pids.length, msg - assert_equal 2, old_pids.length, msg - assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new" - assert_empty old_waited, msg + threads.each { |th| Thread.kill th } + stop_server signal: :KILL end - def run_launcher(conf) - # below for future PR - #@wait, @ready = IO.pipe - # @ios_to_close << @wait << @ready - #@events = Puma::Events.strings - #@events.on_booted { @ready << "!" } + # gets worker pids from @server output + # new workers created from 'phased-restart' increment phase + # new workers created from externally shutdown pids maintain same phase + def get_worker_pids(phase = 1, size = WORKERS) + pids = [] + re = /pid: (\d+)\) booted, phase: #{phase}/ + while pids.size < size + line = @server.gets # line variable left for debugging + if pid = line[re, 1] + pids << pid + else + sleep 2 + end + end + pids.map(&:to_i) + end - launcher = Puma::Launcher.new conf, :events => @events + # Returns an array of pids still in the process table, so it should + # be empty for a clean exit. + # Process.kill should raise the Errno::ESRCH exception, indicating the + # process is dead and has been reaped. + def bad_exit_pids(pids) + pids.map do |pid| + begin + pid if Process.kill 0, pid + rescue Errno::ESRCH + nil + end + end.compact + end - thr = Thread.new do - launcher.run + # used with thread_run to define correct 'refused' errors + def thread_run_refused(unix: false) + if unix + DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT] + else + DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] : + [Errno::ECONNREFUSED] end + end - # wait for boot from #@events.on_booted - @wait.sysread 1 - - [thr, launcher, @events] + # used in loop to create several 'requests' + def thread_run(replies, delay, sleep_time, mutex, refused, unix: false) + begin + sleep delay + s = connect "sleep#{sleep_time}", unix: unix + body = read_body(s) + mutex.synchronize { replies << body } + rescue Errno::ECONNRESET + # connection was accepted but then closed + # client would see an empty response + mutex.synchronize { replies << :reset } + rescue *refused + mutex.synchronize { replies << :refused } + end end end diff --git a/test/test_persistent.rb b/test/test_persistent.rb index 7144f72fc8..2e4c77e9ec 100644 --- a/test/test_persistent.rb +++ b/test/test_persistent.rb @@ -1,14 +1,17 @@ require_relative "helper" class TestPersistent < Minitest::Test + + HOST = "127.0.0.1" + def setup - @valid_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" - @close_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n" + @valid_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" + @close_request = "GET / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n" @http10_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\n\r\n" - @keep_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: Keep-Alive\r\n\r\n" + @keep_request = "GET / HTTP/1.0\r\nHost: test.com\r\nContent-Type: text/plain\r\nConnection: Keep-Alive\r\n\r\n" - @valid_post = "POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhello" - @valid_no_body = "GET / HTTP/1.1\r\nHost: test.com\r\nX-Status: 204\r\nContent-Type: text/plain\r\n\r\n" + @valid_post = "POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhello" + @valid_no_body = "GET / HTTP/1.1\r\nHost: test.com\r\nX-Status: 204\r\nContent-Type: text/plain\r\n\r\n" @headers = { "X-Header" => "Works" } @body = ["Hello"] @@ -20,15 +23,14 @@ def setup [status, @headers, @body] end - @host = "127.0.0.1" @port = UniquePort.call @server = Puma::Server.new @simple - @server.add_tcp_listener "127.0.0.1", @port + @server.add_tcp_listener HOST, @port @server.max_threads = 1 @server.run - @client = TCPSocket.new "127.0.0.1", @port + @client = TCPSocket.new HOST, @port end def teardown @@ -230,7 +232,7 @@ def test_keepalive_doesnt_starve_clients @client << @valid_request - c2 = TCPSocket.new @host, @port + c2 = TCPSocket.new HOST, @port c2 << @valid_request out = IO.select([c2], nil, nil, 1) @@ -240,6 +242,8 @@ def test_keepalive_doesnt_starve_clients assert_equal "HTTP/1.1 200 OK\r\nX-Header: Works\r\nContent-Length: #{sz}\r\n\r\n", lines(4, c2) assert_equal "Hello", c2.read(5) + ensure + c2.close end end diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index cb5a99767d..39e987a509 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -1,9 +1,10 @@ require_relative "helper" class TestPumaServer < Minitest::Test + parallelize_me! def setup - @port = 0 + @port = UniquePort.call @host = "127.0.0.1" @app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } diff --git a/test/test_puma_server_ssl.rb b/test/test_puma_server_ssl.rb index 9e2a3c02cb..4b60ecfbc0 100644 --- a/test/test_puma_server_ssl.rb +++ b/test/test_puma_server_ssl.rb @@ -32,6 +32,7 @@ def ssl_error(server, peeraddr, peercert, error) end class TestPumaServerSSL < Minitest::Test + parallelize_me! def setup @http = nil @@ -203,6 +204,7 @@ def test_tls_v1_1_rejection # client-side TLS authentication tests class TestPumaServerSSLClient < Minitest::Test + parallelize_me! def assert_ssl_client_error_match(error, subject=nil, &blk) host = "127.0.0.1" diff --git a/test/test_web_server.rb b/test/test_web_server.rb index a433ed13dd..c8073b0681 100644 --- a/test/test_web_server.rb +++ b/test/test_web_server.rb @@ -17,6 +17,8 @@ def call(env) end class WebServerTest < Minitest::Test + parallelize_me! + VALID_REQUEST = "GET / HTTP/1.1\r\nHost: www.zedshaw.com\r\nContent-Type: text/plain\r\n\r\n" def setup