/
integration.rb
331 lines (293 loc) · 9.34 KB
/
integration.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# frozen_string_literal: true
require "puma/control_cli"
require "open3"
require_relative 'tmp_path'
# Only single mode tests go here. Cluster and pumactl tests
# have their own files, use those instead
class TestIntegration < Minitest::Test
include TmpPath
DARWIN = !!RUBY_PLATFORM[/darwin/]
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" :
"#{Gem.ruby} -Ilib"
def setup
@ios_to_close = []
@bind_path = tmp_path('.sock')
end
def teardown
if defined?(@server) && @server && @pid
stop_server @pid, signal: :INT
end
if @ios_to_close
@ios_to_close.each do |io|
io.close if io.is_a?(IO) && !io.closed?
io = nil
end
end
if @bind_path
refute File.exist?(@bind_path), "Bind path must be removed after stop"
File.unlink(@bind_path) rescue nil
end
# wait until the end for OS buffering?
if defined?(@server) && @server
@server.close unless @server.closed?
@server = nil
end
end
private
def silent_and_checked_system_command(*args)
assert(system(*args, out: File::NULL, err: File::NULL))
end
def cli_server(argv, unix: false, config: nil, merge_err: false)
if config
config_file = Tempfile.new(%w(config .rb))
config_file.write config
config_file.close
config = "-C #{config_file.path}"
end
puma_path = File.expand_path '../../../bin/puma', __FILE__
if unix
cmd = "#{BASE} #{puma_path} #{config} -b unix://#{@bind_path} #{argv}"
else
@tcp_port = UniquePort.call
cmd = "#{BASE} #{puma_path} #{config} -b tcp://#{HOST}:#{@tcp_port} #{argv}"
end
if merge_err
@server = IO.popen(cmd, "r", :err=>[:child, :out])
else
@server = IO.popen(cmd, "r")
end
wait_for_server_to_boot
@pid = @server.pid
@server
end
# rescue statements are just in case method is called with a server
# that is already stopped/killed, especially since Process.wait2 is
# blocking
def stop_server(pid = @pid, signal: :TERM)
begin
Process.kill signal, pid
rescue Errno::ESRCH
end
begin
Process.wait2 pid
rescue Errno::ECHILD
end
end
def restart_server_and_listen(argv)
cli_server argv
connection = connect
initial_reply = read_body(connection)
restart_server connection
[initial_reply, read_body(connect)]
end
# reuses an existing connection to make sure that works
def restart_server(connection, log: false)
Process.kill :USR2, @pid
connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request
wait_for_server_to_boot(log: log)
end
# wait for server to say it booted
def wait_for_server_to_boot(log: false)
if log
puts "Waiting for server to boot..."
begin
line = @server.gets
puts line if line && line.strip != ''
end while line !~ /Ctrl-C/
puts "Server booted!"
else
true while @server.gets !~ /Ctrl-C/
end
end
def connect(path = nil, unix: false)
s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
@ios_to_close << s
s << "GET /#{path} HTTP/1.1\r\n\r\n"
true until s.gets == "\r\n"
s
end
# use only if all socket writes are fast
# does not wait for a read
def fast_connect(path = nil, unix: false)
s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
@ios_to_close << s
fast_write s, "GET /#{path} HTTP/1.1\r\n\r\n"
s
end
def fast_write(io, str)
n = 0
while true
begin
n = io.syswrite str
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
if !IO.select(nil, [io], nil, 5)
raise e
end
retry
rescue Errno::EPIPE, SystemCallError, IOError => e
raise e
end
return if n == str.bytesize
str = str.byteslice(n..-1)
end
end
def read_body(connection, time_out = 10)
Timeout.timeout(time_out) do
loop do
response = connection.readpartial(1024)
body = response.split("\r\n\r\n", 2).last
return body if body && !body.empty?
sleep 0.01
end
end
end
# gets worker pids from @server output
def get_worker_pids(phase = 0, size = workers)
pids = []
re = /PID: (\d+)\) booted in [.0-9]+s, phase: #{phase}/
while pids.size < size
if pid = @server.gets[re, 1]
pids << pid
end
end
pids.map(&:to_i)
end
# used to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
DARWIN ? [Errno::ENOENT, Errno::EPIPE, IOError] :
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::EBADF, Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end
def cli_pumactl(argv, unix: false)
arg =
if unix
%W[-C unix://#{@control_path} -T #{TOKEN} #{argv}]
else
%W[-C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}]
end
r, w = IO.pipe
Thread.new { Puma::ControlCLI.new(arg, w, w).run }.join
w.close
@ios_to_close << r
r
end
def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
skipped = true
skip_if :jruby, suffix: <<-MSG
- file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
MSG
skip_if :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'
skip "Undiagnosed failures on Ruby 2.2" if RUBY_VERSION < '2.3'
args = "-w #{workers} -t 0:5 -q test/rackup/hello_with_delay.ru"
if Puma.windows?
@control_tcp_port = UniquePort.call
cli_server "--control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN} #{args}"
else
cli_server args
end
skipped = false
replies = Hash.new 0
refused = thread_run_refused unix: false
message = 'A' * 16_256 # 2^14 - 128
mutex = Mutex.new
restart_count = 0
client_threads = []
num_requests = (total_requests/num_threads).to_i
num_threads.times do |thread|
client_threads << Thread.new do
num_requests.times do
begin
socket = TCPSocket.new HOST, @tcp_port
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
body = read_body(socket, 10)
if body == "Hello World"
mutex.synchronize {
replies[:success] += 1
replies[:restart] += 1 if restart_count > 0
}
else
mutex.synchronize { replies[:unexpected_response] += 1 }
end
rescue Errno::ECONNRESET, Errno::EBADF
# connection was accepted but then closed
# client would see an empty response
# Errno::EBADF Windows may not be able to make a connection
mutex.synchronize { replies[:reset] += 1 }
rescue *refused, IOError
# IOError intermittently thrown by Ubuntu, add to allow retry
mutex.synchronize { replies[:refused] += 1 }
rescue ::Timeout::Error
mutex.synchronize { replies[:read_timeout] += 1 }
ensure
if socket.is_a?(IO) && !socket.closed?
begin
socket.close
rescue Errno::EBADF
end
end
end
end
# STDOUT.puts "#{thread} #{replies[:success]}"
end
end
run = true
restart_thread = Thread.new do
sleep 0.30 # let some connections in before 1st restart
while run
if Puma.windows?
cli_pumactl 'restart'
else
Process.kill :USR2, @pid
end
wait_for_server_to_boot
restart_count += 1
sleep 1
end
end
client_threads.each(&:join)
run = false
restart_thread.join
if Puma.windows?
cli_pumactl 'stop'
Process.wait @server.pid
@server = nil
end
msg = (" %4d unexpected_response\n" % replies.fetch(:unexpected_response,0)).dup
msg << " %4d refused\n" % replies.fetch(:refused,0)
msg << " %4d read timeout\n" % replies.fetch(:read_timeout,0)
msg << " %4d reset\n" % replies.fetch(:reset,0)
msg << " %4d success\n" % replies.fetch(:success,0)
msg << " %4d success after restart\n" % replies.fetch(:restart,0)
msg << " %4d restart count\n" % restart_count
reset = replies[:reset]
if Puma.windows?
# 5 is default thread count in Puma?
reset_max = num_threads * restart_count
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
end
assert_equal 0, replies[:unexpected_response], "#{msg}Unexpected response"
assert_equal 0, replies[:refused], "#{msg}Expected no refused connections"
assert_equal 0, replies[:read_timeout], "#{msg}Expected no read timeouts"
if Puma.windows?
assert_equal (num_threads * num_requests) - reset, replies[:success]
else
assert_equal (num_threads * num_requests), replies[:success]
end
ensure
return if skipped
if passed?
msg = " restart_count #{restart_count}, reset #{reset}, success after restart #{replies[:restart]}"
$debugging_info << "#{full_name}\n#{msg}\n"
else
$debugging_info << "#{full_name}\n#{msg}\n"
end
end
end