From ffa5d56b845971557dccaf50628705535473650b Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Thu, 20 May 2021 09:16:02 -0700 Subject: [PATCH] Improvements to keepalive-connection shedding (#2628) * Improvements to keepalive-connection shedding * alternative still using max_fast_inline * fix --- lib/puma/client.rb | 3 ++- lib/puma/request.rb | 17 ++++++++++++++--- lib/puma/server.rb | 27 +++++++++++---------------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index b6895bbef3..8b203376a8 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -69,6 +69,7 @@ def initialize(io, env=nil) @hijacked = false @peerip = nil + @listener = nil @remote_addr_header = nil @body_remain = 0 @@ -81,7 +82,7 @@ def initialize(io, env=nil) attr_writer :peerip - attr_accessor :remote_addr_header + attr_accessor :remote_addr_header, :listener def_delegators :@io, :closed? diff --git a/lib/puma/request.rb b/lib/puma/request.rb index 08b0354819..82db731f7a 100644 --- a/lib/puma/request.rb +++ b/lib/puma/request.rb @@ -26,9 +26,10 @@ module Request # Finally, it'll return +true+ on keep-alive connections. # @param client [Puma::Client] # @param lines [Puma::IOBuffer] + # @param requests [Integer] # @return [Boolean,:async] # - def handle_request(client, lines) + def handle_request(client, lines, requests) env = client.env io = client.io # io may be a MiniSSL::Socket @@ -110,7 +111,7 @@ def handle_request(client, lines) cork_socket io - str_headers(env, status, headers, res_info, lines) + str_headers(env, status, headers, res_info, lines, requests, client) line_ending = LINE_END @@ -367,9 +368,11 @@ def str_early_hints(headers) # @param headers [Hash] the headers returned by the Rack application # @param res_info [Hash] used to pass info between this method and #handle_request # @param lines [Puma::IOBuffer] modified inn place + # @param requests [Integer] number of inline requests handled + # @param client [Puma::Client] # @version 5.0.3 # - def str_headers(env, status, headers, res_info, lines) + def str_headers(env, status, headers, res_info, lines, requests, client) line_ending = LINE_END colon = COLON @@ -410,6 +413,14 @@ def str_headers(env, status, headers, res_info, lines) # if running without request queueing res_info[:keep_alive] &&= @queue_requests + # Close the connection after a reasonable number of inline requests + # if the server is at capacity and the listener has a new connection ready. + # This allows Puma to service connections fairly when the number + # of concurrent connections exceeds the size of the threadpool. + res_info[:keep_alive] &&= requests < @max_fast_inline || + @thread_pool.busy_threads < @max_threads || + !IO.select([client.listener], nil, nil, 0) + res_info[:response_hijack] = nil headers.each do |k, vs| diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 32bdbd2eca..bf2dbc4f4d 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -341,6 +341,7 @@ def handle_servers end drain += 1 if shutting_down? client = Client.new io, @binder.env(sock) + client.listener = sock if remote_addr_value client.peerip = remote_addr_value elsif remote_addr_header @@ -434,7 +435,7 @@ def process_client(client, buffer) while true @requests_count += 1 - case handle_request(client, buffer) + case handle_request(client, buffer, requests + 1) when false break when :async @@ -447,23 +448,17 @@ def process_client(client, buffer) requests += 1 - # Closing keepalive sockets after they've made a reasonable - # number of requests allows Puma to service many connections - # fairly, even when the number of concurrent connections exceeds - # the size of the threadpool. It also allows cluster mode Pumas - # to keep load evenly distributed across workers, because clients - # are randomly assigned a new worker when opening a new connection. - # - # Previously, Puma would kick connections in this conditional back - # to the reactor. However, because this causes the todo set to increase - # in size, the wait_until_full mutex would never unlock, leaving - # any additional connections unserviced. - break if requests >= @max_fast_inline - - check_for_more_data = @status == :run + # As an optimization, try to read the next request from the + # socket for a short time before returning to the reactor. + fast_check = @status == :run + + # Always pass the client back to the reactor after a reasonable + # number of inline requests if there are other requests pending. + fast_check = false if requests >= @max_fast_inline && + @thread_pool.backlog > 0 next_request_ready = with_force_shutdown(client) do - client.reset(check_for_more_data) + client.reset(fast_check) end unless next_request_ready