From 5b8c7857c66ce4cb0bde5961fbc9ad5c97e7acf8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?=
Date: Wed, 20 Nov 2019 15:39:37 +0100
Subject: [PATCH] Inject small delay to improve request distribution

Ruby MRI can execute at most one thread at a time because of the GVL.
When connections are distributed unevenly across workers, this leads to
over-utilisation of some workers while others sit idle.

This change prefers less busy workers (i.e. the ones that are quicker to
accept a connection) in order to improve worker utilisation.
---
 History.md                |  1 +
 benchmarks/ab/cpu_spin.sh | 79 +++++++++++++++++++++++++++++++++++++++
 lib/puma/dsl.rb           |  9 +++++
 lib/puma/server.rb        |  4 ++
 lib/puma/thread_pool.rb   | 23 ++++++++++++
 test/config/cpu_spin.rb   | 39 +++++++++++++++++++
 test/helper.rb            |  1 +
 test/test_puma_server.rb  | 34 ++++++++++++++++-
 8 files changed, 189 insertions(+), 1 deletion(-)
 create mode 100755 benchmarks/ab/cpu_spin.sh
 create mode 100644 test/config/cpu_spin.rb

diff --git a/History.md b/History.md
index d7f8562537..21b27ae3cc 100644
--- a/History.md
+++ b/History.md
@@ -10,6 +10,7 @@
   * Increases maximum URI path length from 2048 to 8196 bytes (#2167)
   * Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
   * Faster phased restart and worker timeout (#2121)
+  * Inject small delay for busy workers to improve request distribution (#2079)

 * Deprecations, Removals and Breaking API Changes
   * `Puma.stats` now returns a Hash instead of a JSON string (#2086)
diff --git a/benchmarks/ab/cpu_spin.sh b/benchmarks/ab/cpu_spin.sh
new file mode 100755
index 0000000000..62aaa40c03
--- /dev/null
+++ b/benchmarks/ab/cpu_spin.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+set -eo pipefail
+
+CPU_TIME=50
+HOST=127.0.0.1:9292
+URL="http://$HOST/cpu/$CPU_TIME"
+
+MIN_WORKERS=1
+MAX_WORKERS=4
+
+MIN_THREADS=4
+MAX_THREADS=4
+
+REQUESTS_PER_TEST=300
+MIN_CONCURRENT=1
+MAX_CONCURRENT=8
+
+retry() {
+  local tries="$1"
+  local sleep="$2"
+  shift 2
+
+  for i in $(seq 1 $tries); do
+    if eval "$@"; then
+      return 0
+    fi
+
+    sleep "$sleep"
+  done
+
+  return 1
+}
+
+run_ab() {
+  result=$(ab -q -n "$requests" -c "$concurrent" "$@")
+  time_taken=$(echo "$result" | grep "Time taken for tests:" | cut -d" " -f7)
+  time_per_req=$(echo "$result" | grep "Time per request:" | grep "(mean)" | cut -d" " -f10)
+
+  echo -e "$workers\t$threads\t$requests\t$concurrent\t$time_taken\t$time_per_req"
+}
+
+run_concurrency_tests() {
+  echo
+  echo -e "PUMA_W\tPUMA_T\tAB_R\tAB_C\tT_TOTAL\tT_PER_REQ"
+  for concurrent in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do
+    requests="$((concurrent * REQUESTS_PER_TEST))"
+    eval "$@"
+    sleep 1
+  done
+  echo
+}
+
+with_puma() {
+  # start Puma in the background
+  bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
+  local puma_pid=$!
+  trap "kill $puma_pid" EXIT
+
+  # wait up to 10 seconds for Puma to come up
+  if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
+    echo "Failed to connect to $URL."
+    return 1
+  fi
+
+  # run the test command
+  eval "$@"
+  kill "$puma_pid" || true
+  trap - EXIT
+  wait
+}
+
+for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
+  for threads in $(seq $MIN_THREADS $MAX_THREADS); do
+    with_puma \
+      run_concurrency_tests \
+        run_ab "$URL"
+  done
+done
diff --git a/lib/puma/dsl.rb b/lib/puma/dsl.rb
index 9bc51f77aa..c2be3e3877 100644
--- a/lib/puma/dsl.rb
+++ b/lib/puma/dsl.rb
@@ -664,6 +664,15 @@ def shutdown_debug(val=true)
       @options[:shutdown_debug] = val
     end

+    # Controls whether to inject a small delay before accepting
+    # a new socket while this worker is already processing requests.
+    # Waiting briefly lets less busy workers accept the connection
+    # first, which improves the distribution of requests.
+    # This only has an effect on Ruby MRI.
+    def wait_for_less_busy_worker(val=true)
+      @options[:wait_for_less_busy_worker] = val
+    end
+
     # Control how the remote address of the connection is set. This
     # is configurable because to calculate the true socket peer address
     # a kernel syscall is required which for very fast rack handlers
diff --git a/lib/puma/server.rb b/lib/puma/server.rb
index b5269f0ddd..b5a87d2693 100644
--- a/lib/puma/server.rb
+++ b/lib/puma/server.rb
@@ -279,6 +279,10 @@ def handle_servers
                 break if handle_check
               else
                 begin
+                  if @options.fetch(:wait_for_less_busy_worker, true)
+                    pool.wait_for_less_busy_worker
+                  end
+
                   if io = sock.accept_nonblock
                     client = Client.new io, @binder.env(sock)
                     if remote_addr_value
diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb
index d6a27c41de..141d975b16 100644
--- a/lib/puma/thread_pool.rb
+++ b/lib/puma/thread_pool.rb
@@ -23,6 +23,11 @@ class ForceShutdown < RuntimeError
     # up its work before leaving the thread to die on the vine.
     SHUTDOWN_GRACE_TIME = 5 # seconds

+    # Delay injected before accepting a new connection while this worker
+    # is already processing requests; it gives less busy workers a chance
+    # to accept the connection first.
+    BUSY_WORKER_DELAY = 0.005 # 5 milliseconds
+
     # Maintain a minimum of +min+ and maximum of +max+ threads
     # in the pool.
     #
@@ -208,6 +213,24 @@ def wait_until_not_full
       end
     end

+    def wait_for_less_busy_worker
+      # Ruby MRI's GVL allows only one thread to run Ruby code at a time,
+      # so handling multiple requests concurrently in a single worker
+      # causes contention
+      return unless Puma.mri?
+
+      with_mutex do
+        return if @shutdown
+
+        # do not delay if we are not busy
+        return unless busy_threads > 0
+
+        # @not_full is signaled once a request finishes, which can
+        # happen before the delay elapses
+        @not_full.wait @mutex, BUSY_WORKER_DELAY
+      end
+    end
+
     # If there are any free threads in the pool, tell one to go ahead
     # and exit. If +force+ is true, then a trim request is requested
     # even if all threads are being utilized.
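
For reviewers, a minimal standalone sketch (not part of the patch) of the timed wait used above: `ConditionVariable#wait` with a timeout blocks for at most `BUSY_WORKER_DELAY`, but returns earlier when another thread signals that a request has finished, so the injected delay never exceeds 5ms and is often shorter. The script below is illustrative only.

# busy_worker_delay_sketch.rb -- illustrative only; mirrors the timed wait
# in ThreadPool#wait_for_less_busy_worker from this patch
BUSY_WORKER_DELAY = 0.005 # 5 milliseconds, same value as in lib/puma/thread_pool.rb

mutex    = Mutex.new
not_full = ConditionVariable.new

waiter = Thread.new do
  mutex.synchronize do
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Block for at most BUSY_WORKER_DELAY, but wake up as soon as the
    # condition variable is signalled (i.e. a request finished earlier).
    not_full.wait(mutex, BUSY_WORKER_DELAY)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    puts format("waited %.4fs before accepting", elapsed)
  end
end

# Simulate a request finishing about 1ms in; the waiter then returns early
# instead of sleeping for the full 5ms. If the signal were missed, the
# waiter would simply wait the full delay and continue.
sleep 0.001
mutex.synchronize { not_full.signal }

waiter.join

In the patch this same wait is additionally guarded by `Puma.mri?` and `busy_threads > 0`, so the delay is only paid when it can actually help.
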
diff --git a/test/config/cpu_spin.rb b/test/config/cpu_spin.rb
new file mode 100644
index 0000000000..71b7a81568
--- /dev/null
+++ b/test/config/cpu_spin.rb
@@ -0,0 +1,39 @@
+# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n", where <d> is the number of
+# milliseconds to spin the CPU; the response includes the process pid
+
+# configure `wait_for_less_busy_worker` from ENV, default `true`
+wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', 'true') == 'true'
+
+def cpu_threadtime
+  # Not all OS kernels support `Process::CLOCK_THREAD_CPUTIME_ID`
+  # Refer: https://gitlab.com/gitlab-org/gitlab/issues/30567#note_221765627
+  return unless defined?(Process::CLOCK_THREAD_CPUTIME_ID)
+
+  Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_second)
+end
+
+def rand_for(duration_s)
+  end_time = cpu_threadtime + duration_s
+  rand while cpu_threadtime < end_time
+end
+
+app do |env|
+  duration_ms = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_f
+
+  # This simulates an interleaved workload where the thread is
+  # 50% idle and 50% busy executing Ruby.
+  #
+  # Another request might be accepted during the `sleep`,
+  # but it would then compete with this request while
+  # `rand_for` is executing.
+
+  start_time = cpu_threadtime
+  expected_end_time = cpu_threadtime + duration_ms / 1000.0
+  while cpu_threadtime < expected_end_time do
+    sleep(0.01)
+    rand_for(0.01)
+  end
+  end_time = cpu_threadtime - start_time
+
+  [200, {"Content-Type" => "text/plain"}, ["Run for #{end_time} #{Process.pid}"]]
+end
diff --git a/test/helper.rb b/test/helper.rb
index fe190b6c5c..eda616b879 100644
--- a/test/helper.rb
+++ b/test/helper.rb
@@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller)
     when :darwin  then "Skip unless darwin"  unless RUBY_PLATFORM[/darwin/]
     when :jruby   then "Skip unless JRuby"   unless Puma.jruby?
     when :windows then "Skip unless Windows" unless Puma.windows?
+    when :mri     then "Skip unless MRI"     unless Puma.mri?
     else false
     end
   skip skip_msg, bt if skip_msg
diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb
index b7fdb3b215..50b820eecd 100644
--- a/test/test_puma_server.rb
+++ b/test/test_puma_server.rb
@@ -993,6 +993,7 @@ def oob_server(**options)
     @mutex = Mutex.new
     @oob_finished = ConditionVariable.new
     oob_wait = options.delete(:oob_wait)
+    request_sleep = options.delete(:request_sleep)
     oob = -> do
       in_oob.synchronize do
         @mutex.synchronize do
@@ -1007,6 +1008,11 @@ def oob_server(**options)
     @server.max_threads = 5
     server_run app: ->(_) do
       raise 'OOB conflict' if in_oob.locked?
+
+      # make each request take some time (request_sleep seconds)
+      # so that parallel execution can be exercised
+      sleep(request_sleep) if request_sleep
+
       @mutex.synchronize {@request_count += 1}
       [200, {}, [""]]
     end
@@ -1028,10 +1034,13 @@ def test_out_of_band

   # Streaming requests on parallel connections without delay should trigger
   # out_of_band hooks only once after the final request.
+  # Each connection takes about 50ms to process (5ms per request),
+  # so the connections are handled in parallel.
   def test_out_of_band_stream
     n = 100
     threads = 10
-    oob_server
+    oob_server(request_sleep: 0.005)
+
     req = "GET / HTTP/1.1\r\n"
     @mutex.synchronize do
       Array.new(threads) do
@@ -1045,6 +1054,29 @@ def test_out_of_band_stream
     assert_equal 1, @oob_count
   end

+  # Long-running connections processed in parallel, combined with the
+  # wait for a less busy worker, briefly mark the server as free and
+  # allow it to rapidly accept another connection. Each request takes
+  # at least 3ms, which is less than the current 5ms delay, leaving a
+  # short window in which the out_of_band hook can observe that Puma
+  # prefers idle workers first.
+  def test_out_of_band_stream_with_waiting_on_less_busy_worker
+    skip_unless :mri
+
+    parallel_requests = 10
+    oob_server(wait_for_less_busy_worker: true)
+    Array.new(parallel_requests) do
+      Thread.new do
+        send_http "GET / HTTP/1.0\r\n\r\n"
+        @mutex.synchronize do
+          @oob_finished.wait(@mutex, 1)
+        end
+      end
+    end.each(&:join)
+    assert_equal parallel_requests, @request_count
+    assert_equal parallel_requests, @oob_count
+  end
+
   def test_out_of_band_overlapping_requests
     oob_server oob_wait: true
     sock = nil
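
As a usage note (a sketch, not part of the patch): the new `wait_for_less_busy_worker` DSL option defaults to `true` and is a no-op outside Ruby MRI, so a config file only needs to set it explicitly to opt out. The file name below is hypothetical.

# config/puma.rb -- hypothetical example config
workers 4
threads 4, 4

# Added by this patch: defaults to true and only has an effect on MRI.
# Set it to false to disable the 5ms busy-worker delay, e.g. when
# comparing results from benchmarks/ab/cpu_spin.sh.
wait_for_less_busy_worker false

The benchmark config in test/config/cpu_spin.rb toggles the same option via the WAIT_FOR_LESS_BUSY_WORKERS environment variable, which makes it easy to run the ab benchmark with and without the delay.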