diff --git a/History.md b/History.md index 8067a8e12e..655df789f7 100644 --- a/History.md +++ b/History.md @@ -10,6 +10,7 @@ * Increases maximum URI path length from 2048 to 8196 bytes (#2167) * Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203) * Faster phased restart and worker timeout (#2121) + * Inject small delay for busy workers to improve requests distribution (#2079) * Deprecations, Removals and Breaking API Changes * `Puma.stats` now returns a Hash instead of a JSON string (#2086) diff --git a/benchmarks/wrk/cpu_spin.sh b/benchmarks/wrk/cpu_spin.sh new file mode 100755 index 0000000000..0642ad003f --- /dev/null +++ b/benchmarks/wrk/cpu_spin.sh @@ -0,0 +1,102 @@ +#!/bin/bash + +set -eo pipefail + +ITERATIONS=400000 +HOST=127.0.0.1:9292 +URL="http://$HOST/cpu/$ITERATIONS" + +MIN_WORKERS=1 +MAX_WORKERS=4 + +MIN_THREADS=4 +MAX_THREADS=4 + +DURATION=2 +MIN_CONCURRENT=1 +MAX_CONCURRENT=8 + +retry() { + local tries="$1" + local sleep="$2" + shift 2 + + for i in $(seq 1 $tries); do + if eval "$@"; then + return 0 + fi + + sleep "$sleep" + done + + return 1 +} + +ms() { + VALUE=$(cat) + FRAC=${VALUE%%[ums]*} + case "$VALUE" in + *us) + echo "scale=1; ${FRAC}/1000" | bc + ;; + + *ms) + echo "scale=1; ${FRAC}/1" | bc + ;; + + *s) + echo "scale=1; ${FRAC}*1000/1" | bc + ;; + esac +} + +run_wrk() { + result=$(wrk -H "Connection: Close" -c "$wrk_c" -t "$wrk_t" -d "$DURATION" --latency "$@" | tee -a wrk.txt) + req_sec=$(echo "$result" | grep "^Requests/sec:" | awk '{print $2}') + latency_avg=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $2}' | ms) + latency_stddev=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $3}' | ms) + latency_50=$(echo "$result" | grep "^\s*50%" | awk '{print $2}' | ms) + latency_75=$(echo "$result" | grep "^\s*75%" | awk '{print $2}' | ms) + latency_90=$(echo "$result" | grep "^\s*90%" | awk '{print $2}' | ms) + latency_99=$(echo "$result" | grep "^\s*99%" | awk '{print $2}' | ms) + + 
echo -e "$workers\t$threads\t$wrk_c\t$wrk_t\t$req_sec\t$latency_avg\t$latency_stddev\t$latency_50\t$latency_75\t$latency_90\t$latency_99" +} + +run_concurrency_tests() { + echo + echo -e "PUMA_W\tPUMA_T\tWRK_C\tWRK_T\tREQ_SEC\tL_AVG\tL_DEV\tL_50%\tL_75%\tL_90%\tL_99%" + for wrk_c in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do + wrk_t="$wrk_c" + eval "$@" + sleep 1 + done + echo +} + +with_puma() { + # start puma and wait for 10s for it to start + bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb & + local puma_pid=$! + trap "kill $puma_pid" EXIT + + # wait for Puma to be up + if ! retry 10 1s curl --fail "$URL" &>/dev/null; then + echo "Failed to connect to $URL." + return 1 + fi + + # execute testing command + eval "$@" + kill "$puma_pid" || true + trap - EXIT + wait +} + +for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do + for threads in $(seq $MIN_THREADS $MAX_THREADS); do + with_puma \ + run_concurrency_tests \ + run_wrk "$URL" + done +done diff --git a/lib/puma/dsl.rb b/lib/puma/dsl.rb index 41e03a1a7e..a58ae53c82 100644 --- a/lib/puma/dsl.rb +++ b/lib/puma/dsl.rb @@ -664,6 +664,18 @@ def shutdown_debug(val=true) @options[:shutdown_debug] = val end + # Controls an injected delay (in seconds) before + # accepting a new socket + # if we are already processing requests, + # it gives time for less busy workers + # to pick work before us + # + # This only affects the Ruby MRI implementation + # The best value is between 0.001 (1ms) and 0.010 (10ms) + def wait_for_less_busy_worker(val) + @options[:wait_for_less_busy_worker] = val.to_f + end + + # Control how the remote address of the connection is set. 
This # is configurable because to calculate the true socket peer address # a kernel syscall is required which for very fast rack handlers diff --git a/lib/puma/server.rb b/lib/puma/server.rb index a467e23715..54c9488908 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -283,6 +283,9 @@ def handle_servers else begin pool.wait_until_not_full + pool.wait_for_less_busy_worker( + @options[:wait_for_less_busy_worker].to_f) + if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) if remote_addr_value diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 4dcf2f08ab..53e05e0cdc 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -228,6 +228,25 @@ def wait_until_not_full end end + def wait_for_less_busy_worker(delay_s) + # Ruby MRI does GVL, this can result + # in processing contention when multiple threads + # (requests) are running concurrently + return unless Puma.mri? + return unless delay_s > 0 + + with_mutex do + return if @shutdown + + # do not delay, if we are not busy + return unless busy_threads > 0 + + # this will be signaled once a request finishes, + # which can happen earlier than delay + @not_full.wait @mutex, delay_s + end + end + # If there are any free threads in the pool, tell one to go ahead # and exit. If +force+ is true, then a trim request is requested # even if all threads are being utilized. 
diff --git a/test/config/cpu_spin.rb b/test/config/cpu_spin.rb new file mode 100644 index 0000000000..6d1996be0b --- /dev/null +++ b/test/config/cpu_spin.rb @@ -0,0 +1,17 @@ +# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n", +# where <d> is the number of iterations + +require 'benchmark' + +# configure `wait_for_less_busy_worker` based on ENV, default `0.005` +wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', '0.005').to_f + +app do |env| + iterations = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_i + + duration = Benchmark.measure do + iterations.times { rand } + end + + [200, {"Content-Type" => "text/plain"}, ["Run for #{duration.total} #{Process.pid}"]] +end diff --git a/test/helper.rb b/test/helper.rb index fe190b6c5c..eda616b879 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller) when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/] when :jruby then "Skip unless JRuby" unless Puma.jruby? when :windows then "Skip unless Windows" unless Puma.windows? + when :mri then "Skip unless MRI" unless Puma.mri? else false end skip skip_msg, bt if skip_msg diff --git a/test/test_busy_worker.rb b/test/test_busy_worker.rb new file mode 100644 index 0000000000..eed31b2b47 --- /dev/null +++ b/test/test_busy_worker.rb @@ -0,0 +1,106 @@ +require_relative "helper" +require "puma/events" + +class TestBusyWorker < Minitest::Test + parallelize_me! + + def setup + @ios = [] + @server = nil + end + + def teardown + @server.stop(true) if @server + @ios.each {|i| i.close unless i.closed?} + end + + def new_connection + TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s} + rescue IOError + Thread.current.purge_interrupt_queue if Thread.current.respond_to? 
:purge_interrupt_queue + retry + end + + def send_http(req) + new_connection << req + end + + def send_http_and_read(req) + send_http(req).read + end + + def with_server(**options, &app) + @requests_count = 0 # number of requests processed + @requests_running = 0 # current number of requests running + @requests_max_running = 0 # max number of requests running in parallel + @mutex = Mutex.new + + request_handler = ->(env) do + @mutex.synchronize do + @requests_count += 1 + @requests_running += 1 + if @requests_running > @requests_max_running + @requests_max_running = @requests_running + end + end + + begin + yield(env) + ensure + @mutex.synchronize do + @requests_running -= 1 + end + end + end + + @server = Puma::Server.new request_handler, Puma::Events.strings, **options + @server.min_threads = options[:min_threads] || 0 + @server.max_threads = options[:max_threads] || 10 + @server.add_tcp_listener '127.0.0.1', 0 + @server.run + end + + # Multiple concurrent requests are not processed + # sequentially as a small delay is introduced + def test_multiple_requests_waiting_on_less_busy_worker + skip_unless :mri + + with_server(wait_for_less_busy_worker: 1.0) do |_| + sleep(0.1) + + [200, {}, [""]] + end + + n = 2 + + Array.new(n) do + Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" } + end.each(&:join) + + assert_equal n, @requests_count, "number of requests needs to match" + assert_equal 0, @requests_running, "none of requests needs to be running" + assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1" + end + + # Multiple concurrent requests are processed + # in parallel as a delay is disabled + def test_multiple_requests_processing_in_parallel + skip_unless :mri + + with_server(wait_for_less_busy_worker: 0.0) do |_| + sleep(0.1) + + [200, {}, [""]] + end + + n = 4 + + Array.new(n) do + Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" } + end.each(&:join) + + assert_equal n, @requests_count, "number of requests 
needs to match" + assert_equal 0, @requests_running, "none of requests needs to be running" + assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match" + end +end