Skip to content

Commit

Permalink
[CI] Fix intermittent integration test failures/errors (#2751)
Browse files Browse the repository at this point in the history
* [CI] helpers/integration.rb - fix read_body

* [CI] helpers/integration.rb - Windows refused

* [CI] helpers/integration.rb - exception handling code

* [CI] helpers/integration.rb - Windows fixes

* [CI] test_preserve_bundler_env.rb - remove redundant write

* [CI] test_integration_cluster.rb - allow for read timeout error

* [CI] helpers/integration.rb - darwin exception

* [CI] helpers/integration.rb - hot-restart timing
  • Loading branch information
MSP-Greg committed Dec 10, 2021
1 parent 955e193 commit 909f51e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 30 deletions.
86 changes: 69 additions & 17 deletions test/helpers/integration.rb
Expand Up @@ -2,26 +2,33 @@

require "puma/control_cli"
require "open3"
require "io/wait"
require_relative 'tmp_path'

# Only single mode tests go here. Cluster and pumactl tests
# have their own files, use those instead
class TestIntegration < Minitest::Test
include TmpPath
DARWIN = !!RUBY_PLATFORM[/darwin/]
DARWIN = RUBY_PLATFORM.include? 'darwin'
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
RESP_READ_LEN = 65_536
RESP_READ_TIMEOUT = 10
RESP_SPLIT = "\r\n\r\n"

BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" :
"#{Gem.ruby} -Ilib"

def setup
@server = nil
@ios_to_close = []
@bind_path = tmp_path('.sock')
end

def teardown
if defined?(@server) && @server && @pid
if @server && defined?(@control_tcp_port) && Puma.windows?
cli_pumactl 'stop'
elsif @server && @pid && !Puma.windows?
stop_server @pid, signal: :INT
end

Expand All @@ -38,7 +45,7 @@ def teardown
end

# wait until the end for OS buffering?
if defined?(@server) && @server
if @server
@server.close unless @server.closed?
@server = nil
end
Expand Down Expand Up @@ -114,15 +121,15 @@ def wait_for_server_to_boot(log: false)
end until line && line.include?('Ctrl-C')
puts "Server booted!"
else
true until @server && (@server.gets || '').include?('Ctrl-C')
sleep 0.1 until @server.is_a?(IO)
true until (@server.gets || '').include?('Ctrl-C')
end
end

def connect(path = nil, unix: false)
s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
@ios_to_close << s
s << "GET /#{path} HTTP/1.1\r\n\r\n"
true until s.gets == "\r\n"
s
end

Expand Down Expand Up @@ -155,17 +162,59 @@ def fast_write(io, str)
end
end

def read_body(connection, time_out = 10)
Timeout.timeout(time_out) do
def read_body(connection, timeout = nil)
read_response(connection, timeout).last
end

def read_response(connection, timeout = nil)
timeout ||= RESP_READ_TIMEOUT
content_length = nil
chunked = nil
response = ''.dup
t_st = Process.clock_gettime Process::CLOCK_MONOTONIC
if connection.to_io.wait_readable timeout
loop do
response = connection.readpartial(1024)
body = response.split("\r\n\r\n", 2).last
return body if body && !body.empty?
sleep 0.01
begin
part = connection.read_nonblock(RESP_READ_LEN, exception: false)
case part
when String
unless content_length || chunked
chunked ||= part.include? "\r\nTransfer-Encoding: chunked\r\n"
content_length = (t = part[/^Content-Length: (\d+)/i , 1]) ? t.to_i : nil
end

response << part
hdrs, body = response.split RESP_SPLIT, 2
unless body.nil?
# below could be simplified, but allows for debugging...
ret =
if content_length
body.bytesize == content_length
elsif chunked
body.end_with? "\r\n0\r\n\r\n"
elsif !hdrs.empty? && !body.empty?
true
else
false
end
if ret
return [hdrs, body]
end
end
sleep 0.000_1
when :wait_readable, :wait_writable # :wait_writable for ssl
sleep 0.000_2
when nil
raise EOFError
end
if timeout < Process.clock_gettime(Process::CLOCK_MONOTONIC) - t_st
raise Timeout::Error, 'Client Read Timeout'
end
end
end
else
raise Timeout::Error, 'Client Read Timeout'
end
rescue Timeout::Error
flunk "response read_body timeout error"
end

# gets worker pids from @server output
Expand All @@ -186,7 +235,8 @@ def thread_run_refused(unix: false)
DARWIN ? [Errno::ENOENT, Errno::EPIPE, IOError] :
[IOError, Errno::ENOENT]
else
DARWIN ? [Errno::EBADF, Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
# Errno::ECONNABORTED is thrown intermittently on TCPSocket.new
DARWIN ? [Errno::EBADF, Errno::ECONNREFUSED, Errno::EPIPE, EOFError, Errno::ECONNABORTED] :
[IOError, Errno::ECONNREFUSED]
end
end
Expand Down Expand Up @@ -234,7 +284,7 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)

num_threads.times do |thread|
client_threads << Thread.new do
num_requests.times do
num_requests.times do |req_num|
begin
socket = TCPSocket.new HOST, @tcp_port
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
Expand Down Expand Up @@ -280,9 +330,10 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
else
Process.kill :USR2, @pid
end
sleep 0.5
wait_for_server_to_boot
restart_count += 1
sleep 1
sleep(Puma.windows? ? 3.0 : 1.0)
end
end

Expand All @@ -309,7 +360,7 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
# 5 is default thread count in Puma?
reset_max = num_threads * restart_count
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
assert_operator 30, :>=, replies[:refused], "#{msg}Too many refused connections"
assert_operator 40, :>=, replies[:refused], "#{msg}Too many refused connections"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
assert_equal 0, replies[:refused], "#{msg}Expected no refused connections"
Expand All @@ -329,6 +380,7 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
msg = " restart_count #{restart_count}, reset #{reset}, success after restart #{replies[:restart]}"
$debugging_info << "#{full_name}\n#{msg}\n"
else
client_threads.each { |thr| thr.kill if thr.is_a? Thread }
$debugging_info << "#{full_name}\n#{msg}\n"
end
end
Expand Down
34 changes: 22 additions & 12 deletions test/test_integration_cluster.rb
Expand Up @@ -66,6 +66,7 @@ def test_usr2_restart

def test_term_closes_listeners_tcp
skip_unless_signal_exist? :TERM
skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3'
term_closes_listeners unix: false
end

Expand Down Expand Up @@ -376,19 +377,21 @@ def term_closes_listeners(unix: false)

threads.each(&:join)

failures = replies.count(:failure)
successes = replies.count(:success)
resets = replies.count(:reset)
refused = replies.count(:refused)
failures = replies.count(:failure)
successes = replies.count(:success)
resets = replies.count(:reset)
refused = replies.count(:refused)
read_timeouts = replies.count(:read_timeout)

r_success = replies.rindex(:success)
l_reset = replies.index(:reset)
r_reset = replies.rindex(:reset)
l_refused = replies.index(:refused)

msg = "#{successes} successes, #{resets} resets, #{refused} refused, failures #{failures}"
msg = "#{successes} successes, #{resets} resets, #{refused} refused, #{failures} failures, #{read_timeouts} read timeouts"

assert_equal 0, failures, msg
assert_equal 0, read_timeouts, msg

assert_operator 9, :<=, successes, msg

Expand Down Expand Up @@ -436,9 +439,10 @@ def usr1_all_respond(unix: false, config: '')

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }
responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }
read_timeouts = replies.count { |r| r == :read_timeout }

