Skip to content

Commit

Permalink
Move threadpool init out of server (#2942)
Browse files Browse the repository at this point in the history
* Simplify `ThreadPool` initializations code in `Puma::Server`

This commit introduces four `ThreadPool` options in `Configuration::DEFAULTS`:
`auto_trim_time`, `reaping_time`, `clean_thread_locals`, and
`out_of_band_hook` they could be configured via file/user options
`Puma::Configuration`.

The auto reap/trim methods stay in `Puma::Server` because the way we test
in `Puma::Server` tests.

Adds `slice(keys)` method to `UserFileDefaultOptions` so it acts
like a Hash and we could read `ThreadPool` options from the user-file-default options.

Adds missing require statement to `test/test_puma_server.rb`, so this test
could run individually.

Co-Authored-By: Shohei Umemoto <cafedomancer@gmail.com>
Co-Authored-By: Nate Berkopec <nate.berkopec@gmail.com>

* Fix out of band

* Fixup lib files

* Fixup tests

Co-authored-by: Juanito Fatas <me@juanitofatas.com>
Co-authored-by: Shohei Umemoto <cafedomancer@gmail.com>
Co-authored-by: MSP-Greg <Greg.mpls@gmail.com>
  • Loading branch information
4 people committed Sep 15, 2022
1 parent 5199ff2 commit 317e890
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 38 deletions.
4 changes: 4 additions & 0 deletions lib/puma/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ def final_options
# configuration files.
class Configuration
DEFAULTS = {
auto_trim_time: 30,
binds: ['tcp://0.0.0.0:9292'.freeze],
clean_thread_locals: false,
debug: false,
early_hints: nil,
environment: 'development'.freeze,
Expand All @@ -145,11 +147,13 @@ class Configuration
min_threads: 0,
mode: :http,
mutate_stdout_and_stderr_to_sync_on_write: true,
out_of_band: [],
# Number of seconds for another request within a persistent session.
persistent_timeout: 20,
queue_requests: true,
rackup: 'config.ru'.freeze,
raise_exception_on_sigterm: true,
reaping_time: 1,
remote_address: :socket,
silence_single_worker_warning: false,
tag: File.basename(Dir.getwd),
Expand Down
23 changes: 3 additions & 20 deletions lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ module Puma
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server

include Puma::Const
include Request
extend Forwardable
Expand Down Expand Up @@ -81,9 +80,6 @@ def initialize(app, log_writer=LogWriter.stdio, events=Events.new, options = {})
@check, @notify = nil
@status = :stop

@auto_trim_time = 30
@reaping_time = 1

@thread = nil
@thread_pool = nil

Expand Down Expand Up @@ -235,29 +231,16 @@ def run(background=true, thread_name: 'srv')

@status = :run

@thread_pool = ThreadPool.new(
thread_name,
@min_threads,
@max_threads,
::Puma::IOBuffer,
&method(:process_client)
)

@thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
@thread_pool = ThreadPool.new(thread_name, @options, &method(:process_client))

if @queue_requests
@reactor = Reactor.new(@io_selector_backend, &method(:reactor_wakeup))
@reactor.run
end

if @reaping_time
@thread_pool.auto_reap!(@reaping_time)
end

if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
@thread_pool.auto_reap! if @options[:reaping_time]
@thread_pool.auto_trim! if @options[:auto_trim_time]

@check, @notify = Puma::Util.pipe unless @notify

Expand Down
25 changes: 14 additions & 11 deletions lib/puma/thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

require 'thread'

require 'puma/io_buffer'

module Puma
# Internal Docs for A simple thread pool management object.
#
Expand Down Expand Up @@ -29,7 +31,7 @@ class ForceShutdown < RuntimeError
# The block passed is the work that will be performed in each
# thread.
#
def initialize(name, min, max, *extra, &block)
def initialize(name, options = {}, &block)
@not_empty = ConditionVariable.new
@not_full = ConditionVariable.new
@mutex = Mutex.new
Expand All @@ -40,10 +42,14 @@ def initialize(name, min, max, *extra, &block)
@waiting = 0

@name = name
@min = Integer(min)
@max = Integer(max)
@min = Integer(options[:min_threads])
@max = Integer(options[:max_threads])
@block = block
@extra = extra
@extra = [::Puma::IOBuffer]
@out_of_band = options[:out_of_band]
@clean_thread_locals = options[:clean_thread_locals]
@reaping_time = options[:reaping_time]
@auto_trim_time = options[:auto_trim_time]

@shutdown = false

Expand All @@ -62,14 +68,11 @@ def initialize(name, min, max, *extra, &block)
end
end

@clean_thread_locals = false
@force_shutdown = false
@shutdown_mutex = Mutex.new
end

attr_reader :spawned, :trim_requested, :waiting
attr_accessor :clean_thread_locals
attr_accessor :out_of_band_hook # @version 5.0.0

def self.clean_thread_locals
Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
Expand Down Expand Up @@ -160,12 +163,12 @@ def spawn_thread

# @version 5.0.0
def trigger_out_of_band_hook
return false unless out_of_band_hook && out_of_band_hook.any?
return false unless @out_of_band && @out_of_band.any?

# we execute on idle hook when all threads are free
return false unless @spawned == @waiting

out_of_band_hook.each(&:call)
@out_of_band.each(&:call)
true
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
Expand Down Expand Up @@ -319,12 +322,12 @@ def stop
end
end

def auto_trim!(timeout=30)
def auto_trim!(timeout=@auto_trim_time)
@auto_trim = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
@auto_trim.start!
end

def auto_reap!(timeout=5)
def auto_reap!(timeout=@reaping_time)
@reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
@reaper.start!
end
Expand Down
5 changes: 3 additions & 2 deletions test/test_busy_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ def with_server(**options, &app)
end
end

options[:min_threads] ||= 0
options[:max_threads] ||= 10

@server = Puma::Server.new request_handler, Puma::LogWriter.strings, Puma::Events.new, **options
@server.min_threads = options[:min_threads] || 0
@server.max_threads = options[:max_threads] || 10
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
end
Expand Down
5 changes: 3 additions & 2 deletions test/test_out_of_band_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ def oob_server(**options)
[200, {}, [""]]
end

options[:min_threads] ||= 1
options[:max_threads] ||= 1

@server = Puma::Server.new app, Puma::LogWriter.strings, Puma::Events.new, out_of_band: [oob], **options
@server.min_threads = options[:min_threads] || 1
@server.max_threads = options[:max_threads] || 1
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
sleep 0.15 if Puma.jruby?
Expand Down
1 change: 1 addition & 0 deletions test/test_puma_server.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require_relative "helper"
require "puma/events"
require "puma/server"
require "net/http"
require "nio"
require "ipaddr"
Expand Down
14 changes: 11 additions & 3 deletions test/test_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ def teardown

def new_pool(min, max, &block)
block = proc { } unless block
@pool = Puma::ThreadPool.new('tst', min, max, &block)
options = {
min_threads: min,
max_threads: max
}
@pool = Puma::ThreadPool.new('tst', options, &block)
end

def mutex_pool(min, max, &block)
block = proc { } unless block
@pool = MutexPool.new('tst', min, max, &block)
options = {
min_threads: min,
max_threads: max
}
@pool = MutexPool.new('tst', options, &block)
end

# Wraps ThreadPool work in mutex for better concurrency control.
Expand Down Expand Up @@ -184,7 +192,7 @@ def test_cleanliness
Thread.current[:foo] = :hai
}

pool.clean_thread_locals = true
pool.instance_variable_set :@clean_thread_locals, true

pool << [1] * n

Expand Down

0 comments on commit 317e890

Please sign in to comment.