Skip to content

Commit

Permalink
Further improvements to out-of-band hook
Browse files Browse the repository at this point in the history
* Don't trigger OOB on partially-queued requests
* Don't accept new connections during OOB
  • Loading branch information
wjordan committed Apr 24, 2020
1 parent 92bae74 commit 3aa150f
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 123 deletions.
2 changes: 1 addition & 1 deletion History.md
Expand Up @@ -38,7 +38,7 @@
* Fixed a few minor concurrency bugs in ThreadPool that may have affected non-GVL Rubies (#2220)
* Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177)
* Fix ThreadPool#shutdown timeout accuracy (#2221)
* Call `out_of_band` hook outside of busy loop (#2230)
* Improvements to `out_of_band` hook (#xxxx)

* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
Expand Down
44 changes: 24 additions & 20 deletions lib/puma/server.rb
Expand Up @@ -224,6 +224,8 @@ def run(background=true)
@reactor.add client
end
end

process_now
end

@thread_pool.out_of_band_hook = @options[:out_of_band]
Expand Down Expand Up @@ -279,27 +281,15 @@ def handle_servers
if sock == check
break if handle_check
else
begin
if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value
client.peerip = remote_addr_value
elsif remote_addr_header
client.remote_addr_header = remote_addr_header
end

pool << client
pool.wait_until_not_full
end
rescue SystemCallError
# nothing
rescue Errno::ECONNABORTED
# client closed the socket even before accept
begin
io.close
rescue
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
pool.wait_until_not_full
if (io = accept_socket(sock))
client = Client.new io, @binder.env(sock)
if remote_addr_value
client.peerip = remote_addr_value
elsif remote_addr_header
client.remote_addr_header = remote_addr_header
end
pool << client
end
end
end
Expand Down Expand Up @@ -327,6 +317,20 @@ def handle_servers
@events.fire :state, :done
end

def accept_socket(sock)
sock.accept_nonblock
rescue SystemCallError
# nothing
rescue Errno::ECONNABORTED
# client closed the socket even before accept
begin
sock.close
rescue
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
private :accept_socket

# :nodoc:
def handle_check
cmd = @check.read(1)
Expand Down
39 changes: 18 additions & 21 deletions lib/puma/thread_pool.rb
Expand Up @@ -47,6 +47,7 @@ def initialize(min, max, *extra, &block)
@shutdown = false

@trim_requested = 0
@out_of_band_pending = false

@workers = []

Expand Down Expand Up @@ -117,8 +118,10 @@ def spawn_thread
end

@waiting += 1
if @out_of_band_pending && trigger_out_of_band_hook
@out_of_band_pending = false
end
not_full.signal
trigger_out_of_band_hook
not_empty.wait mutex
@waiting -= 1
end
Expand All @@ -131,7 +134,7 @@ def spawn_thread
end

begin
block.call(work, *extra)
@out_of_band_pending = true if block.call(work, *extra)
rescue Exception => e
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
Expand All @@ -146,29 +149,23 @@ def spawn_thread
private :spawn_thread

def trigger_out_of_band_hook
return unless out_of_band_hook && out_of_band_hook.any?

with_mutex do |mutex|
# we execute on idle hook when all threads are free
return unless @spawned == @waiting

# we unlock thread for the duration of out of band hook
# to allow other requests to be accepted
mutex.unlock

begin
out_of_band_hook.each(&:call)
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
ensure
mutex.lock
end
end
return false unless out_of_band_hook && out_of_band_hook.any?

# we execute on idle hook when all threads are free
return false unless @spawned == @waiting

out_of_band_hook.each(&:call)
true
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
true
end

private :trigger_out_of_band_hook

def with_mutex(&block)
@mutex.owned? ?
yield(@mutex) :
yield :
@mutex.synchronize(&block)
end

Expand Down
158 changes: 158 additions & 0 deletions test/test_out_of_band_server.rb
@@ -0,0 +1,158 @@
require_relative "helper"
require "puma/events"

class TestOutOfBandServer < Minitest::Test
parallelize_me!

def setup
@ios = []
@server = nil
@oob_finished = ConditionVariable.new
@app_finished = ConditionVariable.new
end

def teardown
@ios.each {|i| i.close unless i.closed?}
@oob_finished.broadcast
@app_finished.broadcast
@server.stop(true) if @server
end

def new_connection
TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
retry
end

def send_http(req)
new_connection << req
end

def send_http_and_read(req)
send_http(req).read
end

def oob_server(**options)
@request_count = 0
@oob_count = 0
in_oob = Mutex.new
@mutex = Mutex.new
oob_wait = options.delete(:oob_wait)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
@oob_count += 1
@oob_finished.signal
@oob_finished.wait(@mutex, 1) if oob_wait
end
end
end
app_wait = options.delete(:app_wait)
app = ->(_) do
raise 'OOB conflict' if in_oob.locked?
@mutex.synchronize do
@request_count += 1
@app_finished.signal
@app_finished.wait(@mutex, 1) if app_wait
end
[200, {}, [""]]
end

@server = Puma::Server.new app, Puma::Events.strings, out_of_band: [oob], **options
@server.min_threads = options[:min_threads] || 1
@server.max_threads = options[:max_threads] || 1
@server.add_tcp_listener '127.0.0.1', 0
@server.run
end

# Sequential requests should trigger out_of_band after every request.
def test_sequential
n = 100
oob_server
n.times do
@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex, 1)
end
end
assert_equal n, @request_count
assert_equal n, @oob_count
end

# Stream of requests on concurrent connections should trigger
# out_of_band hooks only once after the final request.
def test_stream
oob_server app_wait: true, max_threads: 2
n = 100
Array.new(n) {send_http("GET / HTTP/1.0\r\n\r\n")}
Thread.pass until @request_count == n
@mutex.synchronize do
@app_finished.signal
@oob_finished.wait(@mutex, 1)
end
assert_equal n, @request_count
assert_equal 1, @oob_count
end

# New requests should not get processed while OOB is running.
def test_request_overlapping_hook
oob_server oob_wait: true, max_threads: 2

# Establish connection for Req2 before OOB
req2 = new_connection
sleep 0.01

@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex) # enter OOB

