Skip to content

Commit

Permalink
Inject small delay to improve requests distribution
Browse files Browse the repository at this point in the history
Ruby MRI can process at most a single thread concurrently due to the GVL. This results in over-utilisation of some workers when an unfavourable distribution of connections occurs.

This tries to prefer less busy workers (i.e. those faster to accept
the connection) to improve worker utilisation.
  • Loading branch information
ayufan committed May 5, 2020
1 parent 774c460 commit e022667
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 0 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -10,6 +10,7 @@
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
* Faster phased restart and worker timeout (#2121)
* Inject small delay for busy workers to improve requests distribution (#2079)

* Deprecations, Removals and Breaking API Changes
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
Expand Down
102 changes: 102 additions & 0 deletions benchmarks/wrk/cpu_spin.sh
@@ -0,0 +1,102 @@
#!/bin/bash
#
# Benchmark how Puma distributes requests across workers by driving a
# CPU-bound endpoint with wrk at increasing concurrency levels.

set -eo pipefail

# Iterations of the CPU-spin endpoint per request
# (handled by test/config/cpu_spin.rb, route: /cpu/<iterations>).
ITERATIONS=400000
HOST=127.0.0.1:9292
URL="http://$HOST/cpu/$ITERATIONS"

# Range of Puma worker (process) counts to sweep.
MIN_WORKERS=1
MAX_WORKERS=4

# Range of Puma thread counts to sweep (fixed at 4 here).
MIN_THREADS=4
MAX_THREADS=4

# wrk run duration (seconds) and client-concurrency sweep bounds.
DURATION=2
MIN_CONCURRENT=1
MAX_CONCURRENT=8

# retry <tries> <sleep> <command...>
# Run <command...> up to <tries> times, sleeping <sleep> between
# attempts. Returns 0 on the first success, 1 if every attempt fails.
retry() {
  local tries="$1"
  local delay="$2"
  shift 2

  local i
  for i in $(seq 1 "$tries"); do
    if eval "$@"; then
      return 0
    fi

    # do not sleep after the final failed attempt
    if [ "$i" -lt "$tries" ]; then
      sleep "$delay"
    fi
  done

  return 1
}

ms() {
VALUE=$(cat)
FRAC=${VALUE%%[ums]*}
case "$VALUE" in
*us)
echo "scale=1; ${FRAC}/1000" | bc
;;

*ms)
echo "scale=1; ${FRAC}/1" | bc
;;

*s)
echo "scale=1; ${FRAC}*1000/1" | bc
;;
esac
}

# run_wrk <url...>
# Runs wrk once against the given URL and prints one TSV result row.
# Reads globals: wrk_c, wrk_t, DURATION, workers, threads.
# Appends the raw wrk output to wrk.txt.
run_wrk() {
  # "Connection: Close" forces a fresh TCP connection per request so
  # accepts are redistributed between Puma workers on every request
  result=$(wrk -H "Connection: Close" -c "$wrk_c" -t "$wrk_t" -d "$DURATION" --latency "$@" | tee -a wrk.txt)
  req_sec=$(echo "$result" | grep "^Requests/sec:" | awk '{print $2}')
  # summary line format: "Latency <avg> <stddev> <max> <+/- stdev %>"
  latency_avg=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $2}' | ms)
  latency_stddev=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $3}' | ms)
  # percentile lines from wrk's --latency distribution table
  latency_50=$(echo "$result" | grep "^\s*50%" | awk '{print $2}' | ms)
  latency_75=$(echo "$result" | grep "^\s*75%" | awk '{print $2}' | ms)
  latency_90=$(echo "$result" | grep "^\s*90%" | awk '{print $2}' | ms)
  latency_99=$(echo "$result" | grep "^\s*99%" | awk '{print $2}' | ms)

  echo -e "$workers\t$threads\t$wrk_c\t$wrk_t\t$req_sec\t$latency_avg\t$latency_stddev\t$latency_50\t$latency_75\t$latency_90\t$latency_99"
}

