Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Various test-suite speed/reliability improvements #2241

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/puma.yml
Expand Up @@ -9,6 +9,8 @@ jobs:
env:
CI: true
TESTOPTS: -v
MT_CPU: 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Niiice.

JRUBY_OPTS: --dev

runs-on: ${{ matrix.os }}-latest
if: |
Expand Down
2 changes: 2 additions & 0 deletions History.md
Expand Up @@ -40,6 +40,8 @@
* Fix ThreadPool#shutdown timeout accuracy (#2221)
* Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233)
* Prefer the rackup file specified by the CLI (#2225)
* Improvements to `out_of_band` hook (#2234)
* Fire `on_booted` after server starts

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
12 changes: 1 addition & 11 deletions Rakefile
Expand Up @@ -72,18 +72,8 @@ else
end

namespace :test do
desc "Run the integration tests"

task :integration do
sh "ruby test/shell/run.rb"
end

desc "Run all tests"
if (Puma.jruby? && ENV['TRAVIS']) || Puma.windows?
task :all => :test
else
task :all => [:test, "test:integration"]
end
task :all => :test
end

task :default => [:rubocop, "test:all"]
2 changes: 1 addition & 1 deletion lib/puma/app/status.rb
Expand Up @@ -25,7 +25,7 @@ def call(env)
rack_response(200, OK_STATUS)

when /\/halt$/
@cli.halt
Thread.new {@cli.halt}
rack_response(200, OK_STATUS)

when /\/restart$/
Expand Down
10 changes: 7 additions & 3 deletions lib/puma/binder.rb
Expand Up @@ -74,11 +74,11 @@ def create_activated_fds(env_hash)
env_hash['LISTEN_FDS'].to_i.times do |index|
sock = TCPServer.for_fd(socket_activation_fd(index))
key = begin # Try to parse as a path
[:unix, Socket.unpack_sockaddr_un(sock.getsockname)]
rescue ArgumentError # Try to parse as a port/ip
port, addr = Socket.unpack_sockaddr_in(sock.getsockname)
addr = "[#{addr}]" if addr =~ /\:/
[:tcp, addr, port]
rescue ArgumentError # Try to parse as a port/ip
[:unix, Socket.unpack_sockaddr_un(sock.getsockname)]
end
@activated_sockets[key] = sock
@events.debug "Registered #{key.join ':'} for activation from LISTEN_FDS"
Expand Down Expand Up @@ -165,7 +165,11 @@ def parse(binds, logger, log_msg = 'Listening')
logger.log "* Activated #{str}"
else
io = add_ssl_listener uri.host, uri.port, ctx
logger.log "* Listening on #{str}"
if io.is_a?(TCPServer) && uri.port == 0
uri.port = io.connect_address.ip_port
end

logger.log "* #{log_msg} on #{uri.to_s}"
end

@listeners << [str, io] if io
Expand Down
14 changes: 11 additions & 3 deletions lib/puma/client.rb
Expand Up @@ -142,10 +142,12 @@ def reset(fast_check=true)

def close
begin
@io.close
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
@io.shutdown(:WR) if @io.respond_to?(:shutdown)
rescue Errno::ENOTCONN #ignore
end
@io.close
rescue IOError, Errno::EBADF
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end

def try_to_finish
Expand Down Expand Up @@ -259,6 +261,12 @@ def finish(timeout)
true
end

def cancel
finish 0
rescue ConnectionError
close
end

def write_error(status_code)
begin
@io << ERROR_RESPONSE[status_code]
Expand Down
34 changes: 18 additions & 16 deletions lib/puma/cluster.rb
Expand Up @@ -261,29 +261,25 @@ def worker(index, master)
server.stop
end

begin
@worker_write << "b#{Process.pid}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
return
end
server_thread = server.run

Thread.new(@worker_write) do |io|
Puma.set_thread_name "stat payload"

while true
sleep Const::WORKER_CHECK_INTERVAL
begin
io << "p#{Process.pid}#{server.stats.to_json}\n"
rescue IOError
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
server.stop
break
end
sleep Const::WORKER_CHECK_INTERVAL
end
end

server.run.join
server_thread.join

# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
Expand Down Expand Up @@ -467,9 +463,9 @@ def run
stop
end

@launcher.events.fire_on_booted!

begin
booted = false

while @status == :run
begin
if @phased_restart
Expand All @@ -491,17 +487,23 @@ def run

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
w.boot!
log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
@next_check = Time.now
when "e"
# external term, see worker method, Signal.trap "SIGTERM"
w.instance_variable_set :@term, true
@next_check = Time.now + 0.1
when "t"
w.term unless w.term?
when "p"
w.ping!(result.sub(/^\d+/,'').chomp)
unless w.booted?
w.boot!
log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
if !booted && all_workers_booted?
@launcher.events.fire_on_booted!
booted = true
end
@next_check = Time.now
end
end
else
log "! Out-of-sync worker list, no #{pid} worker"
Expand Down
1 change: 1 addition & 0 deletions lib/puma/control_cli.rb
Expand Up @@ -196,6 +196,7 @@ def send_request
if uri.scheme == "ssl"
server.sysclose
else
server.shutdown(:WR) if server.respond_to?(:shutdown)
server.close unless server.closed?
end
end
Expand Down
10 changes: 2 additions & 8 deletions lib/puma/dsl.rb
Expand Up @@ -606,19 +606,13 @@ def tag(string)
# not a request timeout, it is to protect against a hung or dead process.
# Setting this value will not protect against slow requests.
#
# The minimum value is 6 seconds, the default value is 60 seconds.
# The default value is 60 seconds.
#
# @note Cluster mode only.
# @example
# worker_timeout 60
def worker_timeout(timeout)
timeout = Integer(timeout)
min = Const::WORKER_CHECK_INTERVAL

if timeout <= min
raise "The minimum worker_timeout must be greater than the worker reporting interval (#{min})"
end

timeout = Float(timeout)
@options[:worker_timeout] = timeout
end

Expand Down
6 changes: 6 additions & 0 deletions lib/puma/events.rb
Expand Up @@ -67,14 +67,17 @@ def register(hook, obj=nil, &blk)
#
def log(str)
@stdout.puts format(str)
rescue Errno::EPIPE # ignore
end

def write(str)
@stdout.write format(str)
rescue Errno::EPIPE # ignore
end

def debug(str)
log("% #{str}") if @debug
rescue Errno::EPIPE # ignore
end

# Write +str+ to +@stderr+
Expand All @@ -97,6 +100,7 @@ def parse_error(server, env, error)
"(#{env[HTTP_X_FORWARDED_FOR] || env[REMOTE_ADDR]}#{env[REQUEST_PATH]}): " \
"#{error.inspect}" \
"\n---\n"
rescue Errno::EPIPE # ignore
end

# An SSL error has occurred.
Expand All @@ -106,6 +110,7 @@ def parse_error(server, env, error)
def ssl_error(server, peeraddr, peercert, error)
subject = peercert ? peercert.subject : nil
@stderr.puts "#{Time.now}: SSL error, peer: #{peeraddr}, peer cert: #{subject}, #{error.inspect}"
rescue Errno::EPIPE # ignore
end

# An unknown error has occurred.
Expand All @@ -125,6 +130,7 @@ def unknown_error(server, error, kind="Unknown", env=nil)
string_block << error.backtrace
@stderr.puts string_block.join("\n")
end
rescue Errno::EPIPE # ignore
end

def on_booted(&block)
Expand Down
7 changes: 7 additions & 0 deletions lib/puma/io_stopgap.rb
@@ -0,0 +1,7 @@
if %w(2.2.7 2.2.8 2.2.9 2.2.10 2.3.4 2.4.1).include? RUBY_VERSION
begin
require 'stopgap_13632'
rescue LoadError
STDERR.puts "WARNING: For stability, you should install the stopgap_13632 gem."
end
end
6 changes: 4 additions & 2 deletions lib/puma/minissl.rb
Expand Up @@ -139,12 +139,14 @@ def should_drop_bytes?
@engine.init? || !@engine.shutdown
end

SHUTDOWN_TIMEOUT = 1 # seconds

def close
begin
# Read any drop any partially initialized sockets and any received bytes during shutdown.
# Don't let this socket hold this loop forever.
# If it can't send more packets within 1s, then give up.
return if [:timeout, :eof].include?(read_and_drop(1)) while should_drop_bytes?
# If it can't send more packets within SHUTDOWN_TIMEOUT, then give up.
return if [:timeout, :eof].include?(read_and_drop(SHUTDOWN_TIMEOUT)) while should_drop_bytes?
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
# nothing
Expand Down