Skip to content

Commit

Permalink
Merge branch 'gitlab-latency' of https://github.com/ayufan-research/puma
Browse files Browse the repository at this point in the history
 into ayufan-research-gitlab-latency
  • Loading branch information
nateberkopec committed May 11, 2020
2 parents 0e7d6fa + 7af9807 commit 554b969
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 0 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -13,6 +13,7 @@
* New configuration option to set state file permissions (#2238)
* `Puma.stats_hash` returns stats as a Hash instead of a JSON string (#2086, #2253)
* Add `fork_worker` option and `refork` command for improved copy-on-write performance (#2099)
* Inject a small delay for busy workers to improve request distribution (#2079)

* Deprecations, Removals and Breaking API Changes
* `--control` has been removed. Use `--control-url` (#1487)
Expand Down
102 changes: 102 additions & 0 deletions benchmarks/wrk/cpu_spin.sh
@@ -0,0 +1,102 @@
#!/bin/bash

set -eo pipefail

ITERATIONS=400000
HOST=127.0.0.1:9292
URL="http://$HOST/cpu/$ITERATIONS"

MIN_WORKERS=1
MAX_WORKERS=4

MIN_THREADS=4
MAX_THREADS=4

DURATION=2
MIN_CONCURRENT=1
MAX_CONCURRENT=8

retry() {
local tries="$1"
local sleep="$2"
shift 2

for i in $(seq 1 $tries); do
if eval "$@"; then
return 0
fi

sleep "$sleep"
done

return 1
}

ms() {
VALUE=$(cat)
FRAC=${VALUE%%[ums]*}
case "$VALUE" in
*us)
echo "scale=1; ${FRAC}/1000" | bc
;;

*ms)
echo "scale=1; ${FRAC}/1" | bc
;;

*s)
echo "scale=1; ${FRAC}*1000/1" | bc
;;
esac
}

run_wrk() {
result=$(wrk -H "Connection: Close" -c "$wrk_c" -t "$wrk_t" -d "$DURATION" --latency "$@" | tee -a wrk.txt)
req_sec=$(echo "$result" | grep "^Requests/sec:" | awk '{print $2}')
latency_avg=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $2}' | ms)
latency_stddev=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $3}' | ms)
latency_50=$(echo "$result" | grep "^\s*50%" | awk '{print $2}' | ms)
latency_75=$(echo "$result" | grep "^\s*75%" | awk '{print $2}' | ms)
latency_90=$(echo "$result" | grep "^\s*90%" | awk '{print $2}' | ms)
latency_99=$(echo "$result" | grep "^\s*99%" | awk '{print $2}' | ms)

echo -e "$workers\t$threads\t$wrk_c\t$wrk_t\t$req_sec\t$latency_avg\t$latency_stddev\t$latency_50\t$latency_75\t$latency_90\t$latency_99"
}

run_concurrency_tests() {
echo
echo -e "PUMA_W\tPUMA_T\tWRK_C\tWRK_T\tREQ_SEC\tL_AVG\tL_DEV\tL_50%\tL_75%\tL_90%\tL_99%"
for wrk_c in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do
wrk_t="$wrk_c"
eval "$@"
sleep 1
done
echo
}

with_puma() {
# start puma and wait for 10s for it to start
bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
local puma_pid=$!
trap "kill $puma_pid" EXIT

# wait for Puma to be up
if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
echo "Failed to connect to $URL."
return 1
fi

# execute testing command
eval "$@"
kill "$puma_pid" || true
trap - EXIT
wait
}

for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
for threads in $(seq $MIN_THREADS $MAX_THREADS); do
with_puma \
run_concurrency_tests \
run_wrk "$URL"
done
done
12 changes: 12 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -694,6 +694,18 @@ def shutdown_debug(val=true)
@options[:shutdown_debug] = val
end

# Controls an injected delay (in seconds) before
# accepting new socket
# if we are already processing requests,
# it gives a time for less busy workers
# to pick workbefore us
#
# This only affects Ruby MRI implementation
# The best value is between 0.001 (1ms) to 0.010 (10ms)
def wait_for_less_busy_worker(val)
@options[:wait_for_less_busy_worker] = val.to_f
end

