diff --git a/History.md b/History.md
index 6f066bbd6c..7afd5d5ba7 100644
--- a/History.md
+++ b/History.md
@@ -57,6 +57,7 @@
   * JSON parse cluster worker stats instead of regex (#2124)
   * Support parallel tests in verbose progress reporting (#2223)
   * Refactor error handling in server accept loop (#2239)
+  * Refactor Reactor and Client request buffering (#2279)
 
 ## 4.3.4/4.3.5 and 3.12.5/3.12.6 / 2020-05-22
 
diff --git a/lib/puma/client.rb b/lib/puma/client.rb
index affae352f0..e492d72762 100644
--- a/lib/puma/client.rb
+++ b/lib/puma/client.rb
@@ -104,6 +104,15 @@ def set_timeout(val)
       @timeout_at = Time.now + val
     end
 
+    # Number of seconds until the timeout elapses.
+    def timeout
+      [@timeout_at - Time.now, 0].max
+    end
+
+    def <=>(other)
+      @timeout_at <=> other.timeout_at
+    end
+
     def reset(fast_check=true)
       @parser.reset
       @read_header = true
@@ -251,14 +260,16 @@ def finish(timeout)
         rescue ThreadPool::ForceShutdown
           nil
         end
-        unless can_read
-          write_error(408) if in_data_phase
-          raise ConnectionError
-        end
+        timeout! unless can_read
       end
       true
     end
 
+    def timeout!
+      write_error(408) if in_data_phase
+      raise ConnectionError
+    end
+
     def write_error(status_code)
       begin
         @io << ERROR_RESPONSE[status_code]
diff --git a/lib/puma/queue_close.rb b/lib/puma/queue_close.rb
new file mode 100644
index 0000000000..a9244b6722
--- /dev/null
+++ b/lib/puma/queue_close.rb
@@ -0,0 +1,28 @@
+# Queue#close was added in Ruby 2.3.
+# Add a simple implementation for earlier Ruby versions.
+unless Queue.instance_methods.include?(:close)
+  class ClosedQueueError < StandardError; end
+  module Puma
+    module QueueClose
+      def initialize
+        @closed = false
+        super
+      end
+      def close
+        @closed = true
+      end
+      def closed?
+        @closed
+      end
+      def push(object)
+        raise ClosedQueueError if @closed
+        super
+      end
+      # Queue#<< and Queue#enq are aliases of the built-in #push, so they would
+      # bypass the closed check above unless they are re-pointed at this override.
+      alias << push
+      alias enq push
+    end
+    Queue.prepend QueueClose
+  end
+end
diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb
index 7a4fce5cad..cdfad3a603 100644
--- a/lib/puma/reactor.rb
+++ b/lib/puma/reactor.rb
@@ -1,405 +1,103 @@
 # frozen_string_literal: true
 
-require 'puma/util'
-require 'puma/minissl'
-
 require 'nio'
+require 'set'
+SortedSet.new if RUBY_VERSION < '2.5' # Ruby bug #13735
+
+require 'puma/queue_close' if RUBY_VERSION < '2.3'
+
 
 module Puma
-  # Internal Docs, Not a public interface.
-  #
-  # The Reactor object is responsible for ensuring that a request has been
-  # completely received before it starts to be processed. This may be known as read buffering.
-  # If read buffering is not done, and no other read buffering is performed (such as by an application server
-  # such as nginx) then the application would be subject to a slow client attack.
-  #
-  # Each Puma "worker" process has its own Reactor. For example if you start puma with `$ puma -w 5` then
-  # it will have 5 workers and each worker will have it's own reactor.
-  #
-  # For a graphical representation of how the reactor works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
-  #
-  # ## Reactor Flow
-  #
-  # A connection comes into a `Puma::Server` instance, it is then passed to a `Puma::Reactor` instance,
-  # which stores it in an array and waits for any of the connections to be ready for reading.
+  # Monitors a collection of IO objects, calling a block whenever
+  # any monitored object either receives data or times out, or when the Reactor shuts down.
   #
-  # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, Java NIO or
-  # just plain IO#select). The call to `NIO::Selector#select` will "wake up" and
-  # return the references to any objects that caused it to "wake". The reactor
-  # then loops through each of these request objects, and sees if they're complete. If they
-  # have a full header and body then the reactor passes the request to a thread pool.
-  # Once in a thread pool, a "worker thread" can run the the application's Ruby code against the request.
+  # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev,
+  # Java NIO or just plain IO#select). The call to `NIO::Selector#select` will
+  # 'wakeup' any IO object that receives data.
   #
-  # If the request is not complete, then it stays in the array, and the next time any
-  # data is written to that socket reference, then the loop is woken up and it is checked for completeness again.
+  # This class additionally tracks a timeout for every added object,
+  # and wakes up any object when its timeout elapses.
   #
-  # A detailed example is given in the docs for `run_internal` which is where the bulk
-  # of this logic lives.
+  # The implementation uses a SortedSet to track monitored objects sorted by timeout,
+  # and a Queue to synchronize adding new objects from the internal select loop.
   class Reactor
-    DefaultSleepFor = 5
-
-    # Creates an instance of Puma::Reactor
-    #
-    # The `server` argument is an instance of `Puma::Server`
-    # that is used to write a response for "low level errors"
-    # when there is an exception inside of the reactor.
-    #
-    # The `app_pool` is an instance of `Puma::ThreadPool`.
-    # Once a request is fully formed (header and body are received)
-    # it will be passed to the `app_pool`.
-    def initialize(server, app_pool)
-      @server = server
-      @events = server.events
-      @app_pool = app_pool
-
+    # Create a new Reactor to monitor IO objects added by #add.
+    # The provided block will be invoked when an IO has data available to read,
+    # its timeout elapses, or when the Reactor shuts down.
+    def initialize(&block)
       @selector = NIO::Selector.new
-
-      @mutex = Mutex.new
-
-      # Read / Write pipes to wake up internal while loop
-      @ready, @trigger = Puma::Util.pipe
-      @input = []
-      @sleep_for = DefaultSleepFor
-      @timeouts = []
-
-      mon = @selector.register(@ready, :r)
-      mon.value = @ready
-
-      @monitors = [mon]
+      @input = Queue.new
+      @timeouts = SortedSet.new
+      @block = block
     end
 
-    private
-
-    # Until a request is added via the `add` method this method will internally
-    # loop, waiting on the `sockets` array objects. The only object in this
-    # array at first is the `@ready` IO object, which is the read end of a pipe
-    # connected to `@trigger` object. When `@trigger` is written to, then the loop
-    # will break on `NIO::Selector#select` and return an array.
-    #
-    # ## When a request is added:
-    #
-    # When the `add` method is called, an instance of `Puma::Client` is added to the `@input` array.
-    # Next the `@ready` pipe is "woken" by writing a string of `"*"` to `@trigger`.
-    #
-    # When that happens, the internal loop stops blocking at `NIO::Selector#select` and returns a reference
-    # to whatever "woke" it up. On the very first loop, the only thing in `sockets` is `@ready`.
-    # When `@trigger` is written-to, the loop "wakes" and the `ready`
-    # variable returns an array of arrays that looks like `[[#], [], []]` where the
-    # first IO object is the `@ready` object. This first array `[#]`
-    # is saved as a `reads` variable.
-    #
-    # The `reads` variable is iterated through. In the case that the object
-    # is the same as the `@ready` input pipe, then we know that there was a `trigger` event.
-    #
-    # If there was a trigger event, then one byte of `@ready` is read into memory. In the case of the first request,
-    # the reactor sees that it's a `"*"` value and the reactor adds the contents of `@input` into the `sockets` array.
-    # The while then loop continues to iterate again, but now the `sockets` array contains a `Puma::Client` instance in addition
-    # to the `@ready` IO object. For example: `[#, #]`.
-    #
-    # Since the `Puma::Client` in this example has data that has not been read yet,
-    # the `NIO::Selector#select` is immediately able to "wake" and read from the `Puma::Client`. At this point the
-    # `ready` output looks like this: `[[#], [], []]`.
-    #
-    # Each element in the first entry is iterated over. The `Puma::Client` object is not
-    # the `@ready` pipe, so the reactor checks to see if it has the full header and body with
-    # the `Puma::Client#try_to_finish` method. If the full request has been sent,
-    # then the request is passed off to the `@app_pool` thread pool so that a "worker thread"
-    # can pick up the request and begin to execute application logic. This is done
-    # via `@app_pool << c`. The `Puma::Client` is then removed from the `sockets` array.
-    #
-    # If the request body is not present then nothing will happen, and the loop will iterate
-    # again. When the client sends more data to the socket the `Puma::Client` object will
-    # wake up the `NIO::Selector#select` and it can again be checked to see if it's ready to be
-    # passed to the thread pool.
-    #
-    # ## Time Out Case
-    #
-    # In addition to being woken via a write to one of the sockets the `NIO::Selector#select` will
-    # periodically "time out" of the sleep. One of the functions of this is to check for
-    # any requests that have "timed out". At the end of the loop it's checked to see if
-    # the first element in the `@timeout` array has exceed its allowed time. If so,
-    # the client object is removed from the timeout array, a 408 response is written.
-    # Then its connection is closed, and the object is removed from the `sockets` array
-    # that watches for new data.
-    #
-    # This behavior loops until all the objects that have timed out have been removed.
-    #
-    # Once all the timeouts have been processed, the next duration of the `NIO::Selector#select` sleep
-    # will be set to be equal to the amount of time it will take for the next timeout to occur.
-    # This calculation happens in `calculate_sleep`.
-    def run_internal
-      monitors = @monitors
-      selector = @selector
-
-      while true
-        begin
-          ready = selector.select @sleep_for
-        rescue IOError => e
-          Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
-          if monitors.any? { |mon| mon.value.closed? }
-            STDERR.puts "Error in select: #{e.message} (#{e.class})"
-            STDERR.puts e.backtrace
-
-            monitors.reject! do |mon|
-              if mon.value.closed?
-                selector.deregister mon.value
-                true
-              end
-            end
-
-            retry
-          else
-            raise
-          end
-        end
-
-        if ready
-          ready.each do |mon|
-            if mon.value == @ready
-              @mutex.synchronize do
-                case @ready.read(1)
-                when "*"
-                  @input.each do |c|
-                    mon = nil
-                    begin
-                      begin
-                        mon = selector.register(c, :r)
-                      rescue ArgumentError
-                        # There is a bug where we seem to be registering an already registered
-                        # client. This code deals with this situation but I wish we didn't have to.
-                        monitors.delete_if { |submon| submon.value.to_io == c.to_io }
-                        selector.deregister(c)
-                        mon = selector.register(c, :r)
-                      end
-                    rescue IOError
-                      # Means that the io is closed, so we should ignore this request
-                      # entirely
-                    else
-                      mon.value = c
-                      @timeouts << mon if c.timeout_at
-                      monitors << mon
-                    end
-                  end
-                  @input.clear
-
-                  @timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at }
-                  calculate_sleep
-                when "c"
-                  monitors.reject! do |submon|
-                    if submon.value == @ready
-                      false
-                    else
-                      if submon.value.can_close?
-                        submon.value.close
-                      else
-                        # Pass remaining open client connections to the thread pool.
-                        @app_pool << submon.value
-                      end
-                      begin
-                        selector.deregister submon.value
-                      rescue IOError
-                        # nio4r on jruby seems to throw an IOError here if the IO is closed, so
-                        # we need to swallow it.
-                      end
-                      true
-                    end
-                  end
-                when "!"
-                  return
-                end
-              end
-            else
-              c = mon.value
-
-              # We have to be sure to remove it from the timeout
-              # list or we'll accidentally close the socket when
-              # it's in use!
-              if c.timeout_at
-                @mutex.synchronize do
-                  @timeouts.delete mon
-                end
-              end
-
-              begin
-                if c.try_to_finish
-                  @app_pool << c
-                  clear_monitor mon
-                end
-
-              # Don't report these to the lowlevel_error handler, otherwise
-              # will be flooding them with errors when persistent connections
-              # are closed.
-              rescue ConnectionError
-                c.write_error(500)
-                c.close
-
-                clear_monitor mon
-
-              # SSL handshake failure
-              rescue MiniSSL::SSLError => e
-                @server.lowlevel_error(e, c.env)
-
-                ssl_socket = c.io
-                begin
-                  addr = ssl_socket.peeraddr.last
-                # EINVAL can happen when browser closes socket w/security exception
-                rescue IOError, Errno::EINVAL
-                  addr = ""
-                end
-
-                cert = ssl_socket.peercert
-
-                c.close
-                clear_monitor mon
-
-                @events.ssl_error @server, addr, cert, e
-
-              # The client doesn't know HTTP well
-              rescue HttpParserError => e
-                @server.lowlevel_error(e, c.env)
-
-                c.write_error(400)
-                c.close
-
-                clear_monitor mon
-
-                @events.parse_error @server, c.env, e
-              rescue StandardError => e
-                @server.lowlevel_error(e, c.env)
-
-                c.write_error(500)
-                c.close
-
-                clear_monitor mon
-              end
-            end
-          end
-        end
-
-        unless @timeouts.empty?
-          @mutex.synchronize do
-            now = Time.now
-
-            while @timeouts.first.value.timeout_at < now
-              mon = @timeouts.shift
-              c = mon.value
-              c.write_error(408) if c.in_data_phase
-              c.close
-
-              clear_monitor mon
-
-              break if @timeouts.empty?
-            end
-
-            calculate_sleep
-          end
+    # Run the internal select loop, using a background thread by default.
+    def run(background=true)
+      if background
+        @thread = Thread.new do
+          Puma.set_thread_name "reactor"
+          select_loop
         end
+      else
+        select_loop
       end
     end
 
-    def clear_monitor(mon)
-      @selector.deregister mon.value
-      @monitors.delete mon
+    # Add a new IO object to monitor.
+    # The object must be sortable and respond to #timeout.
+    def add(io)
+      @input << io
+      @selector.wakeup
+    rescue ClosedQueueError
+      @block.call(io)
     end
 
-    public
-
-    def run
-      run_internal
-    ensure
-      @trigger.close
-      @ready.close
+    # Shutdown the reactor, blocking until the background thread is finished.
+    def shutdown
+      @input.close
+      @selector.wakeup
+      @thread.join if @thread
     end
 
-    def run_in_thread
-      @thread = Thread.new do
-        Puma.set_thread_name "reactor"
-        begin
-          run_internal
-        rescue StandardError => e
-          STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
-          STDERR.puts e.backtrace
-          retry
-        ensure
-          @trigger.close
-          @ready.close
-        end
-      end
-    end
+    private
 
-    # The `calculate_sleep` sets the value that the `NIO::Selector#select` will
-    # sleep for in the main reactor loop when no sockets are being written to.
-    #
-    # The values kept in `@timeouts` are sorted so that the first timeout
-    # comes first in the array. When there are no timeouts the default timeout is used.
-    #
-    # Otherwise a sleep value is set that is the same as the amount of time it
-    # would take for the first element to time out.
-    #
-    # If that value is in the past, then a sleep value of zero is used.
-    def calculate_sleep
-      if @timeouts.empty?
-        @sleep_for = DefaultSleepFor
-      else
-        diff = @timeouts.first.value.timeout_at.to_f - Time.now.to_f
+    def select_loop
+      begin
+        until @input.closed? && @input.empty?
+          # Wakeup any registered object that receives incoming data.
+          # Block until the earliest timeout or Selector#wakeup is called.
+          timeout = (earliest = @timeouts.first) && earliest.timeout
+          @selector.select(timeout) {|mon| wakeup!(mon.value)}
 
-        if diff < 0.0
-          @sleep_for = 0
-        else
-          @sleep_for = diff
-        end
-      end
-    end
+          # Wakeup all objects that timed out.
+          timed_out = @timeouts.take_while {|t| t.timeout == 0}
+          timed_out.each(&method(:wakeup!))
 
-    # This method adds a connection to the reactor
-    #
-    # Typically called by `Puma::Server` the value passed in
-    # is usually a `Puma::Client` object that responds like an IO
-    # object.
-    #
-    # The main body of the reactor loop is in `run_internal` and it
-    # will sleep on `NIO::Selector#select`. When a new connection is added to the
-    # reactor it cannot be added directly to the `sockets` array, because
-    # the `NIO::Selector#select` will not be watching for it yet.
-    #
-    # Instead what needs to happen is that `NIO::Selector#select` needs to be woken up,
-    # the contents of `@input` added to the `sockets` array, and then
-    # another call to `NIO::Selector#select` needs to happen. Since the `Puma::Client`
-    # object can be read immediately, it does not block, but instead returns
-    # right away.
-    #
-    # This behavior is accomplished by writing to `@trigger` which wakes up
-    # the `NIO::Selector#select` and then there is logic to detect the value of `*`,
-    # pull the contents from `@input` and add them to the sockets array.
-    #
-    # If the object passed in has a timeout value in `timeout_at` then
-    # it is added to a `@timeouts` array. This array is then re-arranged
-    # so that the first element to timeout will be at the front of the
-    # array. Then a value to sleep for is derived in the call to `calculate_sleep`
-    def add(c)
-      @mutex.synchronize do
-        @input << c
-        @trigger << "*"
+          register(@input.pop) until @input.empty?
+        end
+      rescue StandardError => e
+        STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
+        STDERR.puts e.backtrace
+        retry
       end
+      # Wakeup all remaining objects on shutdown.
+      @timeouts.each(&method(:wakeup!))
+      @selector.close
     end
 
-    # Close all watched sockets and clear them from being watched
-    def clear!
-      begin
-        @trigger << "c"
-      rescue IOError
-        Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
-      end
+    # Start monitoring the object.
+    def register(io)
+      @selector.register(io, :r).value = io
+      @timeouts << io
     end
 
-    def shutdown
-      begin
-        @trigger << "!"
-      rescue IOError
-        Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
+    # 'Wake up' a monitored object by calling the provided block.
+    # Stop monitoring the object if the block returns `true`.
+    def wakeup!(io)
+      if @block.call(io)
+        @selector.deregister(io)
+        @timeouts.delete(io)
       end
-
-      @thread.join
     end
   end
 end
diff --git a/lib/puma/server.rb b/lib/puma/server.rb
index 8ec2a5b16f..6b71faacaa 100644
--- a/lib/puma/server.rb
+++ b/lib/puma/server.rb
@@ -184,55 +184,19 @@ def run(background=true)
 
       @status = :run
 
-      @thread_pool = ThreadPool.new(@min_threads,
-                                    @max_threads,
-                                    ::Puma::IOBuffer) do |client, buffer|
-
-        # Advertise this server into the thread
-        Thread.current[ThreadLocalKey] = self
-
-        process_now = false
-
-        begin
-          if @queue_requests
-            process_now = client.eagerly_finish
-          else
-            client.finish(@first_data_timeout)
-            process_now = true
-          end
-        rescue MiniSSL::SSLError => e
-          ssl_socket = client.io
-          addr = ssl_socket.peeraddr.last
-          cert = ssl_socket.peercert
-
-          client.close
-
-          @events.ssl_error self, addr, cert, e
-        rescue HttpParserError => e
-          client.write_error(400)
-          client.close
-
-          @events.parse_error self, client.env, e
-        rescue ConnectionError, EOFError
-          client.close
-        else
-          if process_now
-            process_client client, buffer
-          else
-            client.set_timeout @first_data_timeout
-            @reactor.add client
-          end
-        end
-
-        process_now
-      end
+      @thread_pool = ThreadPool.new(
+        @min_threads,
+        @max_threads,
+        ::Puma::IOBuffer,
+        &method(:process_client)
+      )
 
       @thread_pool.out_of_band_hook = @options[:out_of_band]
       @thread_pool.clean_thread_locals = @options[:clean_thread_locals]
 
       if @queue_requests
-        @reactor = Reactor.new self, @thread_pool
-        @reactor.run_in_thread
+        @reactor = Reactor.new(&method(:reactor_wakeup))
+        @reactor.run
       end
 
       if @reaping_time
@@ -256,6 +220,44 @@ def run(background=true)
       end
     end
 
+    # This method is called from the Reactor thread when a queued Client receives data,
+    # times out, or when the Reactor is shutting down.
+    #
+    # It is responsible for ensuring that a request has been completely received
+    # before it starts to be processed by the ThreadPool. This may be known as read buffering.
+    # If read buffering is not done, and no other read buffering is performed (such as by an application server
+    # such as nginx) then the application would be subject to a slow client attack.
+    #
+    # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
+    #
+    # The method checks to see if it has the full header and body with
+    # the `Puma::Client#try_to_finish` method. If the full request has been sent,
+    # then the request is passed to the ThreadPool (`@thread_pool << client`)
+    # so that a "worker thread" can pick up the request and begin to execute application logic.
+    # The Client is then removed from the reactor (return `true`).
+    #
+    # If a client object times out, a 408 response is written, its connection is closed,
+    # and the object is removed from the reactor (return `true`).
+    #
+    # If the Reactor is shutting down, all Clients are either timed out or passed to the
+    # ThreadPool, depending on their current state (#can_close?).
+    #
+    # Otherwise, if the full request is not ready then the client will remain in the reactor
+    # (return `false`). When the client sends more data to the socket the `Puma::Client` object
+    # will wake up and again be checked to see if it's ready to be passed to the thread pool.
+    def reactor_wakeup(client)
+      shutdown = !@queue_requests
+      if client.try_to_finish || (shutdown && !client.can_close?)
+        @thread_pool << client
+      elsif shutdown || client.timeout == 0
+        client.timeout!
+      end
+    rescue StandardError => e
+      client_error(e, client)
+      client.close
+      true
+    end
+
     def handle_servers
       @check, @notify = Puma::Util.pipe unless @notify
       begin
@@ -308,7 +310,6 @@ def handle_servers
 
         if queue_requests
           @queue_requests = false
-          @reactor.clear!
           @reactor.shutdown
         end
         graceful_shutdown if @status == :stop || @status == :restart
@@ -351,15 +352,27 @@ def handle_check
     # returning.
     #
     def process_client(client, buffer)
+      # Advertise this server into the thread
+      Thread.current[ThreadLocalKey] = self
+
+      requests = 0
+      close_socket = true
+
       begin
+        if @queue_requests &&
+          !client.eagerly_finish
 
-        clean_thread_locals = @options[:clean_thread_locals]
-        close_socket = true
+          close_socket = false
+          client.set_timeout(@first_data_timeout)
+          @reactor.add client
+          return
+        end
 
-        requests = 0
+        clean_thread_locals = @options[:clean_thread_locals]
+        client.finish(@first_data_timeout)
 
         while true
-          case handle_request(client, buffer)
+          case handle_request(client, buffer).tap {requests += 1}
           when false
             return
           when :async
@@ -370,8 +383,6 @@ def process_client(client, buffer)
 
             ThreadPool.clean_thread_locals if clean_thread_locals
 
-            requests += 1
-
             check_for_more_data = @status == :run
 
             if requests >= MAX_FAST_INLINE
@@ -385,45 +396,16 @@ def process_client(client, buffer)
             unless client.reset(check_for_more_data)
               return unless @queue_requests
               close_socket = false
-              client.set_timeout @persistent_timeout
+              client.set_timeout(@persistent_timeout)
               @reactor.add client
               return
             end
           end
         end
 
-      # The client disconnected while we were reading data
-      rescue ConnectionError
-        # Swallow them. The ensure tries to close +client+ down
-
-      # SSL handshake error
-      rescue MiniSSL::SSLError => e
-        lowlevel_error(e, client.env)
-
-        ssl_socket = client.io
-        addr = ssl_socket.peeraddr.last
-        cert = ssl_socket.peercert
-
-        close_socket = true
-
-        @events.ssl_error self, addr, cert, e
-
-      # The client doesn't know HTTP well
-      rescue HttpParserError => e
-        lowlevel_error(e, client.env)
-
-        client.write_error(400)
-
-        @events.parse_error self, client.env, e
-
-      # Server error
       rescue StandardError => e
-        lowlevel_error(e, client.env)
-
-        client.write_error(500)
-
-        @events.unknown_error self, e, "Read"
-
+        client_error(e, client)
+        # The ensure tries to close +client+ down
       ensure
         buffer.reset
 
@@ -435,6 +417,8 @@ def process_client(client, buffer)
         rescue StandardError => e
           @events.unknown_error self, e, "Client"
         end
+
+        return requests > 0
       end
     end
 
@@ -834,6 +818,32 @@ def read_body(env, client, body, cl)
       return stream
     end
 
+    # Handle various error types thrown by Client I/O operations.
+    def client_error(e, client)
+      # Disconnects are swallowed silently; match subclasses too, as the replaced `rescue` clauses did.
+      return if [ConnectionError, EOFError].any? { |klass| e.is_a?(klass) }
+
+      lowlevel_error(e, client.env)
+      case e
+      when MiniSSL::SSLError
+        ssl_socket = client.io
+        addr = begin
+                 ssl_socket.peeraddr.last
+               # EINVAL can happen when browser closes socket w/security exception
+               rescue IOError, Errno::EINVAL
+                 ""
+               end
+        cert = ssl_socket.peercert
+        @events.ssl_error self, addr, cert, e
+      when HttpParserError
+        client.write_error(400)
+        @events.parse_error self, client.env, e
+      else
+        client.write_error(500)
+        @events.unknown_error self, e, "Read"
+      end
+    end
+
     # A fallback rack response if +@app+ raises as exception.
     #
     def lowlevel_error(e, env, status=500)