# Send Req2
req2 << "GET / HTTP/1.0\r\n\r\n"
# If Req2 is processed now it raises 'OOB Conflict' in the response.
sleep 0.01

@oob_finished.signal # exit OOB
# Req2 should be processed now.
@oob_finished.wait(@mutex, 1) # enter OOB
@oob_finished.signal # exit OOB
end

refute_match(/OOB conflict/, req2.read)
end

# Partial requests should not trigger OOB.
def test_partial_request
oob_server
new_connection.close
sleep 0.01
assert_equal 0, @oob_count
end

# OOB should be triggered following a completed request
# concurrent with other partial requests.
def test_partial_concurrent
oob_server max_threads: 2
@mutex.synchronize do
send_http("GET / HTTP/1.0\r\n\r\n")
100.times {new_connection.close}
@oob_finished.wait(@mutex, 1)
end
assert_equal 1, @oob_count
end

# OOB should block new connections from being accepted.
def test_blocks_new_connection
oob_server oob_wait: true, max_threads: 2
@mutex.synchronize do
send_http("GET / HTTP/1.0\r\n\r\n")
@oob_finished.wait(@mutex)
end
accepted = false
@server.stub(:accept_socket, ->(_) {accepted = true; false}) do
new_connection
sleep 0.01
end
refute accepted, 'New connection accepted during out of band'
end
end
81 changes: 0 additions & 81 deletions test/test_puma_server.rb
Expand Up @@ -984,85 +984,4 @@ def test_http10_connection_header_no_queue
assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock)
sock.close
end

def oob_server(**options)
@request_count = 0
@oob_count = 0
@start = Time.now
in_oob = Mutex.new
@mutex = Mutex.new
@oob_finished = ConditionVariable.new
oob_wait = options.delete(:oob_wait)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
@oob_count += 1
@oob_finished.signal
@oob_finished.wait(@mutex, 1) if oob_wait
end
end
end
@server = Puma::Server.new @app, @events, out_of_band: [oob], **options
@server.min_threads = 5
@server.max_threads = 5
server_run app: ->(_) do
raise 'OOB conflict' if in_oob.locked?
@mutex.synchronize {@request_count += 1}
[200, {}, [""]]
end
end

# Sequential requests should trigger out_of_band hooks after every request.
def test_out_of_band
n = 100
oob_server queue_requests: false
n.times do
@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex, 1)
end
end
assert_equal n, @request_count
assert_equal n, @oob_count
end

# Streaming requests on parallel connections without delay should trigger
# out_of_band hooks only once after the final request.
def test_out_of_band_stream
n = 100
threads = 10
oob_server
req = "GET / HTTP/1.1\r\n"
@mutex.synchronize do
Array.new(threads) do
Thread.new do
send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n"
end
end.each(&:join)
@oob_finished.wait(@mutex, 1)
end
assert_equal n, @request_count
assert_equal 1, @oob_count
end

def test_out_of_band_overlapping_requests
oob_server oob_wait: true
sock = nil
sock2 = nil
@mutex.synchronize do
sock2 = send_http "GET / HTTP/1.0\r\n"
sleep 0.01
sock = send_http "GET / HTTP/1.0\r\n\r\n"
# Request 1 processed
@oob_finished.wait(@mutex) # enter oob
sock2 << "\r\n"
sleep 0.01
@oob_finished.signal # exit oob
# Request 2 processed
@oob_finished.wait(@mutex) # enter oob
@oob_finished.signal # exit oob
end
assert_match(/200/, sock.read)
assert_match(/200/, sock2.read)
end
end

0 comments on commit 3aa150f

Please sign in to comment.