# get pids from replies, generate uniq array
qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length
Expand All @@ -448,11 +452,13 @@ def usr1_all_respond(unix: false, config: '')
assert_equal 25, responses, msg
assert_operator qty_pids, :>, 2, msg

msg = "#{responses} responses, #{resets} resets, #{refused} refused"
msg = "#{responses} responses, #{resets} resets, #{refused} refused, #{read_timeouts} read timeouts"

assert_equal 0, refused, msg

refute_includes replies, :refused, msg
assert_equal 0, resets, msg

refute_includes replies, :reset , msg
assert_equal 0, read_timeouts, msg
ensure
unless passed?
$debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n"
Expand Down Expand Up @@ -532,6 +538,8 @@ def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false)
mutex.synchronize { replies << :reset }
rescue *refused
mutex.synchronize { replies << :refused }
rescue Timeout::Error
mutex.synchronize { replies << :read_timeout }
end
end

Expand All @@ -540,7 +548,7 @@ def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: fals
begin
sleep delay
s = connect "sleep#{sleep_time}-#{step}", unix: unix
body = read_body(s, 15)
body = read_body(s, 20)
if body[/\ASlept /]
mutex.synchronize { replies[step] = :success }
else
Expand All @@ -552,6 +560,8 @@ def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: fals
mutex.synchronize { replies[step] = :reset }
rescue *refused
mutex.synchronize { replies[step] = :refused }
rescue Timeout::Error
mutex.synchronize { replies[step] = :read_timeout }
end
end
end if ::Process.respond_to?(:fork)
1 change: 0 additions & 1 deletion test/test_preserve_bundler_env.rb
Expand Up @@ -86,7 +86,6 @@ def test_phased_restart_preserves_unspecified_bundle_gemfile
start_phased_restart

connection = connect
connection.write "GET / HTTP/1.1\r\n\r\n"
new_reply = read_body(connection)
expected_gemfile = File.expand_path("bundle_preservation_test/version2/Gemfile", __dir__).inspect
assert_equal(expected_gemfile, new_reply)
Expand Down

0 comments on commit 909f51e

Please sign in to comment.