Faster TraceBuffer for CRuby #1172

Merged: 3 commits on Sep 21, 2020
1 change: 1 addition & 0 deletions ddtrace.gemspec
@@ -41,6 +41,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'opentracing', '>= 0.4.1'

# Development dependencies
spec.add_development_dependency 'concurrent-ruby' # Leave it open as we also have it as an integration and want Appraisal to control the version under test.
spec.add_development_dependency 'rake', '>= 10.5'
spec.add_development_dependency 'rubocop', '= 0.49.1' if RUBY_VERSION >= '2.1.0'
spec.add_development_dependency 'rspec', '~> 3.0'
195 changes: 152 additions & 43 deletions lib/ddtrace/buffer.rb
@@ -2,23 +2,77 @@
require 'ddtrace/diagnostics/health'
require 'ddtrace/runtime/object_space'

# Trace buffer that accumulates traces for a consumer.
# Consumption can happen from a different thread.
module Datadog
# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
class TraceBuffer
def initialize(max_size)
@max_size = max_size
# Aggregate metrics:
# They reflect buffer activity since last #pop.
# These may not be as accurate or as granular, but they
# don't use as much network traffic as live stats.
class MeasuredBuffer
def initialize
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
end

@mutex = Mutex.new()
@traces = []
@closed = false
def measure_accept(trace)
@buffer_spans += trace.length
@buffer_accepted += 1
@buffer_accepted_lengths += trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{e.backtrace.first}")
end

def measure_drop(trace)
@buffer_dropped += 1
@buffer_spans -= trace.length
@buffer_accepted_lengths -= trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{e.backtrace.first}")
end

# Initialize metric values
def measure_pop(traces)
# Accepted
Datadog.health_metrics.queue_accepted(@buffer_accepted)
Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths)

# Dropped
Datadog.health_metrics.queue_dropped(@buffer_dropped)

# Queue gauges
Datadog.health_metrics.queue_max_length(@max_size)
Datadog.health_metrics.queue_spans(@buffer_spans)
Datadog.health_metrics.queue_length(traces.length)

# Reset aggregated metrics
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}")
end
end
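
For orientation, here is a minimal usage sketch (not part of the diff; the subclass name is hypothetical) of how a concrete buffer is expected to drive these hooks: measure_accept on every push, measure_drop when a trace is evicted, and measure_pop on flush, at which point the aggregated counters are reported as health metrics and reset.

require 'ddtrace'

# Hypothetical subclass illustrating the intended call pattern.
class ExampleMeasuredBuffer < Datadog::MeasuredBuffer
  def initialize(max_size)
    super()
    @max_size = max_size # read by #measure_pop for the queue_max_length gauge
    @traces = []
  end

  def push(trace)
    @traces << trace
    measure_accept(trace) # counts one accepted trace plus its spans
  end

  def pop
    traces, @traces = @traces, []
    measure_pop(traces)   # reports the aggregated metrics, then resets the counters
    traces
  end
end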

# Trace buffer that stores application traces and
# can be safely used concurrently in any environment.
#
# This implementation uses a {Mutex} around public methods, incurring
# overhead in order to ensure full thread-safety.
#
# This implementation is recommended for non-CRuby environments.
# If using CRuby, {Datadog::CRubyTraceBuffer} is a faster implementation with minimal compromise.
class ThreadSafeBuffer < MeasuredBuffer
def initialize(max_size)
super()

@max_size = max_size

@mutex = Mutex.new()
@traces = []
@closed = false
end

# Add a new ``trace`` to the local queue. This method doesn't block execution
@@ -72,48 +126,103 @@ def close
@closed = true
end
end
end
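
The synchronized method bodies are collapsed in the diff above; as a rough illustration only (not the exact diff contents), each public operation wraps the same bounded-queue logic in the shared Mutex, along these lines:

# Illustrative sketch of a Mutex-guarded push with random eviction when full;
# the actual collapsed implementation may differ in details.
def push(trace)
  @mutex.synchronize do
    return if @closed
    len = @traces.length
    if len < @max_size || @max_size <= 0
      @traces << trace
    else
      replace_index = rand(len)
      measure_drop(@traces[replace_index]) # record the trace being evicted
      @traces[replace_index] = trace
    end
    measure_accept(trace)
  end
end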

# Trace buffer that stores application traces and
# can be safely used concurrently on CRuby.
#
# Under extreme concurrency scenarios, this class can exceed
# its +max_size+ by up to 4%.
#
# Because singular +Array+ operations are thread-safe in CRuby,
# we can implement the trace buffer without an explicit lock,
# while making the compromise of allowing the buffer to go
# over its maximum limit under extreme circumstances.
#
# In the following scenario:
# * 4.5 million spans/second.
# * Pushed into a single CRubyTraceBuffer from 1000 threads.
# The buffer can exceed its maximum size by no more than 4%.
#
# This implementation allocates less memory and is faster
# than {Datadog::ThreadSafeBuffer}.
#
# @see spec/ddtrace/benchmark/buffer_benchmark_spec.rb Buffer benchmarks
# @see https://github.com/ruby-concurrency/concurrent-ruby/blob/c1114a0c6891d9634f019f1f9fe58dcae8658964/lib/concurrent-ruby/concurrent/array.rb#L23-L27
class CRubyTraceBuffer < MeasuredBuffer
def initialize(max_size)
super()

@max_size = max_size

@traces = []
@closed = false
end

# Aggregate metrics:
# They reflect buffer activity since last #pop.
# These may not be as accurate or as granular, but they
# don't use as much network traffic as live stats.
# Add a new ``trace`` to the local queue. This method doesn't block execution
# even if the buffer is full. In that case, a random trace is discarded.
def push(trace)
return if @closed
len = @traces.length
if len < @max_size || @max_size <= 0
@traces << trace
else
# we should replace a random trace with the new one
replace_index = rand(len)
replaced_trace = @traces.delete_at(replace_index)
@traces << trace

# Check if we deleted the element right when the buffer
# was popped. In that case we didn't actually delete anything,
# we just inserted into a newly cleared buffer instead.
measure_drop(replaced_trace) if replaced_trace
end

def measure_accept(trace)
@buffer_spans += trace.length
@buffer_accepted += 1
@buffer_accepted_lengths += trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{e.backtrace.first}")
measure_accept(trace)
end

def measure_drop(trace)
@buffer_dropped += 1
@buffer_spans -= trace.length
@buffer_accepted_lengths -= trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{e.backtrace.first}")
# Return the current number of stored traces.
def length
@traces.length
end

def measure_pop(traces)
# Accepted
Datadog.health_metrics.queue_accepted(@buffer_accepted)
Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths)
# Return if the buffer is empty.
def empty?
@traces.empty?
end

# Dropped
Datadog.health_metrics.queue_dropped(@buffer_dropped)
# Return all traces stored and reset buffer.
def pop
traces = @traces.pop(VERY_LARGE_INTEGER)

# Queue gauges
Datadog.health_metrics.queue_max_length(@max_size)
Datadog.health_metrics.queue_spans(@buffer_spans)
Datadog.health_metrics.queue_length(traces.length)
measure_pop(traces)

# Reset aggregated metrics
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}")
traces
end

# Very large value, to ensure that we drain the whole buffer.
# (1 << 62) - 1 happens to be the largest integer that can be stored inline in CRuby.
VERY_LARGE_INTEGER = (1 << 62) - 1

def close
@closed = true
end
end
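
A note on the drain above: Array#pop(n) removes and returns up to n elements in a single call, so on CRuby no concurrently pushed trace can be lost between reading the buffer and clearing it. A hypothetical two-step drain (not what the diff does) shows the race this avoids:

# Hypothetical, racy drain, shown only to illustrate what a single
# Array#pop(n) call avoids on CRuby:
def racy_pop
  traces = @traces.dup # a concurrent #push may land between these two lines...
  @traces.clear        # ...and that trace would then be silently discarded
  traces
end

By contrast, @traces.pop(VERY_LARGE_INTEGER) either drains a concurrently pushed trace now or leaves it in place for the next flush, so nothing is dropped unaccounted for.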

# Choose default TraceBuffer implementation for current platform.
BUFFER_IMPLEMENTATION = if Datadog::Ext::Runtime::RUBY_ENGINE == 'ruby'
CRubyTraceBuffer
else
ThreadSafeBuffer
end
private_constant :BUFFER_IMPLEMENTATION

# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
#
# TODO We should restructure this module, so that classes are not declared at top-level ::Datadog.
# TODO Making such a change is potentially breaking for users manually configuring the tracer.
class TraceBuffer < BUFFER_IMPLEMENTATION
end
end
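
In practice the platform selection is transparent to callers; a short usage sketch (not part of the diff) of the public entry point:

require 'ddtrace'

# Datadog::TraceBuffer resolves to CRubyTraceBuffer on MRI ('ruby')
# and to ThreadSafeBuffer on other engines, under the same public name.
buffer = Datadog::TraceBuffer.new(1000)  # hold at most ~1000 traces

trace = ['span-1', 'span-2']             # stand-in for an Array of finished Datadog::Span objects
buffer.push(trace)
buffer.pop                               # => [["span-1", "span-2"]]; the buffer is now empty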
1 change: 1 addition & 0 deletions lib/ddtrace/ext/runtime.rb
Expand Up @@ -7,6 +7,7 @@ module Runtime
LANG = 'ruby'.freeze
LANG_INTERPRETER = (RUBY_ENGINE + '-' + RUBY_PLATFORM).freeze
LANG_VERSION = RUBY_VERSION
RUBY_ENGINE = ::RUBY_ENGINE # e.g. 'ruby', 'jruby', 'truffleruby'
TRACER_VERSION = Datadog::VERSION::STRING

TAG_LANG = 'language'.freeze
32 changes: 32 additions & 0 deletions spec/ddtrace/benchmark/buffer_benchmark_spec.rb
@@ -0,0 +1,32 @@
require 'spec_helper'

require_relative 'support/benchmark_helper'

RSpec.describe 'Microbenchmark Buffer' do
let(:max_size) { Datadog::Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE }
let(:span) { get_test_traces(1).flatten }

let(:steps) { [max_size / 100, max_size / 10, max_size, max_size * 2] } # Number of elements pushed to buffer

def subject(pushes)
i = 0
while i < pushes
buffer.push(span)
i += 1
end

buffer.pop
end

describe 'CRubyTraceBuffer' do
before { skip unless PlatformHelpers.mri? }

include_examples 'benchmark'
let(:buffer) { Datadog::CRubyTraceBuffer.new(max_size) }
end

describe 'ThreadSafeBuffer' do
include_examples 'benchmark'
let(:buffer) { Datadog::ThreadSafeBuffer.new(max_size) }
end
end
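
The shared 'benchmark' examples come from spec/ddtrace/benchmark/support/benchmark_helper.rb, which is not shown in this diff. For a rough standalone comparison outside the spec harness, a sketch along these lines (using Ruby's built-in Benchmark module rather than the helper) exercises the same push/pop pattern:

require 'benchmark'
require 'ddtrace'

max_size = 1_000
trace = ['span'] # stand-in for an Array of finished spans

Benchmark.bm(18) do |x|
  x.report('CRubyTraceBuffer') do
    buffer = Datadog::CRubyTraceBuffer.new(max_size)
    100_000.times { buffer.push(trace) }
    buffer.pop
  end

  x.report('ThreadSafeBuffer') do
    buffer = Datadog::ThreadSafeBuffer.new(max_size)
    100_000.times { buffer.push(trace) }
    buffer.pop
  end
end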
77 changes: 77 additions & 0 deletions spec/ddtrace/buffer_spec.rb
@@ -3,11 +3,28 @@
require 'ddtrace'
require 'ddtrace/buffer'

require 'concurrent'

RSpec.describe Datadog::TraceBuffer do
subject(:buffer_class) { described_class }

context 'with CRuby' do
before { skip unless PlatformHelpers.mri? }
it { is_expected.to be <= Datadog::CRubyTraceBuffer }
end

context 'with JRuby' do
before { skip unless PlatformHelpers.jruby? }
it { is_expected.to be <= Datadog::ThreadSafeBuffer }
end
end

RSpec.shared_examples 'trace buffer' do
include_context 'health metrics'

subject(:buffer) { described_class.new(max_size) }
let(:max_size) { 0 }
let(:max_size_leniency) { defined?(super) ? super() : 1 } # Multiplier to allowed max_size

def measure_traces_size(traces)
traces.inject(Datadog::Runtime::ObjectSpace.estimate_bytesize(traces)) do |sum, trace|
@@ -137,6 +154,54 @@ def measure_trace_size(trace)
expect(output).to_not be nil
expect(output.sort).to eq((0..thread_count - 1).map { |i| [i] })
end

context 'with items exceeding maximum size' do
let(:max_size) { 100 }
let(:thread_count) { 1000 }
let(:barrier) { Concurrent::CyclicBarrier.new(thread_count) }
let(:threads) do
buffer
barrier

Array.new(thread_count) do |i|
Thread.new do
barrier.wait
1000.times { buffer.push([i]) }
end
end
end

it 'does not exceed expected maximum size' do
push
expect(output).to have_at_most(max_size * max_size_leniency).items
end

context 'with #pop operations' do
let(:barrier) { Concurrent::CyclicBarrier.new(thread_count + 1) }

before do
allow(Datadog).to receive(:logger).and_return(double)
end

it 'executes without error' do
threads

barrier.wait
1000.times do
buffer.pop

# Yield control to threads to increase contention.
# Otherwise we might run #pop a few times in succession,
# which doesn't help us stress test this case.
sleep 0
end

threads.each(&:kill)

push
end
end
end
end
end

@@ -201,3 +266,15 @@ def measure_trace_size(trace)
end
end
end

RSpec.describe Datadog::ThreadSafeBuffer do
it_behaves_like 'trace buffer'
end

RSpec.describe Datadog::CRubyTraceBuffer do
before { skip unless PlatformHelpers.mri? }

it_behaves_like 'trace buffer' do
let(:max_size_leniency) { 1.04 } # 4%
end
end