Skip to content

Commit

Permalink
Faster TraceBuffer for CRuby
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Sep 14, 2020
1 parent 9c56859 commit 94f0d38
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 40 deletions.
1 change: 1 addition & 0 deletions ddtrace.gemspec
Expand Up @@ -41,6 +41,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'opentracing', '>= 0.4.1'

# Development dependencies
spec.add_development_dependency 'concurrent-ruby' # Leave it open as we also have it as an integration and want Appraisal to control the version under test.
spec.add_development_dependency 'rake', '>= 10.5'
spec.add_development_dependency 'rubocop', '= 0.49.1' if RUBY_VERSION >= '2.1.0'
spec.add_development_dependency 'rspec', '~> 3.0'
Expand Down
181 changes: 141 additions & 40 deletions lib/ddtrace/buffer.rb
Expand Up @@ -2,23 +2,74 @@
require 'ddtrace/diagnostics/health'
require 'ddtrace/runtime/object_space'

# Thread-safe trace buffer
module Datadog
# Aggregate metrics:
# They reflect buffer activity since last #pop.
# These may not be as accurate or as granular, but they
# don't use as much network traffic as live stats.
class MeasuredBuffer
def initialize
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
end

def measure_accept(trace)
@buffer_spans += trace.length
@buffer_accepted += 1
@buffer_accepted_lengths += trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{e.backtrace.first}")
end

def measure_drop(trace)
@buffer_dropped += 1
@buffer_spans -= trace.length
@buffer_accepted_lengths -= trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{e.backtrace.first}")
end

def measure_pop(traces)
# Accepted
Datadog.health_metrics.queue_accepted(@buffer_accepted)
Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths)

# Dropped
Datadog.health_metrics.queue_dropped(@buffer_dropped)

# Queue gauges
Datadog.health_metrics.queue_max_length(@max_size)
Datadog.health_metrics.queue_spans(@buffer_spans)
Datadog.health_metrics.queue_length(traces.length)

# Reset aggregated metrics
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}")
end
end

# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
class TraceBuffer
#
# This is implementation is recommended for non-CRuby environments.
# If using CRuby, {Datadog::CRubyTraceBuffer} is a faster implementation with minimal compromise.
class ThreadSafeBuffer < MeasuredBuffer
def initialize(max_size)
super()

@max_size = max_size

@mutex = Mutex.new()
@traces = []
@closed = false

# Initialize metric values
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
end

# Add a new ``trace`` in the local queue. This method doesn't block the execution
Expand Down Expand Up @@ -72,48 +123,98 @@ def close
@closed = true
end
end
end

# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
#
# Because singular +Array+ operations are thread-safe in CRuby, we can implement the trace
# buffer without an explicit lock, while making the compromise of allowing the buffer
# to go over its maximum limit by up to 3% in extreme circumstances.
#
# On the following scenario:
# * 4.5 million spans/second.
# * Pushed into a single CRubyTraceBuffer from 1000 threads.
# The buffer exceeded its maximum size by no more than 4%.
#
# TODO This implementation is XXXX faster
#
# @see https://github.com/ruby-concurrency/concurrent-ruby/blob/c1114a0c6891d9634f019f1f9fe58dcae8658964/lib/concurrent-ruby/concurrent/array.rb#L23-L27
class CRubyTraceBuffer < MeasuredBuffer
def initialize(max_size)
super()

@max_size = max_size

@traces = []
@closed = false
end

# Aggregate metrics:
# They reflect buffer activity since last #pop.
# These may not be as accurate or as granular, but they
# don't use as much network traffic as live stats.
# Add a new ``trace`` in the local queue. This method doesn't block the execution
# even if the buffer is full. In that case, a random trace is discarded.
def push(trace)
return if @closed
len = @traces.length
if len < @max_size || @max_size <= 0
@traces << trace
else
# we should replace a random trace with the new one
replace_index = rand(len)
replaced_trace = @traces.delete_at(replace_index)
@traces << trace

# Check if we deleted the element right when the buffer
# was popped. In that case we didn't actually delete anything,
# we just inserted into a newly cleared buffer instead.
measure_drop(replaced_trace) if replaced_trace
end

def measure_accept(trace)
@buffer_spans += trace.length
@buffer_accepted += 1
@buffer_accepted_lengths += trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{e.backtrace.first}")
measure_accept(trace)
end

def measure_drop(trace)
@buffer_dropped += 1
@buffer_spans -= trace.length
@buffer_accepted_lengths -= trace.length
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{e.backtrace.first}")
# Return the current number of stored traces.
def length
@traces.length
end

def measure_pop(traces)
# Accepted
Datadog.health_metrics.queue_accepted(@buffer_accepted)
Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths)
# Return if the buffer is empty.
def empty?
@traces.empty?
end

# Dropped
Datadog.health_metrics.queue_dropped(@buffer_dropped)
# Return all traces stored and reset buffer.
def pop
traces = @traces.pop(VERY_LARGE_INTEGER)

# Queue gauges
Datadog.health_metrics.queue_max_length(@max_size)
Datadog.health_metrics.queue_spans(@buffer_spans)
Datadog.health_metrics.queue_length(traces.length)
measure_pop(traces)

