From ea2c264f2dbf002dcc7ef6b76c20e19a7b05b093 Mon Sep 17 00:00:00 2001 From: MSP-Greg Date: Tue, 10 Sep 2019 11:04:59 -0500 Subject: [PATCH] test_integration.rb - add two tests using UNIX binders cluster.rb - clean up duplicate term? method workflow.yml - better step descriptions, add PR's --- .github/workflows/workflow.yml | 12 +- lib/puma/cluster.rb | 4 - test/test_integration.rb | 196 ++++++++++++++++++++------------- 3 files changed, 130 insertions(+), 82 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 24ba759f61..8e4945e003 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,6 +1,12 @@ name: Puma -on: [push] +on: + push: + branches: + - master + pull_request: + branches: + - '*' jobs: build: @@ -35,7 +41,7 @@ jobs: with: ruby-version: ${{ matrix.ruby }} - - name: Install ragel + - name: ubuntu & macos - install ragel if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') run: | if [ "${{ matrix.os }}" == "macos" ]; then @@ -43,7 +49,7 @@ jobs: else sudo apt-get install ragel fi - - name: Windows MSYS2 + - name: windows - update MSYS2, openssl, ragel if: startsWith(matrix.os, 'windows') uses: MSP-Greg/msys2-action@master with: diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 5bc22d83f3..2cc88d9f7c 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -92,10 +92,6 @@ def term? @term end - def term? - @term - end - def ping!(status) @last_checkin = Time.now @last_status = status diff --git a/test/test_integration.rb b/test/test_integration.rb index 164a78ea00..6b59966e82 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -15,6 +15,8 @@ class TestIntegration < Minitest::Test WORKERS = 2 + DARWIN = !!RUBY_PLATFORM[/darwin/] + def setup @ios_to_close = [] @state_path = "test/#{name}_puma.state" @@ -139,84 +141,36 @@ def test_usr2_restart_restores_environment # Send requests 10 per second. Send 10, then :TERM server, then send another 30. # No more than 10 should throw Errno::ECONNRESET. - def test_term_closes_listeners_cluster + def test_term_closes_listeners_cluster_tcp skip NO_FORK_MSG unless HAS_FORK skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: false + end - cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru" - threads = [] - replies = [] - mutex = Mutex.new - div = 10 - - 41.times.each do |i| - if i == 10 - threads << Thread.new do - sleep i.to_f/div - Process.kill :TERM, @pid - mutex.synchronize { replies << :term_sent } - end - else - threads << Thread.new do - thread_run replies, i.to_f/div, 1, mutex - end - end - end - - threads.each(&:join) - - responses = replies.count { |r| r[/\ASlept 1/] } - resets = replies.count { |r| r == :reset } - refused = replies.count { |r| r == :refused } - msg = "#{responses} responses, #{resets} resets, #{refused} refused" - - assert_operator 9, :<=, responses, msg - - assert_operator 10, :>=, resets , msg - - assert_operator 20, :<=, refused , msg + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + def test_term_closes_listeners_cluster_unix + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + term_closes_listeners_cluster unix: true end + # Use TCPSocket for bind # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. # All should be responded to, and at least three workers should be used - def test_usr1_all_respond_cluster + def test_usr1_all_respond_cluster_tcp skip NO_FORK_MSG unless HAS_FORK skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: false + end - cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru" - threads = [] - replies = [] - mutex = Mutex.new - - s = connect "sleep1" - replies << read_body(s) - Process.kill :USR1, @pid - - 24.times do |delay| - threads << Thread.new do - thread_run replies, delay, 1, mutex - end - end - - threads.each(&:join) - - responses = replies.count { |r| r[/\ASlept 1/] } - resets = replies.count { |r| r == :reset } - refused = replies.count { |r| r == :refused } - - # get pids from replies, generate uniq array - qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length - - msg = "#{responses} responses, #{qty_pids} uniq pids" - - assert_equal 25, responses, msg - assert_operator qty_pids, :>, 2, msg - - msg = "#{responses} responses, #{resets} resets, #{refused} refused" - - refute_includes replies, :refused, msg - - refute_includes replies, :reset , msg + # Use UNIXSocket for bind + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + def test_usr1_all_respond_cluster_unix + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :USR1 + usr1_all_respond_cluster unix: true end def test_term_exit_code_single @@ -383,7 +337,7 @@ def wait_for_server_to_boot end def connect(path = nil, unix: false) - s = unix ? UNIXSocket.new("unix://#{@bind_path}") : TCPSocket.new(HOST, @tcp_port) + s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port) @ios_to_close << s s << "GET /#{path} HTTP/1.1\r\n\r\n" true until s.gets == "\r\n" @@ -412,6 +366,91 @@ def cli_pumactl(argv, unix: false) pumactl end + # Send requests 10 per second. Send 10, then :TERM server, then send another 30. + # No more than 10 should throw Errno::ECONNRESET. + # Run by both tcp and unix test versions + def term_closes_listeners_cluster(unix: false) + skip NO_FORK_MSG unless HAS_FORK + skip_unless_signal_exist? :TERM + + cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + div = 10 + + refused = thread_run_refused unix: unix + + 41.times.each do |i| + if i == 10 + threads << Thread.new do + sleep i.to_f/div + Process.kill :TERM, @pid + mutex.synchronize { replies << :term_sent } + end + else + threads << Thread.new do + thread_run replies, i.to_f/div, 1, mutex, refused, unix: unix + end + end + end + + threads.each(&:join) + + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + assert_operator 9, :<=, responses, msg + + assert_operator 10, :>=, resets , msg + + assert_operator 20, :<=, refused , msg + end + + # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. + # All should be responded to, and at least three workers should be used + # Run by both tcp and unix test versions + def usr1_all_respond_cluster(unix: false) + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + threads = [] + replies = [] + mutex = Mutex.new + + s = connect "sleep1", unix: unix + replies << read_body(s) + Process.kill :USR1, @pid + + refused = thread_run_refused unix: unix + + 24.times do |delay| + threads << Thread.new do + thread_run replies, delay, 1, mutex, refused, unix: unix + end + end + + threads.each(&:join) + + responses = replies.count { |r| r[/\ASlept 1/] } + resets = replies.count { |r| r == :reset } + refused = replies.count { |r| r == :refused } + + # get pids from replies, generate uniq array + qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length + + msg = "#{responses} responses, #{qty_pids} uniq pids" + + assert_equal 25, responses, msg + assert_operator qty_pids, :>, 2, msg + + msg = "#{responses} responses, #{resets} resets, #{refused} refused" + + refute_includes replies, :refused, msg + + refute_includes replies, :reset , msg + end + def worker_respawn(phase = 1, size = WORKERS) skip NO_FORK_MSG unless HAS_FORK threads = [] @@ -491,7 +530,18 @@ def bad_exit_pids(pids) end.compact end - def thread_run(replies, delay, sleep_time, mutex, unix: false) + # used with thread_run to define correct 'refused' errors + def thread_run_refused(unix: false) + if unix + DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT] + else + DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] : + [Errno::ECONNREFUSED] + end + end + + # used in loop to create several 'requests' + def thread_run(replies, delay, sleep_time, mutex, refused, unix: false) begin sleep delay s = connect "sleep#{sleep_time}", unix: unix @@ -501,11 +551,7 @@ def thread_run(replies, delay, sleep_time, mutex, unix: false) # connection was accepted but then closed # client would see an empty response mutex.synchronize { replies << :reset } - rescue Errno::ECONNREFUSED, Errno::EPIPE - # TCP - Errno::ECONNREFUSED, Errno::EPIPE - # TODO UNIX - # connection was never accepted it can therefore be - # re-tried before the client receives an empty response + rescue *refused mutex.synchronize { replies << :refused } end end