diff --git a/ddtrace.gemspec b/ddtrace.gemspec index 8a577e0471a..2e3d82b36fd 100644 --- a/ddtrace.gemspec +++ b/ddtrace.gemspec @@ -41,6 +41,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'opentracing', '>= 0.4.1' # Development dependencies + spec.add_development_dependency 'concurrent-ruby' # Leave it open as we also have it as an integration and want Appraisal to control the version under test. spec.add_development_dependency 'rake', '>= 10.5' spec.add_development_dependency 'rubocop', '= 0.49.1' if RUBY_VERSION >= '2.1.0' spec.add_development_dependency 'rspec', '~> 3.0' diff --git a/lib/ddtrace/buffer.rb b/lib/ddtrace/buffer.rb index 69fabc765e1..e6aa9a71cf9 100644 --- a/lib/ddtrace/buffer.rb +++ b/lib/ddtrace/buffer.rb @@ -2,23 +2,74 @@ require 'ddtrace/diagnostics/health' require 'ddtrace/runtime/object_space' +# Thread-safe trace buffer module Datadog + # Aggregate metrics: + # They reflect buffer activity since last #pop. + # These may not be as accurate or as granular, but they + # don't use as much network traffic as live stats. + class MeasuredBuffer + def initialize + @buffer_accepted = 0 + @buffer_accepted_lengths = 0 + @buffer_dropped = 0 + @buffer_spans = 0 + end + + def measure_accept(trace) + @buffer_spans += trace.length + @buffer_accepted += 1 + @buffer_accepted_lengths += trace.length + rescue StandardError => e + Datadog.logger.debug("Failed to measure queue accept. Cause: #{e.message} Source: #{e.backtrace.first}") + end + + def measure_drop(trace) + @buffer_dropped += 1 + @buffer_spans -= trace.length + @buffer_accepted_lengths -= trace.length + rescue StandardError => e + Datadog.logger.debug("Failed to measure queue drop. 
Cause: #{e.message} Source: #{e.backtrace.first}") + end + + def measure_pop(traces) + # Accepted + Datadog.health_metrics.queue_accepted(@buffer_accepted) + Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths) + + # Dropped + Datadog.health_metrics.queue_dropped(@buffer_dropped) + + # Queue gauges + Datadog.health_metrics.queue_max_length(@max_size) + Datadog.health_metrics.queue_spans(@buffer_spans) + Datadog.health_metrics.queue_length(traces.length) + + # Reset aggregated metrics + @buffer_accepted = 0 + @buffer_accepted_lengths = 0 + @buffer_dropped = 0 + @buffer_spans = 0 + rescue StandardError => e + Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}") + end + end + # Trace buffer that stores application traces. The buffer has a maximum size and when # the buffer is full, a random trace is discarded. This class is thread-safe and is used # automatically by the ``Tracer`` instance when a ``Span`` is finished. - class TraceBuffer + # + # This implementation is recommended for non-CRuby environments. + # If using CRuby, {Datadog::CRubyTraceBuffer} is a faster implementation with minimal compromise. + class ThreadSafeBuffer < MeasuredBuffer def initialize(max_size) + super() + @max_size = max_size @mutex = Mutex.new() @traces = [] @closed = false - - # Initialize metric values - @buffer_accepted = 0 - @buffer_accepted_lengths = 0 - @buffer_dropped = 0 - @buffer_spans = 0 end # Add a new ``trace`` in the local queue. This method doesn't block the execution @@ -72,48 +123,98 @@ def close @closed = true end end + end + + # Trace buffer that stores application traces. The buffer has a maximum size and when + # the buffer is full, a random trace is discarded. This class is thread-safe and is used + # automatically by the ``Tracer`` instance when a ``Span`` is finished. 
+ # + # Because singular +Array+ operations are thread-safe in CRuby, we can implement the trace + # buffer without an explicit lock, while making the compromise of allowing the buffer + # to go over its maximum limit by up to 4% in extreme circumstances. + # + # In the following scenario: + # * 4.5 million spans/second. + # * Pushed into a single CRubyTraceBuffer from 1000 threads. + # The buffer exceeded its maximum size by no more than 4%. + # + # TODO: Measure and document how much faster this implementation is. + # + # @see https://github.com/ruby-concurrency/concurrent-ruby/blob/c1114a0c6891d9634f019f1f9fe58dcae8658964/lib/concurrent-ruby/concurrent/array.rb#L23-L27 + class CRubyTraceBuffer < MeasuredBuffer + def initialize(max_size) + super() + + @max_size = max_size + + @traces = [] + @closed = false + end - # Aggregate metrics: - # They reflect buffer activity since last #pop. - # These may not be as accurate or as granular, but they - # don't use as much network traffic as live stats. + # Add a new ``trace`` in the local queue. This method doesn't block the execution + # even if the buffer is full. In that case, a random trace is discarded. + def push(trace) + return if @closed + len = @traces.length + if len < @max_size || @max_size <= 0 + @traces << trace + else + # we should replace a random trace with the new one + replace_index = rand(len) + replaced_trace = @traces.delete_at(replace_index) + @traces << trace + + # Check if we deleted the element right when the buffer + # was popped. In that case we didn't actually delete anything, + # we just inserted into a newly cleared buffer instead. + measure_drop(replaced_trace) if replaced_trace + end - def measure_accept(trace) - @buffer_spans += trace.length - @buffer_accepted += 1 - @buffer_accepted_lengths += trace.length - rescue StandardError => e - Datadog.logger.debug("Failed to measure queue accept. 
Cause: #{e.message} Source: #{e.backtrace.first}") + measure_accept(trace) end - def measure_drop(trace) - @buffer_dropped += 1 - @buffer_spans -= trace.length - @buffer_accepted_lengths -= trace.length - rescue StandardError => e - Datadog.logger.debug("Failed to measure queue drop. Cause: #{e.message} Source: #{e.backtrace.first}") + # Return the current number of stored traces. + def length + @traces.length end - def measure_pop(traces) - # Accepted - Datadog.health_metrics.queue_accepted(@buffer_accepted) - Datadog.health_metrics.queue_accepted_lengths(@buffer_accepted_lengths) + # Return if the buffer is empty. + def empty? + @traces.empty? + end - # Dropped - Datadog.health_metrics.queue_dropped(@buffer_dropped) + # Return all traces stored and reset buffer. + def pop + traces = @traces.pop(VERY_LARGE_INTEGER) - # Queue gauges - Datadog.health_metrics.queue_max_length(@max_size) - Datadog.health_metrics.queue_spans(@buffer_spans) - Datadog.health_metrics.queue_length(traces.length) + measure_pop(traces) - # Reset aggregated metrics - @buffer_accepted = 0 - @buffer_accepted_lengths = 0 - @buffer_dropped = 0 - @buffer_spans = 0 - rescue StandardError => e - Datadog.logger.debug("Failed to measure queue. Cause: #{e.message} Source: #{e.backtrace.first}") + traces + end + + # Very large value, to ensure that we drain the whole buffer. + # (1 << 62) - 1 happens to be the largest integer that can be stored inline in CRuby; the parentheses are required because `-` binds tighter than `<<`. + VERY_LARGE_INTEGER = (1 << 62) - 1 + + def close + @closed = true end end + + # Choose default TraceBuffer implementation for current platform. + BUFFER_IMPLEMENTATION = if Datadog::Ext::Runtime::RUBY_ENGINE == 'ruby' + CRubyTraceBuffer + else + ThreadSafeBuffer + end + private_constant :BUFFER_IMPLEMENTATION + + # Trace buffer that stores application traces. The buffer has a maximum size and when + # the buffer is full, a random trace is discarded. 
This class is thread-safe and is used + # automatically by the ``Tracer`` instance when a ``Span`` is finished. + # + # TODO We should restructure this module, so that classes are not declared at top-level ::Datadog. + # TODO Making such a change is potentially breaking for users manually configuring the tracer. + class TraceBuffer < BUFFER_IMPLEMENTATION + end end diff --git a/lib/ddtrace/ext/runtime.rb b/lib/ddtrace/ext/runtime.rb index f5f7230aa9d..551d29d98e9 100644 --- a/lib/ddtrace/ext/runtime.rb +++ b/lib/ddtrace/ext/runtime.rb @@ -7,6 +7,7 @@ module Runtime LANG = 'ruby'.freeze LANG_INTERPRETER = (RUBY_ENGINE + '-' + RUBY_PLATFORM).freeze LANG_VERSION = RUBY_VERSION + RUBY_ENGINE = ::RUBY_ENGINE # e.g. 'ruby', 'jruby', 'truffleruby' TRACER_VERSION = Datadog::VERSION::STRING TAG_LANG = 'language'.freeze diff --git a/spec/ddtrace/benchmark/buffer_benchmark_spec.rb b/spec/ddtrace/benchmark/buffer_benchmark_spec.rb new file mode 100644 index 00000000000..2bea2f4f644 --- /dev/null +++ b/spec/ddtrace/benchmark/buffer_benchmark_spec.rb @@ -0,0 +1,30 @@ +require 'spec_helper' + +require_relative 'support/benchmark_helper' + +RSpec.describe 'Microbenchmark Buffer' do + let(:max_size) { Datadog::Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE } + let(:span) { get_test_traces(1).flatten } + + let(:steps) { [max_size / 10, max_size * 2] } # Number of elements pushed to buffer + + def subject(pushes) + i = 0 + while i < pushes + buffer.push(span) + i += 1 + end + + buffer.pop + end + + describe 'TraceBuffer' do + include_examples 'benchmark' + let(:buffer) { Datadog::TraceBuffer.new(max_size) } + end + + describe 'CRuby' do + include_examples 'benchmark' + let(:buffer) { Datadog::CRubyTraceBuffer.new(max_size) } + end +end diff --git a/spec/ddtrace/buffer_spec.rb b/spec/ddtrace/buffer_spec.rb index 306777aa902..1133b4f020b 100644 --- a/spec/ddtrace/buffer_spec.rb +++ b/spec/ddtrace/buffer_spec.rb @@ -3,11 +3,28 @@ require 'ddtrace' require 'ddtrace/buffer' +require 
'concurrent' + RSpec.describe Datadog::TraceBuffer do + subject(:buffer_class) { described_class } + + context 'with CRuby' do + before { skip unless PlatformHelpers.mri? } + it { is_expected.to be <= Datadog::CRubyTraceBuffer } + end + + context 'with JRuby' do + before { skip unless PlatformHelpers.jruby? } + it { is_expected.to be <= Datadog::ThreadSafeBuffer } + end +end + +RSpec.shared_examples 'trace buffer' do include_context 'health metrics' subject(:buffer) { described_class.new(max_size) } let(:max_size) { 0 } + let(:max_size_leniency) { defined?(super) ? super() : 1 } # Multiplier to allowed max_size def measure_traces_size(traces) traces.inject(Datadog::Runtime::ObjectSpace.estimate_bytesize(traces)) do |sum, trace| @@ -137,6 +154,54 @@ def measure_trace_size(trace) expect(output).to_not be nil expect(output.sort).to eq((0..thread_count - 1).map { |i| [i] }) end + + context 'with items exceeding maximum size' do + let(:max_size) { 100 } + let(:thread_count) { 1000 } + let(:barrier) { Concurrent::CyclicBarrier.new(thread_count) } + let(:threads) do + buffer + barrier + + Array.new(thread_count) do |i| + Thread.new do + barrier.wait + 1000.times { buffer.push([i]) } + end + end + end + + it 'does not exceed expected maximum size' do + push + expect(output).to have_at_most(max_size * max_size_leniency).items + end + + context 'with #pop operations' do + let(:barrier) { Concurrent::CyclicBarrier.new(thread_count + 1) } + + before do + allow(Datadog).to receive(:logger).and_return(double) + end + + it 'executes without error' do + threads + + barrier.wait + 1000.times do + buffer.pop + + # Yield control to threads to increase contention. + # Otherwise we might run #pop a few times in succession, + # which doesn't help us stress test this case. 
+ sleep 0 + end + + threads.each(&:kill) + + push + end + end + end end end @@ -201,3 +266,15 @@ def measure_trace_size(trace) end end end + +RSpec.describe Datadog::ThreadSafeBuffer do + it_behaves_like 'trace buffer' +end + +RSpec.describe Datadog::CRubyTraceBuffer do + before { skip unless PlatformHelpers.mri? } + + it_behaves_like 'trace buffer' do + let(:max_size_leniency) { 1.04 } # 4% + end +end