# run_concurrency_tests <command...>
# Prints a TSV header, then executes the command once per concurrency
# level, exporting the wrk_c/wrk_t globals consumed by run_wrk.
run_concurrency_tests() {
  echo
  echo -e "PUMA_W\tPUMA_T\tWRK_C\tWRK_T\tREQ_SEC\tL_AVG\tL_DEV\tL_50%\tL_75%\tL_90%\tL_99%"

  local level
  for ((level = MIN_CONCURRENT; level <= MAX_CONCURRENT; level++)); do
    wrk_c="$level"
    wrk_t="$level"
    eval "$@"
    sleep 1
  done

  echo
}

# with_puma <command...>
# Starts Puma in the background with $workers/$threads, waits up to
# 10s for it to answer on $URL, runs the command, then shuts Puma down.
with_puma() {
  # start puma and wait for 10s for it to start
  bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
  local puma_pid=$!
  # ensure Puma is killed even if this function exits early (set -e)
  trap "kill $puma_pid" EXIT

  # wait for Puma to be up
  if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
    echo "Failed to connect to $URL."
    return 1
  fi

  # execute testing command
  eval "$@"
  kill "$puma_pid" || true
  trap - EXIT
  wait
}

# Sweep every worker/thread combination, running the full concurrency
# series of wrk benchmarks against a freshly booted Puma each time.
for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
  for threads in $(seq $MIN_THREADS $MAX_THREADS); do
    with_puma \
      run_concurrency_tests \
        run_wrk "$URL"
  done
done
12 changes: 12 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -664,6 +664,18 @@ def shutdown_debug(val=true)
@options[:shutdown_debug] = val
end

# Controls an injected delay (in seconds) applied before
# accepting a new socket connection while this worker is
# already processing requests; it gives less busy workers
# time to pick up the work before us.
#
# This only affects the Ruby MRI implementation.
# The best value is between 0.001 (1ms) and 0.010 (10ms).
def wait_for_less_busy_worker(val)
  @options[:wait_for_less_busy_worker] = val.to_f
end

# Control how the remote address of the connection is set. This
# is configurable because to calculate the true socket peer address
# a kernel syscall is required which for very fast rack handlers
Expand Down
3 changes: 3 additions & 0 deletions lib/puma/server.rb
Expand Up @@ -283,6 +283,9 @@ def handle_servers
else
begin
pool.wait_until_not_full
pool.wait_for_less_busy_worker(
@options[:wait_for_less_busy_worker].to_f)

if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value
Expand Down
19 changes: 19 additions & 0 deletions lib/puma/thread_pool.rb
Expand Up @@ -228,6 +228,25 @@ def wait_until_not_full
end
end

# Injects a small delay before this worker accepts a new connection
# when it is already processing requests, giving a less busy worker a
# chance to accept the connection first.
#
# delay_s - maximum delay in seconds; a no-op when zero/negative or
#           on non-MRI rubies.
def wait_for_less_busy_worker(delay_s)
  # Ruby MRI does GVL, this can result
  # in processing contention when multiple threads
  # (requests) are running concurrently
  return unless Puma.mri?
  return unless delay_s > 0

  with_mutex do
    return if @shutdown

    # do not delay, if we are not busy
    return unless busy_threads > 0

    # this will be signaled once a request finishes,
    # which can happen earlier than delay
    @not_full.wait @mutex, delay_s
  end
end

# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
Expand Down
17 changes: 17 additions & 0 deletions test/config/cpu_spin.rb
@@ -0,0 +1,17 @@
# Puma config used by benchmarks/wrk/cpu_spin.sh.
# Call with "GET /cpu/<d> HTTP/1.1\r\n\r\n",
# where <d> is the number of iterations.

require 'benchmark'

# Configure the `wait_for_less_busy_worker` delay (seconds) from the
# WAIT_FOR_LESS_BUSY_WORKERS env var; defaults to 0.005 (5ms).
wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', '0.005').to_f

