Skip to content

Commit

Permalink
Introduce support for fiber-per-request.
Browse files Browse the repository at this point in the history
This enables the correct scope of `Fiber.storage` per request.

- Unify `clean_thread_locals` and `fiber_per_request` configuration options.
  • Loading branch information
ioquatix committed Apr 3, 2024
1 parent 6568209 commit 6cc6b48
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 37 deletions.
2 changes: 1 addition & 1 deletion lib/puma/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class Configuration
DEFAULTS = {
auto_trim_time: 30,
binds: ['tcp://0.0.0.0:9292'.freeze],
clean_thread_locals: false,
fiber_per_request: ENV["PUMA_FIBER_PER_REQUEST"] == "true",
debug: false,
early_hints: nil,
environment: 'development'.freeze,
Expand Down
11 changes: 7 additions & 4 deletions lib/puma/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,15 @@ def idle_timeout(seconds)
@options[:idle_timeout] = Integer(seconds)
end

# Work around leaky apps that leave garbage in Thread locals
# across requests.
def clean_thread_locals(which=true)
@options[:clean_thread_locals] = which
# Use a clean fiber per request which ensures a clean slate for thread
# locals, fiber locals and fiber storage. Also provides a cleaner
# backtrace with less Puma internal stack frames.
def fiber_per_request(which=true)
@options[:fiber_per_request] = which
end

alias clean_thread_locals fiber_per_request

# When shutting down, drain the accept socket of pending connections and
# process them. This loops over the accept socket until there are no more
# read events and then stops looking and waits for the requests to finish.
Expand Down
15 changes: 12 additions & 3 deletions lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ module Puma
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server
module FiberPerRequest
def handle_request(client, requests)
Fiber.new do
super
end.resume
end
end

include Puma::Const
include Request

Expand Down Expand Up @@ -98,6 +106,10 @@ def initialize(app, events = nil, options = {})
@io_selector_backend = @options[:io_selector_backend]
@http_content_length_limit = @options[:http_content_length_limit]

if @options[:fiber_per_request]
singleton_class.prepend(FiberPerRequest)
end

# make this a hash, since we prefer `key?` over `include?`
@supported_http_methods =
if @options[:supported_http_methods] == :any
Expand Down Expand Up @@ -442,7 +454,6 @@ def process_client(client)
# Advertise this server into the thread
Thread.current.puma_server = self

clean_thread_locals = options[:clean_thread_locals]
close_socket = true

requests = 0
Expand Down Expand Up @@ -471,8 +482,6 @@ def process_client(client)
close_socket = false
break
when true
ThreadPool.clean_thread_locals if clean_thread_locals

requests += 1

# As an optimization, try to read the next request from the
Expand Down
11 changes: 0 additions & 11 deletions lib/puma/thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def initialize(name, options = {}, &block)
@shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
@block = block
@out_of_band = options[:out_of_band]
@clean_thread_locals = options[:clean_thread_locals]
@before_thread_start = options[:before_thread_start]
@before_thread_exit = options[:before_thread_exit]
@reaping_time = options[:reaping_time]
Expand Down Expand Up @@ -79,12 +78,6 @@ def initialize(name, options = {}, &block)

attr_reader :spawned, :trim_requested, :waiting

def self.clean_thread_locals
Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
Thread.current[key] = nil unless key == :__recursive_key__
end
end

# How many objects have yet to be processed by the pool?
#
def backlog
Expand Down Expand Up @@ -147,10 +140,6 @@ def spawn_thread
work = todo.shift
end

if @clean_thread_locals
ThreadPool.clean_thread_locals
end

begin
@out_of_band_pending = true if block.call(work)
rescue Exception => e
Expand Down
38 changes: 38 additions & 0 deletions test/test_fiber_local_application.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require_relative "helper"

require "puma/server"

class FiberLocalApplication
def call(env)
# Just in case you didn't know, this is fiber local...
count = (Thread.current[:request_count] ||= 0)
Thread.current[:request_count] += 1
[200, {"Content-Type" => "text/plain"}, [count.to_s]]
end
end

class FiberLocalApplicationTest < Minitest::Test
parallelize_me!

def setup
@tester = FiberLocalApplication.new
@server = Puma::Server.new @tester, nil, {log_writer: Puma::LogWriter.strings, fiber_per_request: true}
@port = (@server.add_tcp_listener "127.0.0.1", 0).addr[1]
@tcp = "http://127.0.0.1:#{@port}"
@server.run
end

def teardown
@server.stop(true)
end

def test_empty_locals
response = hit(["#{@tcp}/test"] * 3)
assert_equal ["0", "0", "0"], response
end
end
39 changes: 39 additions & 0 deletions test/test_fiber_storage_application.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require_relative "helper"

require "puma/server"

class FiberStorageApplication
def call(env)
count = (Fiber[:request_count] ||= 0)
Fiber[:request_count] += 1
[200, {"Content-Type" => "text/plain"}, [count.to_s]]
end
end

class FiberStorageApplicationTest < Minitest::Test
parallelize_me!

def setup
skip "Fiber Storage is not supported on this Ruby" unless Fiber.respond_to?(:[])

@tester = FiberStorageApplication.new
@server = Puma::Server.new @tester, nil, {log_writer: Puma::LogWriter.strings, fiber_per_request: true}
@port = (@server.add_tcp_listener "127.0.0.1", 0).addr[1]
@tcp = "http://127.0.0.1:#{@port}"
@server.run
end

def teardown
@server.stop(true)
end

def test_empty_storage
response = hit(["#{@tcp}/test"] * 3)
assert_equal ["0", "0", "0"], response
end
end
18 changes: 0 additions & 18 deletions test/test_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,24 +227,6 @@ def test_autotrim
assert_equal 1, pool.spawned
end

def test_cleanliness
values = []
n = 100

pool = mutex_pool(1,1) {
values.push Thread.current[:foo]
Thread.current[:foo] = :hai
}

pool.instance_variable_set :@clean_thread_locals, true

pool << [1] * n

assert_equal n, values.length

assert_equal [], values.compact
end

def test_reap_only_dead_threads
pool = mutex_pool(2,2) do
th = Thread.current
Expand Down

0 comments on commit 6cc6b48

Please sign in to comment.