Skip to content

Commit

Permalink
Merge pull request puma#2423 from MSP-Greg/restart-ok
Browse files Browse the repository at this point in the history
Add hot_restart_does_not_drop_connections tests [changelog skip]
  • Loading branch information
MSP-Greg committed Oct 13, 2020
2 parents 0f5c7f3 + 67c686a commit 54f6911
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 49 deletions.
171 changes: 169 additions & 2 deletions test/helpers/integration.rb
Expand Up @@ -11,7 +11,6 @@ class TestIntegration < Minitest::Test
DARWIN = !!RUBY_PLATFORM[/darwin/]
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
WORKERS = 2

BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" :
"#{Gem.ruby} -Ilib"
Expand Down Expand Up @@ -129,7 +128,7 @@ def read_body(connection, time_out = 10)
end

# gets worker pids from @server output
def get_worker_pids(phase = 0, size = WORKERS)
def get_worker_pids(phase = 0, size = workers)
pids = []
re = /pid: (\d+)\) booted, phase: #{phase}/
while pids.size < size
Expand All @@ -139,4 +138,172 @@ def get_worker_pids(phase = 0, size = WORKERS)
end
pids.map(&:to_i)
end

# used to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end

def cli_pumactl(argv, unix: false)
if unix
pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r")
else
pumactl = IO.popen("#{BASE} bin/pumactl -C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}", "r")
end
@ios_to_close << pumactl
Process.wait pumactl.pid
pumactl
end

def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
skipped = true
skip_on :jruby, suffix: <<-MSG
- file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
MSG
skip_on :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'
skip "Undiagnosed failures on Ruby 2.2" if RUBY_VERSION < '2.3'

args = "-w #{workers} -t 0:5 -q test/rackup/hello_with_delay.ru"
if Puma.windows?
@control_tcp_port = UniquePort.call
cli_server "#{args} --control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN}"
else
cli_server args
end

skipped = false
replies = Hash.new 0
refused = thread_run_refused unix: false
message = 'A' * 16_256 # 2^14 - 128

mutex = Mutex.new
restart_count = 0
client_threads = []

num_requests = (total_requests/num_threads).to_i

num_threads.times do |thread|
client_threads << Thread.new do
num_requests.times do
begin
socket = TCPSocket.new HOST, @tcp_port
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
true until socket.gets == "\r\n"
body = read_body(socket, 10)
if body == "Hello World"
mutex.synchronize {
replies[:success] += 1
replies[:restart] += 1 if restart_count > 0
}
else
mutex.synchronize { replies[:unexpected_response] += 1 }
end
rescue Errno::ECONNRESET, Errno::EBADF
# connection was accepted but then closed
# client would see an empty response
# Errno::EBADF Windows may not be able to make a connection
mutex.synchronize { replies[:reset] += 1 }
rescue *refused
mutex.synchronize { replies[:refused] += 1 }
rescue ::Timeout::Error
mutex.synchronize { replies[:read_timeout] += 1 }
ensure
if socket.is_a?(IO) && !socket.closed?
begin
socket.close
rescue Errno::EBADF
end
end
end
end
# STDOUT.puts "#{thread} #{replies[:success]}"
end
end

run = true

restart_thread = Thread.new do
sleep 0.30 # let some connections in before 1st restart
while run
if Puma.windows?
cli_pumactl 'restart'
else
Process.kill :USR2, @pid
end
wait_for_server_to_boot
restart_count += 1
sleep 1
end
end

client_threads.each(&:join)
run = false
restart_thread.join
if Puma.windows?
cli_pumactl 'stop'
Process.wait @server.pid
@server = nil
end

msg = (" %4d unexpected_response\n" % replies.fetch(:unexpected_response,0)).dup
msg << " %4d refused\n" % replies.fetch(:refused,0)
msg << " %4d read timeout\n" % replies.fetch(:read_timeout,0)
msg << " %4d reset\n" % replies.fetch(:reset,0)
msg << " %4d success\n" % replies.fetch(:success,0)
msg << " %4d success after restart\n" % replies.fetch(:restart,0)
msg << " %4d restart count\n" % restart_count

