Skip to content

Commit

Permalink
Inject small delay to improve requests distribution
Browse files Browse the repository at this point in the history
Ruby MRI can process at most a single thread concurrently
due to the GVL. This results in over-utilisation
when an unfavourable distribution of connections occurs.

This tries to prefer less busy workers (i.e. those faster to accept
the connection) to improve worker utilisation.
  • Loading branch information
ayufan committed Apr 24, 2020
1 parent b0b17f6 commit 5b8c785
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 1 deletion.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -10,6 +10,7 @@
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
* Faster phased restart and worker timeout (#2121)
* Inject small delay for busy workers to improve requests distribution (#2079)

* Deprecations, Removals and Breaking API Changes
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
Expand Down
79 changes: 79 additions & 0 deletions benchmarks/ab/cpu_spin.sh
@@ -0,0 +1,79 @@
#!/bin/bash
# Benchmark Puma request distribution across worker processes using
# ApacheBench (ab). For each (workers, threads) combination, starts Puma
# with the CPU-spin test app and measures throughput at increasing
# concurrency levels.

set -eo pipefail

# CPU time in milliseconds each request burns (consumed by the
# /cpu/<ms> route in test/config/cpu_spin.rb)
CPU_TIME=50
HOST=127.0.0.1:9292
URL="http://$HOST/cpu/$CPU_TIME"

# Range of Puma worker process counts to benchmark
MIN_WORKERS=1
MAX_WORKERS=4

# Range of Puma threads-per-worker counts to benchmark
MIN_THREADS=4
MAX_THREADS=4

# ab settings: total requests scale with the concurrency level
REQUESTS_PER_TEST=300
MIN_CONCURRENT=1
MAX_CONCURRENT=8

# Retry a command up to $1 times, sleeping $2 between attempts.
# Returns 0 on the first success, 1 if every attempt fails.
retry() {
  local attempts="$1"
  local delay="$2"
  shift 2

  local attempt=1
  while [ "$attempt" -le "$attempts" ]; do
    if eval "$@"; then
      return 0
    fi

    sleep "$delay"
    attempt=$((attempt + 1))
  done

  return 1
}

# Run ApacheBench against the given URL and print one tab-separated
# result row. Reads globals: $requests and $concurrent (ab settings),
# plus $workers and $threads (current Puma configuration, used only
# to label the row).
run_ab() {
  result=$(ab -q -n "$requests" -c "$concurrent" "$@")
  # total wall-clock time for the whole test run
  time_taken=$(echo "$result" | grep "Time taken for tests:" | cut -d" " -f7)
  # mean time per request across all concurrent clients
  time_per_req=$(echo "$result" | grep "Time per request:" | grep "(mean)" | cut -d" " -f10)

  echo -e "$workers\t$threads\t$requests\t$concurrent\t$time_taken\t$time_per_req"
}

# Run the given command (e.g. run_ab) once per concurrency level, with
# $requests scaled so that every level performs REQUESTS_PER_TEST
# requests per concurrent client. Prints a TSV header first.
run_concurrency_tests() {
  echo
  echo -e "PUMA_W\tPUMA_T\tAB_R\tAB_C\tT_TOTAL\tT_PER_REQ"
  for concurrent in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do
    requests="$((concurrent*$REQUESTS_PER_TEST))"
    eval "$@"
    # let the server settle between runs
    sleep 1
  done
  echo
}

# Start Puma with $workers workers and $threads threads, wait until it
# accepts requests, run the given command, then shut Puma down again.
# Returns 1 if Puma never becomes reachable.
with_puma() {
  # start puma and wait for 10s for it to start
  bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
  local puma_pid=$!
  # ensure Puma is killed if this script exits early
  trap "kill $puma_pid" EXIT

  # wait for Puma to be up (10 attempts, 1s apart)
  if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
    echo "Failed to connect to $URL."
    return 1
  fi

  # execute testing command
  eval "$@"
  # `|| true`: Puma may already be gone; don't abort under `set -e`
  kill "$puma_pid" || true
  trap - EXIT
  wait
}

# Benchmark every (workers, threads) combination.
for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
  for threads in $(seq $MIN_THREADS $MAX_THREADS); do
    with_puma \
      run_concurrency_tests \
      run_ab "$URL"
  done
done
9 changes: 9 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -664,6 +664,15 @@ def shutdown_debug(val=true)
@options[:shutdown_debug] = val
end

# Controls whether a small delay is injected before accepting a new
# socket while this worker is already processing requests. The delay
# gives less busy workers a chance to pick up the connection first.
#
# Only affects the Ruby MRI implementation (see
# Puma::ThreadPool#wait_for_less_busy_worker).
def wait_for_less_busy_worker(val=true)
@options[:wait_for_less_busy_worker] = val
end

# Control how the remote address of the connection is set. This
# is configurable because to calculate the true socket peer address
# a kernel syscall is required which for very fast rack handlers
Expand Down
4 changes: 4 additions & 0 deletions lib/puma/server.rb
Expand Up @@ -279,6 +279,10 @@ def handle_servers
break if handle_check
else
begin
if @options.fetch(:wait_for_less_busy_worker, true)
pool.wait_for_less_busy_worker
end

if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value
Expand Down
23 changes: 23 additions & 0 deletions lib/puma/thread_pool.rb
Expand Up @@ -23,6 +23,11 @@ class ForceShutdown < RuntimeError
# up its work before leaving the thread to die on the vine.
SHUTDOWN_GRACE_TIME = 5 # seconds

# Maximum delay (in seconds) injected before accepting new data
# while we are already processing requests; gives less busy workers
# a chance to accept the connection first.
BUSY_WORKER_DELAY = 0.005 # 5 milliseconds

# Maintain a minimum of +min+ and maximum of +max+ threads
# in the pool.
#
Expand Down Expand Up @@ -208,6 +213,24 @@ def wait_until_not_full
end
end

# Sleeps for up to BUSY_WORKER_DELAY before the caller accepts a new
# connection, but only when this pool already has busy threads. This
# biases connection distribution toward less busy workers.
#
# No-op on non-MRI rubies and during shutdown.
def wait_for_less_busy_worker
# Ruby MRI does GVL, this can result
# in processing contention when multiple threads
# (requests) are running concurrently
return unless Puma.mri?

with_mutex do
# never delay a worker that is shutting down
return if @shutdown

# do not delay, if we are not busy
return unless busy_threads > 0

# this will be signaled once a request finishes,
# which can happen earlier than delay
@not_full.wait @mutex, BUSY_WORKER_DELAY
end
end

# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
Expand Down
39 changes: 39 additions & 0 deletions test/config/cpu_spin.rb
@@ -0,0 +1,39 @@
# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n", where <d> is the number of
# milliseconds to spin CPU, returns process pid

# configure `wait_for_less_busy_worker` based on ENV, default `true`
wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', 'true') == 'true'

# Per-thread CPU time in float seconds, or nil when the platform does
# not expose `Process::CLOCK_THREAD_CPUTIME_ID`.
# Refer: https://gitlab.com/gitlab-org/gitlab/issues/30567#note_221765627
def cpu_threadtime
  clock_id = defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process::CLOCK_THREAD_CPUTIME_ID : nil
  clock_id && Process.clock_gettime(clock_id, :float_second)
end

# Busy-spin the CPU (via repeated `rand` calls) for approximately
# `duration_s` seconds of this thread's CPU time.
def rand_for(duration_s)
  deadline = cpu_threadtime + duration_s
  rand until cpu_threadtime >= deadline
end

app do |env|
# CPU-burn duration parsed from the request path, e.g. `/cpu/50` => 50ms;
# defaults to 1000ms when the path does not match.
duration_ms = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_f

# This simulates an interleaved workload,
# where the thread is ~50% free (sleeping) and ~50% busy running Ruby.
#
# Another request might be picked up during the `sleep`,
# but it would then compete with this request during
# `rand_for` execution.

start_time = cpu_threadtime
expected_end_time = cpu_threadtime + duration_ms / 1000.0
while cpu_threadtime < expected_end_time do
sleep(0.01)
rand_for(0.01)
end
# NOTE(review): despite its name, `end_time` holds the elapsed thread
# CPU time, not an end timestamp; it is reported in the response body.
end_time = cpu_threadtime - start_time

[200, {"Content-Type" => "text/plain"}, ["Run for #{end_time} #{Process.pid}"]]
end
1 change: 1 addition & 0 deletions test/helper.rb
Expand Up @@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller)
when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/]
when :jruby then "Skip unless JRuby" unless Puma.jruby?
when :windows then "Skip unless Windows" unless Puma.windows?
when :mri then "Skip unless MRI" unless Puma.mri?
else false
end
skip skip_msg, bt if skip_msg
Expand Down
34 changes: 33 additions & 1 deletion test/test_puma_server.rb
Expand Up @@ -993,6 +993,7 @@ def oob_server(**options)
@mutex = Mutex.new
@oob_finished = ConditionVariable.new
oob_wait = options.delete(:oob_wait)
request_sleep = options.delete(:request_sleep)
oob = -> do
in_oob.synchronize do
@mutex.synchronize do
Expand All @@ -1007,6 +1008,11 @@ def oob_server(**options)
@server.max_threads = 5
server_run app: ->(_) do
raise 'OOB conflict' if in_oob.locked?

