Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to out_of_band hook #2234

Merged
merged 4 commits into from Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -39,6 +39,7 @@
* Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177)
* Fix ThreadPool#shutdown timeout accuracy (#2221)
* Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233)
* Improvements to `out_of_band` hook (#2234)
* Prefer the rackup file specified by the CLI (#2225)

* Refactor
Expand Down
11 changes: 4 additions & 7 deletions lib/puma/server.rb
Expand Up @@ -224,8 +224,11 @@ def run(background=true)
@reactor.add client
end
end

process_now
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return value of the ThreadPool block (process_now is true when a complete request was processed by the block) is used to determine if an OOB hook should be scheduled to fire once the thread is eventually idle.

end

@thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]

if @queue_requests
Expand Down Expand Up @@ -279,6 +282,7 @@ def handle_servers
break if handle_check
else
begin
pool.wait_until_not_full
if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value
Expand All @@ -288,7 +292,6 @@ def handle_servers
end

pool << client
pool.wait_until_not_full
end
rescue SystemCallError
# nothing
Expand Down Expand Up @@ -436,12 +439,6 @@ def process_client(client, buffer)
rescue StandardError => e
@events.unknown_error self, e, "Client"
end

if @options[:out_of_band]
@thread_pool.with_mutex do
@options[:out_of_band].each(&:call) if @thread_pool.busy_threads == 1
end
end
end
end

Expand Down
22 changes: 21 additions & 1 deletion lib/puma/thread_pool.rb
Expand Up @@ -47,6 +47,7 @@ def initialize(min, max, *extra, &block)
@shutdown = false

@trim_requested = 0
@out_of_band_pending = false

@workers = []

Expand All @@ -65,6 +66,7 @@ def initialize(min, max, *extra, &block)

attr_reader :spawned, :trim_requested, :waiting
attr_accessor :clean_thread_locals
attr_accessor :out_of_band_hook

def self.clean_thread_locals
Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
Expand Down Expand Up @@ -116,6 +118,9 @@ def spawn_thread
end

@waiting += 1
if @out_of_band_pending && trigger_out_of_band_hook
@out_of_band_pending = false
end
not_full.signal
not_empty.wait mutex
@waiting -= 1
Expand All @@ -129,7 +134,7 @@ def spawn_thread
end

begin
block.call(work, *extra)
@out_of_band_pending = true if block.call(work, *extra)
rescue Exception => e
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
Expand All @@ -143,6 +148,21 @@ def spawn_thread

private :spawn_thread

def trigger_out_of_band_hook
return false unless out_of_band_hook && out_of_band_hook.any?

# we execute on idle hook when all threads are free
return false unless @spawned == @waiting

out_of_band_hook.each(&:call)
true
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
true
end

private :trigger_out_of_band_hook

def with_mutex(&block)
@mutex.owned? ?
yield :
Expand Down
159 changes: 159 additions & 0 deletions test/test_out_of_band_server.rb
@@ -0,0 +1,159 @@
require_relative "helper"
require "puma/events"

class TestOutOfBandServer < Minitest::Test
parallelize_me!

def setup
@ios = []
@server = nil
@oob_finished = ConditionVariable.new
@app_finished = ConditionVariable.new
end

def teardown
@oob_finished.broadcast
@app_finished.broadcast
@server.stop(true) if @server
@ios.each {|i| i.close unless i.closed?}
end

def new_connection
TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
retry
end

def send_http(req)
new_connection << req
end

def send_http_and_read(req)
send_http(req).read
end

def oob_server(**options)
@request_count = 0
@oob_count = 0
in_oob = Mutex.new
@mutex = Mutex.new
oob_wait = options.delete(:oob_wait)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
@oob_count += 1
@oob_finished.signal
@oob_finished.wait(@mutex, 1) if oob_wait
end
end
end
app_wait = options.delete(:app_wait)
app = ->(_) do
raise 'OOB conflict' if in_oob.locked?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like that approach.

@mutex.synchronize do
@request_count += 1
@app_finished.signal
@app_finished.wait(@mutex, 1) if app_wait
end
[200, {}, [""]]
end

@server = Puma::Server.new app, Puma::Events.strings, out_of_band: [oob], **options
@server.min_threads = options[:min_threads] || 1
@server.max_threads = options[:max_threads] || 1
@server.add_tcp_listener '127.0.0.1', 0
@server.run
end

# Sequential requests should trigger out_of_band after every request.
def test_sequential
n = 100
oob_server
n.times do
@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex, 1)
end
end
assert_equal n, @request_count
assert_equal n, @oob_count
end