reset = replies[:reset]

if Puma.windows?
# 5 is default thread count in Puma?
reset_max = num_threads > 1 ? restart_count * 5 : 5
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
end
assert_equal 0, replies[:unexpected_response], "#{msg}Unexpected response"
assert_equal 0, replies[:refused], "#{msg}Expected no refused connections"
assert_equal 0, replies[:read_timeout], "#{msg}Expected no read timeouts"

if Puma.windows?
assert_equal (num_threads * num_requests) - reset, replies[:success]
else
assert_equal (num_threads * num_requests), replies[:success]
end

ensure
return if skipped
if passed?
msg = " restart_count #{restart_count}, reset #{reset}, success after restart #{replies[:restart]}"
$debugging_info << "#{full_name}\n#{msg}\n"
else
$debugging_info << "#{full_name}\n#{msg}\n"
end
end

def fast_write(io, str)
n = 0
while true
begin
n = io.syswrite str
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
if !IO.select(nil, [io], nil, 5)
raise e
end

retry
rescue Errno::EPIPE, SystemCallError, IOError => e
raise e
end

return if n == str.bytesize
str = str.byteslice(n..-1)
end
end
end
4 changes: 4 additions & 0 deletions test/rackup/hello_with_delay.ru
@@ -0,0 +1,4 @@
run lambda { |env|
sleep 0.001
[200, {"Content-Type" => "text/plain"}, ["Hello World"]]
}
60 changes: 29 additions & 31 deletions test/test_integration_cluster.rb
Expand Up @@ -4,6 +4,8 @@
class TestIntegrationCluster < TestIntegration
parallelize_me!

def workers ; 2 ; end

def setup
skip NO_FORK_MSG unless HAS_FORK
super
Expand All @@ -14,12 +16,20 @@ def teardown
super
end

def test_hot_restart_does_not_drop_connections_threads
hot_restart_does_not_drop_connections num_threads: 10, total_requests: 3_000
end

def test_hot_restart_does_not_drop_connections
hot_restart_does_not_drop_connections num_threads: 1, total_requests: 1_000
end

def test_pre_existing_unix
skip UNIX_SKT_MSG unless UNIX_SKT_EXIST

File.open(@bind_path, mode: 'wb') { |f| f.puts 'pre existing' }

cli_server "-w #{WORKERS} -q test/rackup/sleep_step.ru", unix: :unix
cli_server "-w #{workers} -q test/rackup/sleep_step.ru", unix: :unix

stop_server

Expand All @@ -34,7 +44,7 @@ def test_pre_existing_unix
def test_siginfo_thread_print
skip_unless_signal_exist? :INFO

cli_server "-w #{WORKERS} -q test/rackup/hello.ru"
cli_server "-w #{workers} -q test/rackup/hello.ru"
worker_pids = get_worker_pids
output = []
t = Thread.new { output << @server.readlines }
Expand All @@ -46,7 +56,7 @@ def test_siginfo_thread_print
end

def test_usr2_restart
_, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru")
_, new_reply = restart_server_and_listen("-q -w #{workers} test/rackup/hello.ru")
assert_equal "Hello World", new_reply
end

Expand Down Expand Up @@ -84,14 +94,14 @@ def test_usr1_all_respond_unix
end

def test_term_exit_code
cli_server "-w #{WORKERS} test/rackup/hello.ru"
cli_server "-w #{workers} test/rackup/hello.ru"
_, status = stop_server

assert_equal 15, status
end

def test_term_suppress
cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru"
cli_server "-w #{workers} -C test/config/suppress_exception.rb test/rackup/hello.ru"

_, status = stop_server

Expand All @@ -101,7 +111,7 @@ def test_term_suppress
def test_term_worker_clean_exit
skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3'

