Skip to content

Commit

Permalink
test_integration.rb - add two tests using UNIX binders
Browse files Browse the repository at this point in the history
cluster.rb - clean up duplicate term? method

workflow.yml - better step descriptions, add PR's
  • Loading branch information
MSP-Greg committed Sep 10, 2019
1 parent 259d565 commit ea2c264
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 82 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/workflow.yml
@@ -1,6 +1,12 @@
name: Puma

on: [push]
on:
push:
branches:
- master
pull_request:
branches:
- '*'

jobs:
build:
Expand Down Expand Up @@ -35,15 +41,15 @@ jobs:
with:
ruby-version: ${{ matrix.ruby }}

- name: Install ragel
- name: ubuntu & macos - install ragel
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
run: |
if [ "${{ matrix.os }}" == "macos" ]; then
brew install ragel
else
sudo apt-get install ragel
fi
- name: Windows MSYS2
- name: windows - update MSYS2, openssl, ragel
if: startsWith(matrix.os, 'windows')
uses: MSP-Greg/msys2-action@master
with:
Expand Down
4 changes: 0 additions & 4 deletions lib/puma/cluster.rb
Expand Up @@ -92,10 +92,6 @@ def term?
@term
end

def term?
@term
end

def ping!(status)
@last_checkin = Time.now
@last_status = status
Expand Down
196 changes: 121 additions & 75 deletions test/test_integration.rb
Expand Up @@ -15,6 +15,8 @@ class TestIntegration < Minitest::Test

WORKERS = 2

DARWIN = !!RUBY_PLATFORM[/darwin/]

def setup
@ios_to_close = []
@state_path = "test/#{name}_puma.state"
Expand Down Expand Up @@ -139,84 +141,36 @@ def test_usr2_restart_restores_environment

# Send requests 10 per second. Send 10, then :TERM server, then send another 30.
# No more than 10 should throw Errno::ECONNRESET.
def test_term_closes_listeners_cluster
def test_term_closes_listeners_cluster_tcp
skip NO_FORK_MSG unless HAS_FORK
skip_unless_signal_exist? :TERM
term_closes_listeners_cluster unix: false
end

cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru"
threads = []
replies = []
mutex = Mutex.new
div = 10

41.times.each do |i|
if i == 10
threads << Thread.new do
sleep i.to_f/div
Process.kill :TERM, @pid
mutex.synchronize { replies << :term_sent }
end
else
threads << Thread.new do
thread_run replies, i.to_f/div, 1, mutex
end
end
end

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }
msg = "#{responses} responses, #{resets} resets, #{refused} refused"

assert_operator 9, :<=, responses, msg

assert_operator 10, :>=, resets , msg

assert_operator 20, :<=, refused , msg
# Send requests 10 per second. Send 10, then :TERM server, then send another 30.
# No more than 10 should throw Errno::ECONNRESET.
def test_term_closes_listeners_cluster_unix
skip NO_FORK_MSG unless HAS_FORK
skip_unless_signal_exist? :TERM
term_closes_listeners_cluster unix: true
end

# Use TCPSocket for bind
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def test_usr1_all_respond_cluster
def test_usr1_all_respond_cluster_tcp
skip NO_FORK_MSG unless HAS_FORK
skip_unless_signal_exist? :USR1
usr1_all_respond_cluster unix: false
end

cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru"
threads = []
replies = []
mutex = Mutex.new

s = connect "sleep1"
replies << read_body(s)
Process.kill :USR1, @pid

24.times do |delay|
threads << Thread.new do
thread_run replies, delay, 1, mutex
end
end

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }

# get pids from replies, generate uniq array
qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length

msg = "#{responses} responses, #{qty_pids} uniq pids"

assert_equal 25, responses, msg
assert_operator qty_pids, :>, 2, msg

msg = "#{responses} responses, #{resets} resets, #{refused} refused"

refute_includes replies, :refused, msg

refute_includes replies, :reset , msg
# Use UNIXSocket for bind
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def test_usr1_all_respond_cluster_unix
skip NO_FORK_MSG unless HAS_FORK
skip_unless_signal_exist? :USR1
usr1_all_respond_cluster unix: true
end

def test_term_exit_code_single
Expand Down Expand Up @@ -383,7 +337,7 @@ def wait_for_server_to_boot
end

def connect(path = nil, unix: false)
s = unix ? UNIXSocket.new("unix://#{@bind_path}") : TCPSocket.new(HOST, @tcp_port)
s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
@ios_to_close << s
s << "GET /#{path} HTTP/1.1\r\n\r\n"
true until s.gets == "\r\n"
Expand Down Expand Up @@ -412,6 +366,91 @@ def cli_pumactl(argv, unix: false)
pumactl
end

# Send requests 10 per second. Send 10, then :TERM server, then send another 30.
# No more than 10 should throw Errno::ECONNRESET.
# Run by both tcp and unix test versions
def term_closes_listeners_cluster(unix: false)
skip NO_FORK_MSG unless HAS_FORK
skip_unless_signal_exist? :TERM

cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_pid.ru", unix: unix
threads = []
replies = []
mutex = Mutex.new
div = 10

refused = thread_run_refused unix: unix

41.times.each do |i|
if i == 10
threads << Thread.new do
sleep i.to_f/div
Process.kill :TERM, @pid
mutex.synchronize { replies << :term_sent }
end
else
threads << Thread.new do
thread_run replies, i.to_f/div, 1, mutex, refused, unix: unix
end
end
end

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }
msg = "#{responses} responses, #{resets} resets, #{refused} refused"

assert_operator 9, :<=, responses, msg

assert_operator 10, :>=, resets , msg

assert_operator 20, :<=, refused , msg
end

# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
# Run by both tcp and unix test versions
def usr1_all_respond_cluster(unix: false)
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix
threads = []
replies = []
mutex = Mutex.new

s = connect "sleep1", unix: unix
replies << read_body(s)
Process.kill :USR1, @pid

refused = thread_run_refused unix: unix

24.times do |delay|
threads << Thread.new do
thread_run replies, delay, 1, mutex, refused, unix: unix
end
end

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }

# get pids from replies, generate uniq array
qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length

msg = "#{responses} responses, #{qty_pids} uniq pids"

assert_equal 25, responses, msg
assert_operator qty_pids, :>, 2, msg

msg = "#{responses} responses, #{resets} resets, #{refused} refused"

refute_includes replies, :refused, msg

refute_includes replies, :reset , msg
end

def worker_respawn(phase = 1, size = WORKERS)
skip NO_FORK_MSG unless HAS_FORK
threads = []
Expand Down Expand Up @@ -491,7 +530,18 @@ def bad_exit_pids(pids)
end.compact
end

def thread_run(replies, delay, sleep_time, mutex, unix: false)
# used with thread_run to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
DARWIN ? [Errno::ENOENT, IOError] : [Errno::ENOENT]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end

# used in loop to create several 'requests'
def thread_run(replies, delay, sleep_time, mutex, refused, unix: false)
begin
sleep delay
s = connect "sleep#{sleep_time}", unix: unix
Expand All @@ -501,11 +551,7 @@ def thread_run(replies, delay, sleep_time, mutex, unix: false)
# connection was accepted but then closed
# client would see an empty response
mutex.synchronize { replies << :reset }
rescue Errno::ECONNREFUSED, Errno::EPIPE
# TCP - Errno::ECONNREFUSED, Errno::EPIPE
# TODO UNIX
# connection was never accepted it can therefore be
# re-tried before the client receives an empty response
rescue *refused
mutex.synchronize { replies << :refused }
end
end
Expand Down

0 comments on commit ea2c264

Please sign in to comment.