# make each request take some time (`request_sleep` seconds)
# as we test parallel execution
sleep(request_sleep) if request_sleep

@mutex.synchronize {@request_count += 1}
[200, {}, [""]]
end
Expand All @@ -1028,10 +1034,13 @@ def test_out_of_band

# Streaming requests on parallel connections without delay should trigger
# out_of_band hooks only once after the final request.
# Each connection takes ~50ms to process (5ms per request), which
# causes the connections to be processed in parallel.
def test_out_of_band_stream
n = 100
threads = 10
oob_server
oob_server(request_sleep: 0.005)

req = "GET / HTTP/1.1\r\n"
@mutex.synchronize do
Array.new(threads) do
Expand All @@ -1045,6 +1054,29 @@ def test_out_of_band_stream
assert_equal 1, @oob_count
end

# Long running connections run in parallel with a waiting on less busy
# worker will mark a server as a free for short period which allows it
# to rapidly accept another connection. Each request will take at least 3m,
# which is smaller than 5ms (the current delay), which results in a short
# period where we can use OOB hook to discover that Puma prefers idle-first
# workers.
def test_out_of_band_stream_with_waiting_on_less_busy_worker
skip_unless :mri

parallel_requests = 10
oob_server(wait_for_less_busy_worker: true)
Array.new(parallel_requests) do
Thread.new do
send_http "GET / HTTP/1.0\r\n\r\n"
@mutex.synchronize do
@oob_finished.wait(@mutex, 1)
end
end
end.each(&:join)
assert_equal parallel_requests, @request_count
assert_equal parallel_requests, @oob_count
end

def test_out_of_band_overlapping_requests
oob_server oob_wait: true
sock = nil
Expand Down

0 comments on commit 5b8c785

Please sign in to comment.