diff --git a/History.md b/History.md index 94e72bc609..8067a8e12e 100644 --- a/History.md +++ b/History.md @@ -39,6 +39,7 @@ * Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177) * Fix ThreadPool#shutdown timeout accuracy (#2221) * Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233) + * Improvements to `out_of_band` hook (#2234) * Prefer the rackup file specified by the CLI (#2225) * Refactor diff --git a/lib/puma/server.rb b/lib/puma/server.rb index b5269f0ddd..a467e23715 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -224,8 +224,11 @@ def run(background=true) @reactor.add client end end + + process_now end + @thread_pool.out_of_band_hook = @options[:out_of_band] @thread_pool.clean_thread_locals = @options[:clean_thread_locals] if @queue_requests @@ -279,6 +282,7 @@ def handle_servers break if handle_check else begin + pool.wait_until_not_full if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) if remote_addr_value @@ -288,7 +292,6 @@ def handle_servers end pool << client - pool.wait_until_not_full end rescue SystemCallError # nothing @@ -436,12 +439,6 @@ def process_client(client, buffer) rescue StandardError => e @events.unknown_error self, e, "Client" end - - if @options[:out_of_band] - @thread_pool.with_mutex do - @options[:out_of_band].each(&:call) if @thread_pool.busy_threads == 1 - end - end end end diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index d6a27c41de..4dcf2f08ab 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -47,6 +47,7 @@ def initialize(min, max, *extra, &block) @shutdown = false @trim_requested = 0 + @out_of_band_pending = false @workers = [] @@ -65,6 +66,7 @@ def initialize(min, max, *extra, &block) attr_reader :spawned, :trim_requested, :waiting attr_accessor :clean_thread_locals + attr_accessor :out_of_band_hook def self.clean_thread_locals Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods @@ -116,6 +118,9 @@ def spawn_thread end @waiting += 1 + if @out_of_band_pending && trigger_out_of_band_hook + @out_of_band_pending = false + end not_full.signal not_empty.wait mutex @waiting -= 1 @@ -129,7 +134,7 @@ def spawn_thread end begin - block.call(work, *extra) + @out_of_band_pending = true if block.call(work, *extra) rescue Exception => e STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})" end @@ -143,6 +148,21 @@ def spawn_thread private :spawn_thread + def trigger_out_of_band_hook + return false unless out_of_band_hook && out_of_band_hook.any? + + # we execute on idle hook when all threads are free + return false unless @spawned == @waiting + + out_of_band_hook.each(&:call) + true + rescue Exception => e + STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})" + true + end + + private :trigger_out_of_band_hook + def with_mutex(&block) @mutex.owned? ? yield : diff --git a/test/test_out_of_band_server.rb b/test/test_out_of_band_server.rb new file mode 100644 index 0000000000..d81be8985e --- /dev/null +++ b/test/test_out_of_band_server.rb @@ -0,0 +1,159 @@ +require_relative "helper" +require "puma/events" + +class TestOutOfBandServer < Minitest::Test + parallelize_me! + + def setup + @ios = [] + @server = nil + @oob_finished = ConditionVariable.new + @app_finished = ConditionVariable.new + end + + def teardown + @oob_finished.broadcast + @app_finished.broadcast + @server.stop(true) if @server + @ios.each {|i| i.close unless i.closed?} + end + + def new_connection + TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s} + rescue IOError + Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + retry + end + + def send_http(req) + new_connection << req + end + + def send_http_and_read(req) + send_http(req).read + end + + def oob_server(**options) + @request_count = 0 + @oob_count = 0 + in_oob = Mutex.new + @mutex = Mutex.new + oob_wait = options.delete(:oob_wait) + oob = -> do + in_oob.synchronize do + @mutex.synchronize do + @oob_count += 1 + @oob_finished.signal + @oob_finished.wait(@mutex, 1) if oob_wait + end + end + end + app_wait = options.delete(:app_wait) + app = ->(_) do + raise 'OOB conflict' if in_oob.locked? + @mutex.synchronize do + @request_count += 1 + @app_finished.signal + @app_finished.wait(@mutex, 1) if app_wait + end + [200, {}, [""]] + end + + @server = Puma::Server.new app, Puma::Events.strings, out_of_band: [oob], **options + @server.min_threads = options[:min_threads] || 1 + @server.max_threads = options[:max_threads] || 1 + @server.add_tcp_listener '127.0.0.1', 0 + @server.run + end + + # Sequential requests should trigger out_of_band after every request. + def test_sequential + n = 100 + oob_server + n.times do + @mutex.synchronize do + send_http "GET / HTTP/1.0\r\n\r\n" + @oob_finished.wait(@mutex, 1) + end + end + assert_equal n, @request_count + assert_equal n, @oob_count + end + + # Stream of requests on concurrent connections should trigger + # out_of_band hooks only once after the final request. + def test_stream + oob_server app_wait: true, max_threads: 2 + n = 100 + Array.new(n) {send_http("GET / HTTP/1.0\r\n\r\n")} + Thread.pass until @request_count == n + @mutex.synchronize do + @app_finished.signal + @oob_finished.wait(@mutex, 1) + end + assert_equal n, @request_count + assert_equal 1, @oob_count + end + + # New requests should not get processed while OOB is running. + def test_request_overlapping_hook + oob_server oob_wait: true, max_threads: 2 + + # Establish connection for Req2 before OOB + req2 = new_connection + sleep 0.01 + + @mutex.synchronize do + send_http "GET / HTTP/1.0\r\n\r\n" + @oob_finished.wait(@mutex) # enter OOB + + # Send Req2 + req2 << "GET / HTTP/1.0\r\n\r\n" + # If Req2 is processed now it raises 'OOB Conflict' in the response. + sleep 0.01 + + @oob_finished.signal # exit OOB + # Req2 should be processed now. + @oob_finished.wait(@mutex, 1) # enter OOB + @oob_finished.signal # exit OOB + end + + refute_match(/OOB conflict/, req2.read) + end + + # Partial requests should not trigger OOB. + def test_partial_request + oob_server + new_connection.close + sleep 0.01 + assert_equal 0, @oob_count + end + + # OOB should be triggered following a completed request + # concurrent with other partial requests. + def test_partial_concurrent + oob_server max_threads: 2 + @mutex.synchronize do + send_http("GET / HTTP/1.0\r\n\r\n") + 100.times {new_connection.close} + @oob_finished.wait(@mutex, 1) + end + assert_equal 1, @oob_count + end + + # OOB should block new connections from being accepted. + def test_blocks_new_connection + oob_server oob_wait: true, max_threads: 2 + @mutex.synchronize do + send_http("GET / HTTP/1.0\r\n\r\n") + @oob_finished.wait(@mutex) + end + accepted = false + io = @server.binder.ios.last + io.stub(:accept_nonblock, -> {accepted = true; new_connection}) do + new_connection.close + sleep 0.01 + end + refute accepted, 'New connection accepted during out of band' + end +end diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index b7fdb3b215..3179579c72 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -984,85 +984,4 @@ def test_http10_connection_header_no_queue assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock) sock.close end - - def oob_server(**options) - @request_count = 0 - @oob_count = 0 - @start = Time.now - in_oob = Mutex.new - @mutex = Mutex.new - @oob_finished = ConditionVariable.new - oob_wait = options.delete(:oob_wait) - oob = -> do - in_oob.synchronize do - @mutex.synchronize do - @oob_count += 1 - @oob_finished.signal - @oob_finished.wait(@mutex, 1) if oob_wait - end - end - end - @server = Puma::Server.new @app, @events, out_of_band: [oob], **options - @server.min_threads = 5 - @server.max_threads = 5 - server_run app: ->(_) do - raise 'OOB conflict' if in_oob.locked? - @mutex.synchronize {@request_count += 1} - [200, {}, [""]] - end - end - - # Sequential requests should trigger out_of_band hooks after every request. - def test_out_of_band - n = 100 - oob_server queue_requests: false - n.times do - @mutex.synchronize do - send_http "GET / HTTP/1.0\r\n\r\n" - @oob_finished.wait(@mutex, 1) - end - end - assert_equal n, @request_count - assert_equal n, @oob_count - end - - # Streaming requests on parallel connections without delay should trigger - # out_of_band hooks only once after the final request. - def test_out_of_band_stream - n = 100 - threads = 10 - oob_server - req = "GET / HTTP/1.1\r\n" - @mutex.synchronize do - Array.new(threads) do - Thread.new do - send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n" - end - end.each(&:join) - @oob_finished.wait(@mutex, 1) - end - assert_equal n, @request_count - assert_equal 1, @oob_count - end - - def test_out_of_band_overlapping_requests - oob_server oob_wait: true - sock = nil - sock2 = nil - @mutex.synchronize do - sock2 = send_http "GET / HTTP/1.0\r\n" - sleep 0.01 - sock = send_http "GET / HTTP/1.0\r\n\r\n" - # Request 1 processed - @oob_finished.wait(@mutex) # enter oob - sock2 << "\r\n" - sleep 0.01 - @oob_finished.signal # exit oob - # Request 2 processed - @oob_finished.wait(@mutex) # enter oob - @oob_finished.signal # exit oob - end - assert_match(/200/, sock.read) - assert_match(/200/, sock2.read) - end end