Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor response writes, test refactors, fix UNPACK_TCP_STATE_FROM_TCP_INFO location #2554

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 15 additions & 15 deletions lib/puma/minissl.rb
Expand Up @@ -114,27 +114,27 @@ def read_nonblock(size, *_)
end
end

# Elsewhere, most socket writes use `syswrite`. When returning a large
# response body (2MB), Ubuntu works fine with `syswrite`, but macOS &
# Windows have OpenSSL errors on the client.
#
def write(data)
return 0 if data.empty?
n = 0
byte_size = data.bytesize
enc_wr = ''.dup
enc = nil

data_size = data.bytesize
need = data_size
while n < byte_size
n += @engine.write(n == 0 ? data : data.byteslice(n..-1))

while true
wrote = @engine.write data
enc_wr << enc while (enc = @engine.extract)

enc_wr = ''.dup
while (enc = @engine.extract)
enc_wr << enc
end
@socket.write enc_wr unless enc_wr.empty?

need -= wrote

return data_size if need == 0

data = data.byteslice(wrote..-1)
enc_wr.clear
end
enc.clear unless enc.nil?
byte_size
end

alias_method :syswrite, :write
Expand Down Expand Up @@ -184,7 +184,7 @@ def close
# If it can't send more packets within 1s, then give up.
return if [:timeout, :eof].include?(read_and_drop(1)) while should_drop_bytes?
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if SERVER::PURGE_INTERRUPT_QUEUE
# nothing
ensure
@socket.close
Expand Down
19 changes: 6 additions & 13 deletions lib/puma/request.rb
Expand Up @@ -154,19 +154,15 @@ def handle_request(client, lines)
else
fast_write io, part
end
io.flush
end

if chunked
fast_write io, CLOSE_CHUNKED
io.flush
end
fast_write io, CLOSE_CHUNKED if chunked
rescue SystemCallError, IOError
raise ConnectionError, "Connection error detected during write"
end

ensure
uncork_socket io
io.flush

body.close
client.tempfile.unlink if client.tempfile
Expand Down Expand Up @@ -196,21 +192,18 @@ def default_server_port(env)
#
def fast_write(io, str)
n = 0
while true
byte_size = str.bytesize
while n < byte_size
begin
n = io.syswrite str
n += io.syswrite(n == 0 ? str : str.byteslice(n..-1))
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
if !IO.select(nil, [io], nil, WRITE_TIMEOUT)
unless IO.select(nil, [io], nil, WRITE_TIMEOUT)
raise ConnectionError, "Socket timeout writing data"
end

retry
rescue Errno::EPIPE, SystemCallError, IOError
raise ConnectionError, "Socket timeout writing data"
end

return if n == str.bytesize
str = str.byteslice(n..-1)
end
end
private :fast_write
Expand Down
37 changes: 21 additions & 16 deletions lib/puma/server.rb
Expand Up @@ -34,6 +34,8 @@ class Server
include Request
extend Forwardable

PURGE_INTERRUPT_QUEUE = ::Thread.current.respond_to? :purge_interrupt_queue

attr_reader :thread
attr_reader :events
attr_reader :min_threads, :max_threads # for #stats
Expand Down Expand Up @@ -114,49 +116,49 @@ def inherit_binder(bind)
class << self
# @!attribute [r] current
def current
Thread.current[ThreadLocalKey]
::Thread.current[ThreadLocalKey]
end

# :nodoc:
# @version 5.0.0
def tcp_cork_supported?
Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
::Socket.const_defined?(:TCP_CORK) && ::Socket.const_defined?(:IPPROTO_TCP)
end

# :nodoc:
# @version 5.0.0
def closed_socket_supported?
Socket.const_defined?(:TCP_INFO) && Socket.const_defined?(:IPPROTO_TCP)
::Socket.const_defined?(:TCP_INFO) && ::Socket.const_defined?(:IPPROTO_TCP)
end
private :tcp_cork_supported?
private :closed_socket_supported?
end

# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
# socket parameter may be an MiniSSL::Socket, so use to_io
# socket parameter may be a `MiniSSL::Socket`, so use to_io
#
if tcp_cork_supported?
UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze

# 6 == Socket::IPPROTO_TCP
# 3 == TCP_CORK
# 1/0 == turn on/off
def cork_socket(socket)
skt = socket.to_io
return unless skt.kind_of? ::TCPSocket
begin
skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if skt.kind_of? TCPSocket
skt.setsockopt ::Socket::IPPROTO_TCP, ::Socket::TCP_CORK, 1
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
end
end