# Control how the remote address of the connection is set. This
# is configurable because to calculate the true socket peer address
# a kernel syscall is required which for very fast rack handlers
Expand Down
3 changes: 3 additions & 0 deletions lib/puma/server.rb
Expand Up @@ -283,6 +283,9 @@ def handle_servers
else
begin
pool.wait_until_not_full
pool.wait_for_less_busy_worker(
@options[:wait_for_less_busy_worker].to_f)

if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value
Expand Down
19 changes: 19 additions & 0 deletions lib/puma/thread_pool.rb
Expand Up @@ -228,6 +228,25 @@ def wait_until_not_full
end
end

def wait_for_less_busy_worker(delay_s)
# Ruby MRI does GVL, this can result
# in processing contention when multiple threads
# (requests) are running concurrently
return unless Puma.mri?
return unless delay_s > 0

with_mutex do
return if @shutdown

# do not delay, if we are not busy
return unless busy_threads > 0

# this will be signaled once a request finishes,
# which can happen earlier than delay
@not_full.wait @mutex, delay_s
end
end

# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
Expand Down
17 changes: 17 additions & 0 deletions test/config/cpu_spin.rb
@@ -0,0 +1,17 @@
# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n",
# where <d> is the number of iterations

require 'benchmark'

# configure `wait_for_less_busy_workers` based on ENV, default `true`
wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', '0.005').to_f

app do |env|
iterations = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_i

duration = Benchmark.measure do
iterations.times { rand }
end

[200, {"Content-Type" => "text/plain"}, ["Run for #{duration.total} #{Process.pid}"]]
end
1 change: 1 addition & 0 deletions test/helper.rb
Expand Up @@ -116,6 +116,7 @@ def skip_unless(eng, bt: caller)
when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/]
when :jruby then "Skip unless JRuby" unless Puma.jruby?
when :windows then "Skip unless Windows" unless Puma.windows?
when :mri then "Skip unless MRI" unless Puma.mri?
else false
end
skip skip_msg, bt if skip_msg
Expand Down
106 changes: 106 additions & 0 deletions test/test_busy_worker.rb
@@ -0,0 +1,106 @@
require_relative "helper"
require "puma/events"

class TestBusyWorker < Minitest::Test
parallelize_me!

def setup
@ios = []
@server = nil
end

def teardown
@server.stop(true) if @server
@ios.each {|i| i.close unless i.closed?}
end

def new_connection
TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
retry
end

def send_http(req)
new_connection << req
end

def send_http_and_read(req)
send_http(req).read
end

def with_server(**options, &app)
@requests_count = 0 # number of requests processed
@requests_running = 0 # current number of requests running
@requests_max_running = 0 # max number of requests running in parallel
@mutex = Mutex.new

request_handler = ->(env) do
@mutex.synchronize do
@requests_count += 1
@requests_running += 1
if @requests_running > @requests_max_running
@requests_max_running = @requests_running
end
end

begin
yield(env)
ensure
@mutex.synchronize do
@requests_running -= 1
end
end
end

@server = Puma::Server.new request_handler, Puma::Events.strings, **options
@server.min_threads = options[:min_threads] || 0
@server.max_threads = options[:max_threads] || 10
@server.add_tcp_listener '127.0.0.1', 0
@server.run
end

# Multiple concurrent requests are not processed
# sequentially as a small delay is introduced
def test_multiple_requests_waiting_on_less_busy_worker
skip_unless :mri

with_server(wait_for_less_busy_worker: 1.0) do |_|
sleep(0.1)

[200, {}, [""]]
end

n = 2

Array.new(n) do
Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
end.each(&:join)

assert_equal n, @requests_count, "number of requests needs to match"
assert_equal 0, @requests_running, "none of requests needs to be running"
assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1"
end

# Multiple concurrent requests are processed
# in parallel as a delay is disabled
def test_multiple_requests_processing_in_parallel
skip_unless :mri

with_server(wait_for_less_busy_worker: 0.0) do |_|
sleep(0.1)

[200, {}, [""]]
end

n = 4

Array.new(n) do
Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
end.each(&:join)

assert_equal n, @requests_count, "number of requests needs to match"
assert_equal 0, @requests_running, "none of requests needs to be running"
assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match"
end
end

0 comments on commit 554b969

Please sign in to comment.