Skip to content

Commit

Permalink
Improvements to keepalive-connection shedding (puma#2628)
Browse files Browse the repository at this point in the history
* Improvements to keepalive-connection shedding

* alternative still using max_fast_inline

* fix
  • Loading branch information
wjordan authored and JuanitoFatas committed Sep 9, 2022
1 parent fa350f5 commit 1cbc98f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
3 changes: 2 additions & 1 deletion lib/puma/client.rb
Expand Up @@ -69,6 +69,7 @@ def initialize(io, env=nil)
@hijacked = false

@peerip = nil
@listener = nil
@remote_addr_header = nil

@body_remain = 0
Expand All @@ -81,7 +82,7 @@ def initialize(io, env=nil)

attr_writer :peerip

attr_accessor :remote_addr_header
attr_accessor :remote_addr_header, :listener

def_delegators :@io, :closed?

Expand Down
17 changes: 14 additions & 3 deletions lib/puma/request.rb
Expand Up @@ -26,9 +26,10 @@ module Request
# Finally, it'll return +true+ on keep-alive connections.
# @param client [Puma::Client]
# @param lines [Puma::IOBuffer]
# @param requests [Integer]
# @return [Boolean,:async]
#
def handle_request(client, lines)
def handle_request(client, lines, requests)
env = client.env
io = client.io # io may be a MiniSSL::Socket

Expand Down Expand Up @@ -110,7 +111,7 @@ def handle_request(client, lines)

cork_socket io

str_headers(env, status, headers, res_info, lines)
str_headers(env, status, headers, res_info, lines, requests, client)

line_ending = LINE_END

Expand Down Expand Up @@ -367,9 +368,11 @@ def str_early_hints(headers)
# @param headers [Hash] the headers returned by the Rack application
# @param res_info [Hash] used to pass info between this method and #handle_request
# @param lines [Puma::IOBuffer] modified inn place
# @param requests [Integer] number of inline requests handled
# @param client [Puma::Client]
# @version 5.0.3
#
def str_headers(env, status, headers, res_info, lines)
def str_headers(env, status, headers, res_info, lines, requests, client)
line_ending = LINE_END
colon = COLON

Expand Down Expand Up @@ -410,6 +413,14 @@ def str_headers(env, status, headers, res_info, lines)
# if running without request queueing
res_info[:keep_alive] &&= @queue_requests

# Close the connection after a reasonable number of inline requests
# if the server is at capacity and the listener has a new connection ready.
# This allows Puma to service connections fairly when the number
# of concurrent connections exceeds the size of the threadpool.
res_info[:keep_alive] &&= requests < @max_fast_inline ||
@thread_pool.busy_threads < @max_threads ||
!IO.select([client.listener], nil, nil, 0)

res_info[:response_hijack] = nil

headers.each do |k, vs|
Expand Down
27 changes: 11 additions & 16 deletions lib/puma/server.rb
Expand Up @@ -341,6 +341,7 @@ def handle_servers
end
drain += 1 if shutting_down?
client = Client.new io, @binder.env(sock)
client.listener = sock
if remote_addr_value
client.peerip = remote_addr_value
elsif remote_addr_header
Expand Down Expand Up @@ -434,7 +435,7 @@ def process_client(client, buffer)

while true
@requests_count += 1
case handle_request(client, buffer)
case handle_request(client, buffer, requests + 1)
when false
break
when :async
Expand All @@ -447,23 +448,17 @@ def process_client(client, buffer)

requests += 1

# Closing keepalive sockets after they've made a reasonable
# number of requests allows Puma to service many connections
# fairly, even when the number of concurrent connections exceeds
# the size of the threadpool. It also allows cluster mode Pumas
# to keep load evenly distributed across workers, because clients
# are randomly assigned a new worker when opening a new connection.
#
# Previously, Puma would kick connections in this conditional back
# to the reactor. However, because this causes the todo set to increase
# in size, the wait_until_full mutex would never unlock, leaving
# any additional connections unserviced.
break if requests >= @max_fast_inline

check_for_more_data = @status == :run
# As an optimization, try to read the next request from the
# socket for a short time before returning to the reactor.
fast_check = @status == :run

# Always pass the client back to the reactor after a reasonable
# number of inline requests if there are other requests pending.
fast_check = false if requests >= @max_fast_inline &&
@thread_pool.backlog > 0

next_request_ready = with_force_shutdown(client) do
client.reset(check_for_more_data)
client.reset(fast_check)
end

unless next_request_ready
Expand Down

0 comments on commit 1cbc98f

Please sign in to comment.