def uncork_socket(socket)
skt = socket.to_io
return unless skt.kind_of? ::TCPSocket
begin
skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if skt.kind_of? TCPSocket
skt.setsockopt ::Socket::IPPROTO_TCP, ::Socket::TCP_CORK, 0
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
end
end
else
Expand All @@ -168,14 +170,17 @@ def uncork_socket(socket)
end

if closed_socket_supported?
UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze

def closed_socket?(socket)
return false unless socket.kind_of? TCPSocket
skt = socket.to_io
return false unless skt.kind_of? ::TCPSocket
return false unless @precheck_closing

begin
tcp_info = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
tcp_info = skt.getsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_INFO)
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
@precheck_closing = false
false
else
Expand Down Expand Up @@ -475,7 +480,7 @@ def process_client(client, buffer)
begin
client.close if close_socket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
# Already closed
rescue StandardError => e
@events.unknown_error e, nil, "Client"
Expand Down Expand Up @@ -588,11 +593,11 @@ def notify_safely(message)
@notify << message
rescue IOError, NoMethodError, Errno::EPIPE
# The server, in another thread, is shutting down
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
rescue RuntimeError => e
# Temporary workaround for https://bugs.ruby-lang.org/issues/13239
if e.message.include?('IOError')
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Thread.current.purge_interrupt_queue if PURGE_INTERRUPT_QUEUE
else
raise e
end
Expand Down
32 changes: 19 additions & 13 deletions test/test_puma_server.rb
Expand Up @@ -18,8 +18,8 @@ def setup
end

def teardown
@server.stop(true)
@ios.each { |io| io.close if io && !io.closed? }
@server.stop true
end

def server_run(app: @app, early_hints: false)
Expand Down Expand Up @@ -88,7 +88,7 @@ def test_puma_socket
end

def test_very_large_return
giant = "x" * 2056610
giant = '─x─x-x' * 209_716 # bytesize is 2_097_160 (2 MB is 2_097_152)

server_run app: ->(env) do
[200, {}, [giant]]
Expand All @@ -101,9 +101,13 @@ def test_very_large_return
break if line == "\r\n"
end

out = sock.read
body = sock.read.force_encoding('UTF-8')

assert_equal giant.bytesize, out.bytesize
assert_equal giant.bytesize, body.bytesize
# just check start and end, in case of error, we don't want 2MB dumped
# to the console
assert_equal giant[0..100], body[0..100]
assert_equal giant[-100..-1], body[-100..-1]
end

def test_respect_x_forwarded_proto
Expand Down Expand Up @@ -264,7 +268,6 @@ def test_eof_on_connection_close_is_not_logged_as_an_error

new_connection.close # Make a connection and close without writing

@server.stop(true)
stderr = @events.stderr.string
assert stderr.empty?, "Expected stderr from server to be empty but it was #{stderr.inspect}"
end
Expand All @@ -274,7 +277,7 @@ def test_force_shutdown_custom_error_message
@server = Puma::Server.new @app, @events, {:lowlevel_error_handler => handler, :force_shutdown_after => 2}

server_run app: ->(env) do
@server.stop
@server.stop true
sleep 5
end

Expand All @@ -289,7 +292,7 @@ def test_force_shutdown_error_default
@server = Puma::Server.new @app, @events, {:force_shutdown_after => 2}

server_run app: ->(env) do
@server.stop
@server.stop true
sleep 5
end

Expand Down Expand Up @@ -377,7 +380,7 @@ def test_status_hook_fires_when_server_changes_states

assert_equal [:booting, :running], states

@server.stop(true)
@server.stop true

assert_equal [:booting, :running, :stop, :done], states
end
Expand Down Expand Up @@ -1173,7 +1176,7 @@ def test_idle_connections_closed_immediately_on_shutdown
server_run
sock = new_connection
sleep 0.5 # give enough time for new connection to enter reactor
@server.stop false
@server.stop

assert IO.select([sock], nil, nil, 1), 'Unexpected timeout'
assert_raises EOFError do
Expand All @@ -1184,20 +1187,23 @@ def test_idle_connections_closed_immediately_on_shutdown
def test_run_stop_thread_safety
100.times do
thread = @server.run
@server.stop
@server.stop true
assert thread.join(1)
end
end

def test_command_ignored_before_run
@server.stop # ignored
@server.run
@server.halt
done = Queue.new
@server.events.register(:state) do |state|
done << @server.instance_variable_get(:@status) if state == :done
end

@server.stop # ignored
@server.run
@server.halt

assert_equal :halt, done.pop
assert_empty done
end

def test_custom_io_selector
Expand Down