cli_server "-w #{WORKERS} test/rackup/hello.ru"
cli_server "-w #{workers} test/rackup/hello.ru"

# Get the PIDs of the child workers.
worker_pids = get_worker_pids
Expand Down Expand Up @@ -182,13 +192,13 @@ def test_worker_index_is_with_in_options_limit

def test_refork
refork = Tempfile.new('refork')
cli_server "-w #{WORKERS} test/rackup/sleep.ru", config: <<RUBY
cli_server "-w #{workers} test/rackup/sleep.ru", config: <<RUBY
fork_worker 1
on_refork {File.write('#{refork.path}', 'Reforked')}
RUBY
pids = get_worker_pids
read_body(connect('sleep1')) until refork.read == 'Reforked'
refute_includes pids, get_worker_pids(1, WORKERS - 1)
refute_includes pids, get_worker_pids(1, workers - 1)
end

def test_fork_worker_spawn
Expand All @@ -206,7 +216,7 @@ def test_fork_worker_spawn
end

def test_nakayoshi
cli_server "-w #{WORKERS} test/rackup/hello.ru", config: <<RUBY
cli_server "-w #{workers} test/rackup/hello.ru", config: <<RUBY
nakayoshi_fork true
RUBY

Expand All @@ -229,7 +239,7 @@ def test_prune_bundler_with_multiple_workers
end

def test_load_path_includes_extra_deps
cli_server "-w #{WORKERS} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
cli_server "-w #{workers} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
last_load_path = read_body(connect)

assert_match(%r{gems/rdoc-[\d.]+/lib$}, last_load_path)
Expand All @@ -238,11 +248,11 @@ def test_load_path_includes_extra_deps
private

def worker_timeout(timeout, iterations, config)
cli_server "-w #{WORKERS} -t 1:1 test/rackup/hello.ru", config: config
cli_server "-w #{workers} -t 1:1 test/rackup/hello.ru", config: config

pids = []
Timeout.timeout(iterations * timeout + 1) do
(pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < WORKERS * iterations
(pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < workers * iterations
pids.map!(&:to_i)
end

Expand All @@ -254,7 +264,7 @@ def worker_timeout(timeout, iterations, config)
def term_closes_listeners(unix: false)
skip_unless_signal_exist? :TERM

cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
cli_server "-w #{workers} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
threads = []
replies = []
mutex = Mutex.new
Expand Down Expand Up @@ -318,7 +328,7 @@ def term_closes_listeners(unix: false)
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false, config: '')
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
cli_server "-w #{workers} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new
Expand Down Expand Up @@ -361,10 +371,10 @@ def usr1_all_respond(unix: false, config: '')
end
end

def worker_respawn(phase = 1, size = WORKERS)
def worker_respawn(phase = 1, size = workers)
threads = []

cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"
cli_server "-w #{workers} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"

# make sure two workers have booted
phase0_worker_pids = get_worker_pids
Expand Down Expand Up @@ -397,9 +407,9 @@ def worker_respawn(phase = 1, size = WORKERS)
assert_operator (Time.now.to_f - @start_time).round(2), :<, 35

msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}"
assert_equal WORKERS, phase0_worker_pids.length, msg
assert_equal workers, phase0_worker_pids.length, msg

assert_equal WORKERS, phase1_worker_pids.length, msg
assert_equal workers, phase1_worker_pids.length, msg
assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new"

assert_empty phase0_exited, msg
Expand All @@ -421,16 +431,6 @@ def bad_exit_pids(pids)
end.compact
end

# used with thread_run to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end

# used in loop to create several 'requests'
def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false)
begin
Expand Down Expand Up @@ -466,6 +466,4 @@ def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: fals
mutex.synchronize { replies[step] = :refused }
end
end


end
end if ::Process.respond_to?(:fork)

0 comments on commit 54f6911

Please sign in to comment.