# Stream of requests on concurrent connections should trigger
# out_of_band hooks only once after the final request.
def test_stream
oob_server app_wait: true, max_threads: 2
n = 100
Array.new(n) {send_http("GET / HTTP/1.0\r\n\r\n")}
Thread.pass until @request_count == n
@mutex.synchronize do
@app_finished.signal
@oob_finished.wait(@mutex, 1)
end
assert_equal n, @request_count
assert_equal 1, @oob_count
end

# New requests should not get processed while OOB is running.
def test_request_overlapping_hook
oob_server oob_wait: true, max_threads: 2

# Establish connection for Req2 before OOB
req2 = new_connection
sleep 0.01

@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex) # enter OOB

# Send Req2
req2 << "GET / HTTP/1.0\r\n\r\n"
# If Req2 is processed now it raises 'OOB Conflict' in the response.
sleep 0.01

@oob_finished.signal # exit OOB
# Req2 should be processed now.
@oob_finished.wait(@mutex, 1) # enter OOB
@oob_finished.signal # exit OOB
end

refute_match(/OOB conflict/, req2.read)
end

# Partial requests should not trigger OOB.
def test_partial_request
oob_server
new_connection.close
sleep 0.01
assert_equal 0, @oob_count
end

# OOB should be triggered following a completed request
# concurrent with other partial requests.
def test_partial_concurrent
oob_server max_threads: 2
@mutex.synchronize do
send_http("GET / HTTP/1.0\r\n\r\n")
100.times {new_connection.close}
@oob_finished.wait(@mutex, 1)
end
assert_equal 1, @oob_count
end

# OOB should block new connections from being accepted.
def test_blocks_new_connection
oob_server oob_wait: true, max_threads: 2
@mutex.synchronize do
send_http("GET / HTTP/1.0\r\n\r\n")
@oob_finished.wait(@mutex)
end
accepted = false
io = @server.binder.ios.last
io.stub(:accept_nonblock, -> {accepted = true; new_connection}) do
new_connection.close
sleep 0.01
end
refute accepted, 'New connection accepted during out of band'
end
end
81 changes: 0 additions & 81 deletions test/test_puma_server.rb
Expand Up @@ -984,85 +984,4 @@ def test_http10_connection_header_no_queue
assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock)
sock.close
end

def oob_server(**options)
@request_count = 0
@oob_count = 0
@start = Time.now
in_oob = Mutex.new
@mutex = Mutex.new
@oob_finished = ConditionVariable.new
oob_wait = options.delete(:oob_wait)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
@oob_count += 1
@oob_finished.signal
@oob_finished.wait(@mutex, 1) if oob_wait
end
end
end
@server = Puma::Server.new @app, @events, out_of_band: [oob], **options
@server.min_threads = 5
@server.max_threads = 5
server_run app: ->(_) do
raise 'OOB conflict' if in_oob.locked?
@mutex.synchronize {@request_count += 1}
[200, {}, [""]]
end
end

# Sequential requests should trigger out_of_band hooks after every request.
def test_out_of_band
n = 100
oob_server queue_requests: false
n.times do
@mutex.synchronize do
send_http "GET / HTTP/1.0\r\n\r\n"
@oob_finished.wait(@mutex, 1)
end
end
assert_equal n, @request_count
assert_equal n, @oob_count
end

# Streaming requests on parallel connections without delay should trigger
# out_of_band hooks only once after the final request.
def test_out_of_band_stream
n = 100
threads = 10
oob_server
req = "GET / HTTP/1.1\r\n"
@mutex.synchronize do
Array.new(threads) do
Thread.new do
send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n"
end
end.each(&:join)
@oob_finished.wait(@mutex, 1)
end
assert_equal n, @request_count
assert_equal 1, @oob_count
end

def test_out_of_band_overlapping_requests
oob_server oob_wait: true
sock = nil
sock2 = nil
@mutex.synchronize do
sock2 = send_http "GET / HTTP/1.0\r\n"
sleep 0.01
sock = send_http "GET / HTTP/1.0\r\n\r\n"
# Request 1 processed
@oob_finished.wait(@mutex) # enter oob
sock2 << "\r\n"
sleep 0.01
@oob_finished.signal # exit oob
# Request 2 processed
@oob_finished.wait(@mutex) # enter oob
@oob_finished.signal # exit oob
end
assert_match(/200/, sock.read)
assert_match(/200/, sock2.read)
end
end