# Rack app that burns CPU by calling `rand` <d> times (default 1000)
# and responds with the measured time and this worker's PID.
app do |env|
  iterations = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_i

  duration = Benchmark.measure do
    iterations.times { rand }
  end

  [200, {"Content-Type" => "text/plain"}, ["Run for #{duration.total} #{Process.pid}"]]
end
1 change: 1 addition & 0 deletions test/helper.rb
Expand Up @@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller)
when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/]
when :jruby then "Skip unless JRuby" unless Puma.jruby?
when :windows then "Skip unless Windows" unless Puma.windows?
when :mri then "Skip unless MRI" unless Puma.mri?
else false
end
skip skip_msg, bt if skip_msg
Expand Down
106 changes: 106 additions & 0 deletions test/test_busy_worker.rb
@@ -0,0 +1,106 @@
require_relative "helper"
require "puma/events"

# Exercises the `wait_for_less_busy_worker` option: with a delay
# configured, a busy server thread holds off accepting new connections;
# with the delay at 0, requests are accepted and run in parallel.
# The effective behavior is MRI-only (GVL), hence `skip_unless :mri`.
class TestBusyWorker < Minitest::Test
  parallelize_me!

  def setup
    @ios = []       # sockets opened by the test, closed in teardown
    @server = nil
  end

  def teardown
    @server.stop(true) if @server
    @ios.each {|i| i.close unless i.closed?}
  end

  # Opens a TCP connection to the running server and tracks the socket
  # for cleanup. Retries on IOError.
  def new_connection
    TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
  rescue IOError
    # MRI may leave a pending interrupt on the thread after a failed
    # connect; purge it (when supported) before retrying
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    retry
  end

  # Writes a raw HTTP request on a fresh connection; returns the socket.
  def send_http(req)
    new_connection << req
  end

  # Writes a raw HTTP request and reads the full response body.
  def send_http_and_read(req)
    send_http(req).read
  end

  # Boots a Puma::Server whose handler wraps +app+ with bookkeeping:
  # total requests processed, requests currently running, and the
  # maximum number observed running in parallel.
  def with_server(**options, &app)
    @requests_count = 0 # number of requests processed
    @requests_running = 0 # current number of requests running
    @requests_max_running = 0 # max number of requests running in parallel
    @mutex = Mutex.new

    request_handler = ->(env) do
      @mutex.synchronize do
        @requests_count += 1
        @requests_running += 1
        if @requests_running > @requests_max_running
          @requests_max_running = @requests_running
        end
      end

      begin
        app.call(env)
      ensure
        @mutex.synchronize do
          @requests_running -= 1
        end
      end
    end

    @server = Puma::Server.new request_handler, Puma::Events.strings, **options
    @server.min_threads = options[:min_threads] || 0
    @server.max_threads = options[:max_threads] || 10
    @server.add_tcp_listener '127.0.0.1', 0
    @server.run
  end

  # With a large injected delay (1.0s), concurrent requests end up
  # being processed one at a time (max observed concurrency is 1)
  # rather than in parallel
  def test_multiple_requests_waiting_on_less_busy_worker
    skip_unless :mri

    with_server(wait_for_less_busy_worker: 1.0) do |_|
      sleep(0.1)

      [200, {}, [""]]
    end

    n = 2

    Array.new(n) do
      Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
    end.each(&:join)

    assert_equal n, @requests_count, "number of requests needs to match"
    assert_equal 0, @requests_running, "none of requests needs to be running"
    assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1"
  end

  # Multiple concurrent requests are processed
  # in parallel as the delay is disabled (0.0)
  def test_multiple_requests_processing_in_parallel
    skip_unless :mri

    with_server(wait_for_less_busy_worker: 0.0) do |_|
      sleep(0.1)

      [200, {}, [""]]
    end

    n = 4

    Array.new(n) do
      Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
    end.each(&:join)

    assert_equal n, @requests_count, "number of requests needs to match"
    assert_equal 0, @requests_running, "none of requests needs to be running"
    assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match"
  end
end

0 comments on commit e022667

Please sign in to comment.