Add hot_restart_does_not_drop_connections tests [changelog skip] #2423

Merged 1 commit on Oct 13, 2020
171 changes: 169 additions & 2 deletions test/helpers/integration.rb
@@ -11,7 +11,6 @@ class TestIntegration < Minitest::Test
DARWIN = !!RUBY_PLATFORM[/darwin/]
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
- WORKERS = 2

BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" :
"#{Gem.ruby} -Ilib"
@@ -129,7 +128,7 @@ def read_body(connection, time_out = 10)
end

# gets worker pids from @server output
- def get_worker_pids(phase = 0, size = WORKERS)
+ def get_worker_pids(phase = 0, size = workers)
pids = []
re = /pid: (\d+)\) booted, phase: #{phase}/
while pids.size < size
@@ -139,4 +138,172 @@ def get_worker_pids(phase = 0, size = WORKERS)
end
pids.map(&:to_i)
end

# used to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end

def cli_pumactl(argv, unix: false)
if unix
pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r")
else
pumactl = IO.popen("#{BASE} bin/pumactl -C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}", "r")
end
@ios_to_close << pumactl
Process.wait pumactl.pid
pumactl
end

def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
skipped = true
skip_on :jruby, suffix: <<-MSG
- file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
cjlarose (Member), Oct 12, 2020

Seems like the behavior on JRuby should be the same as on Windows: Since exec doesn't preserve file descriptors on those platforms, puma has to unbind-and-rebind the listening socket, so we lose some connections during restart.

Is there any reason why we run the test on Windows (with the expectation that the number of connection reset errors per thread is bounded from above by the number of restarts), but skip the test on JRuby? Seems like we can make the same decision in both places.

Either we can skip on both platforms (connection reset errors are expected on both, so maybe not worth having explicit tests for), or we can run it on both.
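
A minimal sketch of why a preserved listening socket avoids dropped connections, assuming only Ruby's standard `socket` library (none of this is Puma code, and it does not perform an actual `exec`). While the listener stays open, the kernel completes handshakes and queues clients in the backlog even though nothing is calling `accept`, which is roughly the situation during a restart when the descriptor survives:

```ruby
require 'socket'

# Bind to an ephemeral port; the listener stays open for the whole example.
server = TCPServer.new('127.0.0.1', 0)
port   = server.addr[1]

# The client connects and writes while nothing is accepting (the restart gap).
client = TCPSocket.new('127.0.0.1', port)
client.write "ping\n"

# Whoever starts accepting later still sees the queued client and its data.
conn     = server.accept
received = conn.gets

[conn, client, server].each(&:close)
```

When the listener has to be closed and rebound instead, clients arriving in the gap have nothing to queue on, which is where the reset or refused errors come from.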

Member

I think I lean towards just skipping it on both. Since we expect some connection reset errors on Windows, when this test runs on Windows, it's basically just testing that we can make requests to a running Puma server on that platform and get a successful response; it doesn't seem like it adds a lot of value on top of existing tests.

Member Author

@cjlarose

I don't have strong feelings either way. I doubt there are many publicly facing Windows Puma instances.

Running it locally, I found the reset errors were bounded and a very small percentage of total requests, and I think resets will normally be retried by most clients. Hence, I included Windows in the tests, since if the tests start failing, something has changed for the worse. Again, not bothered by removing Windows tests.

Re JRuby and/or TruffleRuby, I'm not sure if I tried them, especially when the code was close to finished...
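
The bound on reset errors discussed in this thread can be written out as a small helper. This is only a sketch mirroring the Windows branch of the test's assertion in this diff; the method name `reset_max` and the `max_threads` parameter are hypothetical, and the default of 5 is taken from the `-t 0:5` argument the test passes to Puma:

```ruby
# Hypothetical helper: upper bound on tolerated reset errors on Windows.
# A single client thread gets a flat allowance; multiple client threads
# get one allowance per restart.
def reset_max(num_threads, restart_count, max_threads = 5)
  num_threads > 1 ? restart_count * max_threads : max_threads
end

single   = reset_max(1, 3)   # one client thread, three restarts
threaded = reset_max(10, 3)  # ten client threads, three restarts
```

With three restarts, the single-threaded case tolerates 5 resets and the threaded case tolerates 15.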

Member Author

I just added JRuby in my fork; it definitely did not pass the new tests. See:
https://github.com/MSP-Greg/puma/actions/runs/302733028

Also, see comment below ('I hate intermittent tests') for more info.

Member Author

These tests are somewhat dependent on timing. Restarting on JRuby probably takes a bit longer than on MRI Rubies, and the gap during restart will be difficult to account for.

Member

Makes sense. In my fork, I had changed the test to run for a max number of seconds instead of doing a particular number of requests. cjlarose@691a5e4

Just something to consider
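
A rough sketch of that time-boxed approach (not the code from the linked commit): the client loop runs until a deadline on the monotonic clock instead of counting requests. The method name `run_for` and the block standing in for one request are assumptions for illustration:

```ruby
# Hypothetical helper: call the block repeatedly until max_seconds elapse.
# Returns the number of completed iterations.
def run_for(max_seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_seconds
  count = 0
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    yield
    count += 1
  end
  count
end

started   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
completed = run_for(0.05) { sleep 0.001 } # stand-in for one request
elapsed   = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

The trade-off is that the request count then varies from run to run, so assertions would have to be phrased relative to `completed` rather than a fixed total.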

Member Author

I kind of felt that it was working as is. Once more runs show whether the tests are stable, some of the quantities in them may be able to change.

I started with 'single' tests, added the threading, then added the 'cluster' tests. I added the debug output at the end of the test log, and noticed that on (probably) the threaded cluster test, all the requests were being processed before the first restart. That's when I kicked the counts up, and later added the small delay in hello_with_delay.ru.

They're all finishing fairly quickly. Since the cluster threaded test completes 3k requests in less than 3 seconds, it seemed like it could stay that way, even though the 3k number could probably drop to 2k.

After seeing some runs during the busy CI time (Thurs & Fri late afternoon), we might be able to add a 'something slowed Puma down' assert. Really don't have anything like that in the test suite.
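
One possible shape for such a check, as a sketch only: time the run on the monotonic clock and fail if it exceeds a budget. The helper name `elapsed_seconds` and the 3-second budget are assumptions (the budget borrows the "less than 3 seconds" observation above); in the suite this would presumably be an `assert_operator` call rather than a bare `raise`:

```ruby
# Hypothetical helper: seconds taken by the block, measured monotonically.
def elapsed_seconds
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

budget = 3.0                            # assumed budget, in seconds
took   = elapsed_seconds { sleep 0.01 } # stand-in for the request loop
raise "something slowed Puma down: #{took.round(2)}s" if took > budget
```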

BTW, thanks for taking a look at it, and for your PRs...

MSG
skip_on :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'
skip "Undiagnosed failures on Ruby 2.2" if RUBY_VERSION < '2.3'

args = "-w #{workers} -t 0:5 -q test/rackup/hello_with_delay.ru"
if Puma.windows?
@control_tcp_port = UniquePort.call
cli_server "#{args} --control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN}"
else
cli_server args
end

skipped = false
replies = Hash.new 0
refused = thread_run_refused unix: false
message = 'A' * 16_256 # 2^14 - 128

mutex = Mutex.new
restart_count = 0
client_threads = []

num_requests = (total_requests/num_threads).to_i

num_threads.times do |thread|
client_threads << Thread.new do
num_requests.times do
begin
socket = TCPSocket.new HOST, @tcp_port
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
true until socket.gets == "\r\n"
body = read_body(socket, 10)
if body == "Hello World"
mutex.synchronize {
replies[:success] += 1
replies[:restart] += 1 if restart_count > 0
}
else
mutex.synchronize { replies[:unexpected_response] += 1 }
end
rescue Errno::ECONNRESET, Errno::EBADF
# connection was accepted but then closed
# client would see an empty response
# Errno::EBADF Windows may not be able to make a connection
mutex.synchronize { replies[:reset] += 1 }
rescue *refused
mutex.synchronize { replies[:refused] += 1 }
rescue ::Timeout::Error
mutex.synchronize { replies[:read_timeout] += 1 }
ensure
if socket.is_a?(IO) && !socket.closed?
begin
socket.close
rescue Errno::EBADF
end
end
end
end
# STDOUT.puts "#{thread} #{replies[:success]}"
end
end

run = true

restart_thread = Thread.new do
sleep 0.30 # let some connections in before 1st restart
while run
if Puma.windows?
cli_pumactl 'restart'
else
Process.kill :USR2, @pid
end
wait_for_server_to_boot
restart_count += 1
sleep 1
end
end

client_threads.each(&:join)
run = false
restart_thread.join
if Puma.windows?
cli_pumactl 'stop'
Process.wait @server.pid
@server = nil
end

msg = (" %4d unexpected_response\n" % replies.fetch(:unexpected_response,0)).dup
msg << " %4d refused\n" % replies.fetch(:refused,0)
msg << " %4d read timeout\n" % replies.fetch(:read_timeout,0)
msg << " %4d reset\n" % replies.fetch(:reset,0)
msg << " %4d success\n" % replies.fetch(:success,0)
msg << " %4d success after restart\n" % replies.fetch(:restart,0)
msg << " %4d restart count\n" % restart_count

reset = replies[:reset]

if Puma.windows?
# 5 is default thread count in Puma?
reset_max = num_threads > 1 ? restart_count * 5 : 5
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
end
assert_equal 0, replies[:unexpected_response], "#{msg}Unexpected response"
assert_equal 0, replies[:refused], "#{msg}Expected no refused connections"
assert_equal 0, replies[:read_timeout], "#{msg}Expected no read timeouts"

if Puma.windows?
assert_equal (num_threads * num_requests) - reset, replies[:success]
else
assert_equal (num_threads * num_requests), replies[:success]
end

ensure
return if skipped
if passed?
msg = " restart_count #{restart_count}, reset #{reset}, success after restart #{replies[:restart]}"
$debugging_info << "#{full_name}\n#{msg}\n"
else
$debugging_info << "#{full_name}\n#{msg}\n"
end
end

def fast_write(io, str)
n = 0
while true
begin
n = io.syswrite str
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
if !IO.select(nil, [io], nil, 5)
raise e
end

retry
rescue Errno::EPIPE, SystemCallError, IOError => e
raise e
end

return if n == str.bytesize
str = str.byteslice(n..-1)
end
end
end
4 changes: 4 additions & 0 deletions test/rackup/hello_with_delay.ru
@@ -0,0 +1,4 @@
run lambda { |env|
sleep 0.001
[200, {"Content-Type" => "text/plain"}, ["Hello World"]]
}
60 changes: 29 additions & 31 deletions test/test_integration_cluster.rb
@@ -4,6 +4,8 @@
class TestIntegrationCluster < TestIntegration
parallelize_me!

+ def workers ; 2 ; end
+
def setup
skip NO_FORK_MSG unless HAS_FORK
super
@@ -14,12 +16,20 @@ def teardown
super
end

+ def test_hot_restart_does_not_drop_connections_threads
+ hot_restart_does_not_drop_connections num_threads: 10, total_requests: 3_000
+ end
+
+ def test_hot_restart_does_not_drop_connections
+ hot_restart_does_not_drop_connections num_threads: 1, total_requests: 1_000
+ end
+
def test_pre_existing_unix
skip UNIX_SKT_MSG unless UNIX_SKT_EXIST

File.open(@bind_path, mode: 'wb') { |f| f.puts 'pre existing' }

- cli_server "-w #{WORKERS} -q test/rackup/sleep_step.ru", unix: :unix
+ cli_server "-w #{workers} -q test/rackup/sleep_step.ru", unix: :unix

stop_server

@@ -34,7 +44,7 @@ def test_pre_existing_unix
def test_siginfo_thread_print
skip_unless_signal_exist? :INFO

- cli_server "-w #{WORKERS} -q test/rackup/hello.ru"
+ cli_server "-w #{workers} -q test/rackup/hello.ru"
worker_pids = get_worker_pids
output = []
t = Thread.new { output << @server.readlines }
@@ -46,7 +56,7 @@ def test_siginfo_thread_print
end

def test_usr2_restart
- _, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru")
+ _, new_reply = restart_server_and_listen("-q -w #{workers} test/rackup/hello.ru")
assert_equal "Hello World", new_reply
end

@@ -84,14 +94,14 @@ def test_usr1_all_respond_unix
end

def test_term_exit_code
- cli_server "-w #{WORKERS} test/rackup/hello.ru"
+ cli_server "-w #{workers} test/rackup/hello.ru"
_, status = stop_server

assert_equal 15, status
end

def test_term_suppress
- cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru"
+ cli_server "-w #{workers} -C test/config/suppress_exception.rb test/rackup/hello.ru"

_, status = stop_server

@@ -101,7 +111,7 @@ def test_term_suppress
def test_term_worker_clean_exit
skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3'

- cli_server "-w #{WORKERS} test/rackup/hello.ru"
+ cli_server "-w #{workers} test/rackup/hello.ru"

# Get the PIDs of the child workers.
worker_pids = get_worker_pids
@@ -182,13 +192,13 @@ def test_worker_index_is_with_in_options_limit

def test_refork
refork = Tempfile.new('refork')
- cli_server "-w #{WORKERS} test/rackup/sleep.ru", config: <<RUBY
+ cli_server "-w #{workers} test/rackup/sleep.ru", config: <<RUBY
fork_worker 1
on_refork {File.write('#{refork.path}', 'Reforked')}
RUBY
pids = get_worker_pids
read_body(connect('sleep1')) until refork.read == 'Reforked'
- refute_includes pids, get_worker_pids(1, WORKERS - 1)
+ refute_includes pids, get_worker_pids(1, workers - 1)
end

def test_fork_worker_spawn
@@ -206,7 +216,7 @@ def test_fork_worker_spawn
end

def test_nakayoshi
- cli_server "-w #{WORKERS} test/rackup/hello.ru", config: <<RUBY
+ cli_server "-w #{workers} test/rackup/hello.ru", config: <<RUBY
nakayoshi_fork true
RUBY

@@ -229,7 +239,7 @@ def test_prune_bundler_with_multiple_workers
end

def test_load_path_includes_extra_deps
- cli_server "-w #{WORKERS} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
+ cli_server "-w #{workers} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
last_load_path = read_body(connect)

assert_match(%r{gems/rdoc-[\d.]+/lib$}, last_load_path)
@@ -238,11 +248,11 @@ def test_load_path_includes_extra_deps
private

def worker_timeout(timeout, iterations, config)
- cli_server "-w #{WORKERS} -t 1:1 test/rackup/hello.ru", config: config
+ cli_server "-w #{workers} -t 1:1 test/rackup/hello.ru", config: config

pids = []
Timeout.timeout(iterations * timeout + 1) do
- (pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < WORKERS * iterations
+ (pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < workers * iterations
pids.map!(&:to_i)
end

@@ -254,7 +264,7 @@ def worker_timeout(timeout, iterations, config)
def term_closes_listeners(unix: false)
skip_unless_signal_exist? :TERM

- cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
+ cli_server "-w #{workers} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
threads = []
replies = []
mutex = Mutex.new
@@ -318,7 +328,7 @@ def term_closes_listeners(unix: false)
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false, config: '')
- cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
+ cli_server "-w #{workers} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new
@@ -361,10 +371,10 @@ def usr1_all_respond(unix: false, config: '')
end
end

- def worker_respawn(phase = 1, size = WORKERS)
+ def worker_respawn(phase = 1, size = workers)
threads = []

- cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"
+ cli_server "-w #{workers} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"

# make sure two workers have booted
phase0_worker_pids = get_worker_pids
@@ -397,9 +407,9 @@ def worker_respawn(phase = 1, size = WORKERS)
assert_operator (Time.now.to_f - @start_time).round(2), :<, 35

msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}"
- assert_equal WORKERS, phase0_worker_pids.length, msg
+ assert_equal workers, phase0_worker_pids.length, msg

- assert_equal WORKERS, phase1_worker_pids.length, msg
+ assert_equal workers, phase1_worker_pids.length, msg
assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new"

assert_empty phase0_exited, msg
@@ -421,16 +431,6 @@ def bad_exit_pids(pids)
end.compact
end

- # used with thread_run to define correct 'refused' errors
- def thread_run_refused(unix: false)
- if unix
- [Errno::ENOENT, IOError]
- else
- DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
- [Errno::ECONNREFUSED]
- end
- end
-
# used in loop to create several 'requests'
def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false)
begin
@@ -466,6 +466,4 @@ def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: fals
mutex.synchronize { replies[step] = :refused }
end
end
-
-
- end
+ end if ::Process.respond_to?(:fork)