# Reset aggregated metrics
@buffer_accepted = 0
@buffer_accepted_lengths = 0
@buffer_dropped = 0
@buffer_spans = 0
rescue StandardError => e
Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}")
traces
end

# Very large value, to ensure that we drain the whole buffer.
# 1<<62-1 happens to be the largest integer that can be stored inline in CRuby.
VERY_LARGE_INTEGER = 1 << 62 - 1

def close
@closed = true
end
end

# Choose default TraceBuffer implementation for current platform.
BUFFER_IMPLEMENTATION = if Datadog::Ext::Runtime::RUBY_ENGINE == 'ruby'
CRubyTraceBuffer
else
ThreadSafeBuffer
end
private_constant :BUFFER_IMPLEMENTATION

# Trace buffer that stores application traces. The buffer has a maximum size and when
# the buffer is full, a random trace is discarded. This class is thread-safe and is used
# automatically by the ``Tracer`` instance when a ``Span`` is finished.
#
# TODO We should restructure this module, so that classes are not declared at top-level ::Datadog.
# TODO Making such a change is potentially breaking for users manually configuring the tracer.
class TraceBuffer < BUFFER_IMPLEMENTATION
end
end
1 change: 1 addition & 0 deletions lib/ddtrace/ext/runtime.rb
Expand Up @@ -7,6 +7,7 @@ module Runtime
LANG = 'ruby'.freeze
LANG_INTERPRETER = (RUBY_ENGINE + '-' + RUBY_PLATFORM).freeze
LANG_VERSION = RUBY_VERSION
RUBY_ENGINE = ::RUBY_ENGINE # e.g. 'ruby', 'jruby', 'truffleruby'
TRACER_VERSION = Datadog::VERSION::STRING

TAG_LANG = 'language'.freeze
Expand Down
30 changes: 30 additions & 0 deletions spec/ddtrace/benchmark/buffer_benchmark_spec.rb
@@ -0,0 +1,30 @@
require 'spec_helper'

require_relative 'support/benchmark_helper'

RSpec.describe 'Microbenchmark Buffer' do
let(:max_size) { Datadog::Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE }
let(:span) { get_test_traces(1).flatten }

let(:steps) { [max_size / 10, max_size * 2] } # Number of elements pushed to buffer

def subject(pushes)
i = 0
while i < pushes
buffer.push(span)
i += 1
end

buffer.pop
end

describe 'TraceBuffer' do
include_examples 'benchmark'
let(:buffer) { Datadog::TraceBuffer.new(max_size) }
end

describe 'CRuby' do
include_examples 'benchmark'
let(:buffer) { Datadog::CRubyTraceBuffer.new(max_size) }
end
end
77 changes: 77 additions & 0 deletions spec/ddtrace/buffer_spec.rb
Expand Up @@ -3,11 +3,28 @@
require 'ddtrace'
require 'ddtrace/buffer'

require 'concurrent'

RSpec.describe Datadog::TraceBuffer do
subject(:buffer_class) { described_class }

context 'with CRuby' do
before { skip unless PlatformHelpers.mri? }
it { is_expected.to be <= Datadog::CRubyTraceBuffer }
end

context 'with JRuby' do
before { skip unless PlatformHelpers.jruby? }
it { is_expected.to be <= Datadog::ThreadSafeBuffer }
end
end

RSpec.shared_examples 'trace buffer' do
include_context 'health metrics'

subject(:buffer) { described_class.new(max_size) }
let(:max_size) { 0 }
let(:max_size_leniency) { defined?(super) ? super() : 1 } # Multiplier to allowed max_size

def measure_traces_size(traces)
traces.inject(Datadog::Runtime::ObjectSpace.estimate_bytesize(traces)) do |sum, trace|
Expand Down Expand Up @@ -137,6 +154,54 @@ def measure_trace_size(trace)
expect(output).to_not be nil
expect(output.sort).to eq((0..thread_count - 1).map { |i| [i] })
end

context 'with items exceeding maximum size' do
let(:max_size) { 100 }
let(:thread_count) { 1000 }
let(:barrier) { Concurrent::CyclicBarrier.new(thread_count) }
let(:threads) do
buffer
barrier

Array.new(thread_count) do |i|
Thread.new do
barrier.wait
1000.times { buffer.push([i]) }
end
end
end

it 'does not exceed expected maximum size' do
push
expect(output).to have_at_most(max_size * max_size_leniency).items
end

context 'with #pop operations' do
let(:barrier) { Concurrent::CyclicBarrier.new(thread_count + 1) }

before do
allow(Datadog).to receive(:logger).and_return(double)
end

it 'executes without error' do
threads

barrier.wait
1000.times do
buffer.pop

# Yield control to threads to increase contention.
# Otherwise we might run #pop a few times in succession,
# which doesn't help us stress test this case.
sleep 0
end

threads.each(&:kill)

push
end
end
end
end
end

Expand Down Expand Up @@ -201,3 +266,15 @@ def measure_trace_size(trace)
end
end
end

RSpec.describe Datadog::ThreadSafeBuffer do
it_behaves_like 'trace buffer'
end

RSpec.describe Datadog::CRubyTraceBuffer do
before { skip unless PlatformHelpers.mri? }

it_behaves_like 'trace buffer' do
let(:max_size_leniency) { 1.04 } # 4%
end
end

0 comments on commit 94f0d38

Please sign in to comment.