From 468b9d1735e86798a996c08dfdd3e91cfaf6ae81 Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Thu, 3 Sep 2020 14:21:48 -0400 Subject: [PATCH 1/7] Reduce memory usage of the HTTP transport --- lib/ddtrace/span.rb | 55 ++++++ lib/ddtrace/transport/http/adapters/net.rb | 10 +- lib/ddtrace/transport/http/statistics.rb | 15 +- lib/ddtrace/transport/traces.rb | 9 +- spec/ddtrace/benchmark/microbenchmark_spec.rb | 147 +-------------- .../benchmark/support/benchmark_helper.rb | 176 ++++++++++++++++++ .../benchmark/transport_benchmark_spec.rb | 76 ++++++++ .../contrib/http/circuit_breaker_spec.rb | 2 +- spec/ddtrace/span_spec.rb | 52 ++++++ .../transport/http/adapters/net_spec.rb | 15 +- .../ddtrace/transport/http/statistics_spec.rb | 6 + 11 files changed, 405 insertions(+), 158 deletions(-) create mode 100644 spec/ddtrace/benchmark/support/benchmark_helper.rb create mode 100644 spec/ddtrace/benchmark/transport_benchmark_spec.rb diff --git a/lib/ddtrace/span.rb b/lib/ddtrace/span.rb index 2047641e5f8..a8f5ffc357e 100644 --- a/lib/ddtrace/span.rb +++ b/lib/ddtrace/span.rb @@ -254,6 +254,61 @@ def to_hash h end + # MessagePack serializer interface. Making this object + # respond to `#to_msgpack` allows it to be automatically + # serialized by MessagePack. + # + # This is more efficient than doing +MessagePack.pack(span.to_hash)+ + # as we don't have to create an intermediate Hash. + # + # @param packer [MessagePack::Packer] serialization buffer, can be +nil+ with JRuby + def to_msgpack(packer = nil) + # As of 1.3.3, JRuby implementation doesn't pass an existing packer + packer ||= MessagePack::Packer.new + + if !@start_time.nil? && !@end_time.nil? + packer.write_map_header(13) # Set header with how many elements in the map + + packer.write(:start) + packer.write((@start_time.to_f * 1e9).to_i) + + packer.write(:duration) + packer.write(((@end_time - @start_time) * 1e9).to_i) + else + packer.write_map_header(11) # Set header with how many elements in the map + end + + packer.write(:span_id) + packer.write(@span_id) + packer.write(:parent_id) + packer.write(@parent_id) + packer.write(:trace_id) + packer.write(@trace_id) + packer.write(:name) + packer.write(@name) + packer.write(:service) + packer.write(@service) + packer.write(:resource) + packer.write(@resource) + packer.write(:type) + packer.write(@span_type) + packer.write(:meta) + packer.write(@meta) + packer.write(:metrics) + packer.write(@metrics) + packer.write(:allocations) + packer.write(allocations) + packer.write(:error) + packer.write(@status) + packer + end + + # JSON serializer interface. + # Used by older version of the transport. + def to_json(*args) + to_hash.to_json(*args) + end + # Return a human readable version of the span def pretty_print(q) start_time = (@start_time.to_f * 1e9).to_i rescue '-' diff --git a/lib/ddtrace/transport/http/adapters/net.rb b/lib/ddtrace/transport/http/adapters/net.rb index cefd8203cb8..010cd1b76d4 100644 --- a/lib/ddtrace/transport/http/adapters/net.rb +++ b/lib/ddtrace/transport/http/adapters/net.rb @@ -20,8 +20,14 @@ def initialize(hostname, port, options = {}) end def open - # Open connection - ::Net::HTTP.start(hostname, port, open_timeout: timeout, read_timeout: timeout) do |http| + # DEV Initializing +Net::HTTP+ directly help us avoid expensive + # options processing done in +Net::HTTP.start+: + # https://github.com/ruby/ruby/blob/b2d96abb42abbe2e01f010ffc9ac51f0f9a50002/lib/net/http.rb#L614-L618 + req = ::Net::HTTP.new(hostname, port, nil) + + req.open_timeout = req.read_timeout = timeout + + req.start do |http| yield(http) end end diff --git a/lib/ddtrace/transport/http/statistics.rb b/lib/ddtrace/transport/http/statistics.rb index 8e7325c0746..f6b7e3f80ee 100644 --- a/lib/ddtrace/transport/http/statistics.rb +++ b/lib/ddtrace/transport/http/statistics.rb @@ -18,11 +18,24 @@ def metrics_for_response(response) # Add status code tag to api.responses metric if metrics.key?(:api_responses) (metrics[:api_responses].options[:tags] ||= []).tap do |tags| - tags << "status_code:#{response.code}" + tags << metrics_tag_value(response.code) end end end end + + private + + # The most common status code on a healthy tracer + STATUS_CODE_200 = 'status_code:200'.freeze + + def metrics_tag_value(status_code) + if status_code == 200 + STATUS_CODE_200 # DEV Saves string concatenation/creation for common case + else + "status_code:#{status_code}" + end + end end end end diff --git a/lib/ddtrace/transport/traces.rb b/lib/ddtrace/transport/traces.rb index 77da94a84dc..1f4f2147e9c 100644 --- a/lib/ddtrace/transport/traces.rb +++ b/lib/ddtrace/transport/traces.rb @@ -57,7 +57,12 @@ def initialize(encoder, max_size: DEFAULT_MAX_PAYLOAD_SIZE) # @return [Enumerable[Array[Bytes,Integer]]] list of encoded chunks: each containing a byte array and # number of traces def encode_in_chunks(traces) - encoded_traces = traces.map { |t| encode_one(t) }.reject(&:nil?) + encoded_traces = if traces.respond_to?(:filter_map) + # DEV Supported since Ruby 2.7, saves an intermediate object creation + traces.filter_map { |t| encode_one(t) } + else + traces.map { |t| encode_one(t) }.reject(&:nil?) + end Datadog::Chunker.chunk_by_size(encoded_traces, max_size).map do |chunk| [encoder.join(chunk), chunk.size] @@ -86,7 +91,7 @@ module Encoder module_function def encode_trace(encoder, trace) - encoder.encode(trace.map(&:to_hash)) + encoder.encode(trace) end end diff --git a/spec/ddtrace/benchmark/microbenchmark_spec.rb b/spec/ddtrace/benchmark/microbenchmark_spec.rb index 94e2c4de894..378b0b04913 100644 --- a/spec/ddtrace/benchmark/microbenchmark_spec.rb +++ b/spec/ddtrace/benchmark/microbenchmark_spec.rb @@ -1,153 +1,8 @@ require 'spec_helper' -require 'datadog/statsd' -require 'ddtrace' - -require 'benchmark/ips' -if !PlatformHelpers.jruby? && Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.1.0') - require 'benchmark/memory' - require 'memory_profiler' -end - -require 'fileutils' -require 'json' +require_relative 'support/benchmark_helper' RSpec.describe 'Microbenchmark' do - shared_context 'benchmark' do - # When applicable, runs the test subject for different input sizes. - # Similar to how N in Big O notation works. - # - # This value is provided to the `subject(i)` method in order for the test - # to appropriately execute its run based on input size. - let(:steps) { defined?(super) ? super() : [1, 10, 100] } - - # How many times we run our program when testing for memory allocation. - # In theory, we should only need to run it once, as memory tests are not - # dependent on competing system resources. - # But occasionally we do see a few blimps of inconsistency, making the benchmarks skewed. - # By running the benchmarked snippet many times, we drown out any one-off anomalies, allowing - # the real memory culprits to surface. - let(:memory_iterations) { defined?(super) ? super() : 100 } - - # Outputs human readable information to STDERR. - # Most of the benchmarks have nicely formatted reports - # that are by default printed to terminal. - before do |e| - @test = e.metadata[:example_group][:full_description] - @type = e.description - - STDERR.puts "Test:#{e.metadata[:example_group][:full_description]} #{e.description}" - - # Warm up - steps.each do |s| - subject(s) - end - end - - # Report JSON result objects to ./tmp/benchmark/ folder - # Theses results can be historically tracked (e.g. plotting) if needed. - def write_result(result) - STDERR.puts(@test, @type, result) - - path = File.join('tmp', 'benchmark', @test, @type) - FileUtils.mkdir_p(File.dirname(path)) - - File.write(path, JSON.pretty_generate(result)) - - STDERR.puts("Result written to #{path}") - end - - # Measure execution time - it 'timing' do - report = Benchmark.ips do |x| - x.config(time: 1, warmup: 0.1) - - steps.each do |s| - x.report(s) do - subject(s) - end - end - - x.compare! - end - - result = report.entries.each_with_object({}) do |entry, hash| - hash[entry.label] = { ips: entry.stats.central_tendency, error: entry.stats.error_percentage / 100 } - end.to_h - - write_result(result) - end - - # Measure memory usage (object creation and memory size) - it 'memory' do - if PlatformHelpers.jruby? || Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.1.0') - skip("'benchmark/memory' not supported") - end - - report = Benchmark.memory do |x| - steps.each do |s| - x.report(s) do - memory_iterations.times { subject(s) } - end - end - - x.compare! - end - - result = report.entries.map do |entry| - row = entry.measurement.map do |metric| - { type: metric.type, allocated: metric.allocated, retained: metric.retained } - end - - [entry.label, row] - end.to_h - - write_result(result) - end - - # Measure GC cycles triggered during run - it 'gc' do - skip if PlatformHelpers.jruby? - - io = StringIO.new - GC::Profiler.enable - - memory_iterations.times { subject(steps[0]) } - - GC.disable # Prevent data collection from influencing results - - data = GC::Profiler.raw_data - GC::Profiler.report(io) - GC::Profiler.disable - - GC.enable - - puts io.string - - result = { count: data.size, time: data.map { |d| d[:GC_TIME] }.inject(0, &:+) } - write_result(result) - end - - # Reports that generate non-aggregated data. - # Useful for debugging. - context 'detailed report' do - before { skip('Detailed report are too verbose for CI') if ENV.key?('CI') } - - # Memory report with reference to each allocation site - it 'memory report' do - if PlatformHelpers.jruby? || Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.1.0') - skip("'benchmark/memory' not supported") - end - - report = MemoryProfiler.report do - memory_iterations.times { subject(steps[0]) } - end - - report.pretty_print - end - end - end - # Empty benchmark to assess the overhead benchmarking tools # and stability of underlying hardware. describe 'baseline' do diff --git a/spec/ddtrace/benchmark/support/benchmark_helper.rb b/spec/ddtrace/benchmark/support/benchmark_helper.rb new file mode 100644 index 00000000000..5f9e723f1d0 --- /dev/null +++ b/spec/ddtrace/benchmark/support/benchmark_helper.rb @@ -0,0 +1,176 @@ +require 'spec_helper' + +require 'datadog/statsd' +require 'ddtrace' + +require 'benchmark/ips' +if !PlatformHelpers.jruby? && Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.1.0') + require 'benchmark/memory' + require 'memory_profiler' +end + +require 'fileutils' +require 'json' + +RSpec.shared_context 'benchmark' do + # When applicable, runs the test subject for different input sizes. + # Similar to how N in Big O notation works. + # + # This value is provided to the `subject(i)` method in order for the test + # to appropriately execute its run based on input size. + let(:steps) { defined?(super) ? super() : [1, 10, 100] } + + # How many times we run our program when testing for memory allocation. + # In theory, we should only need to run it once, as memory tests are not + # dependent on competing system resources. + # But occasionally we do see a few blimps of inconsistency, making the benchmarks skewed. + # By running the benchmarked snippet many times, we drown out any one-off anomalies, allowing + # the real memory culprits to surface. + let(:memory_iterations) { defined?(super) ? super() : 100 } + + # Outputs human readable information to STDERR. + # Most of the benchmarks have nicely formatted reports + # that are by default printed to terminal. + before do |e| + @test = e.metadata[:example_group][:full_description] + @type = e.description + + STDERR.puts "Test:#{e.metadata[:example_group][:full_description]} #{e.description}" + + # Warm up + steps.each do |s| + subject(s) + end + end + + # Report JSON result objects to ./tmp/benchmark/ folder + # Theses results can be historically tracked (e.g. plotting) if needed. + def write_result(result, subtype = nil) + type = @type + type = "#{type}-#{subtype}" if subtype + + STDERR.puts(@test, type, result) + + path = File.join('tmp', 'benchmark', @test, type) + FileUtils.mkdir_p(File.dirname(path)) + + File.write(path, JSON.pretty_generate(result)) + + STDERR.puts("Result written to #{path}") + end + + # Measure execution time + it 'timing' do + report = Benchmark.ips do |x| + x.config(time: 5, warmup: 0.5) + + steps.each do |s| + x.report(s) do + subject(s) + end + end + + x.compare! + end + + result = report.entries.each_with_object({}) do |entry, hash| + hash[entry.label] = { ips: entry.stats.central_tendency, error: entry.stats.error_percentage / 100 } + end.to_h + + write_result(result) + end + + # Measure memory usage (object creation and memory size) + it 'memory' do + if PlatformHelpers.jruby? || Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.1.0') + skip("'benchmark/memory' not supported") + end + + report = Benchmark.memory do |x| + steps.each do |s| + x.report(s) do + memory_iterations.times { subject(s) } + end + end + + x.compare! + end + + result = report.entries.map do |entry| + row = entry.measurement.map do |metric| + { type: metric.type, allocated: metric.allocated, retained: metric.retained } + end + + [entry.label, row] + end.to_h + + write_result(result) + end + + # Measure GC cycles triggered during run + it 'gc' do + skip if PlatformHelpers.jruby? + + io = StringIO.new + GC::Profiler.enable + + memory_iterations.times { subject(steps[0]) } + + GC.disable # Prevent data collection from influencing results + + data = GC::Profiler.raw_data + GC::Profiler.report(io) + GC::Profiler.disable + + GC.enable + + puts io.string + + result = { count: data.size, time: data.map { |d| d[:GC_TIME] }.inject(0, &:+) } + write_result(result) + end + + # Reports that generate non-aggregated data. + # Useful for debugging. + context 'detailed report' do + before { skip('Detailed report are too verbose for CI') if ENV.key?('CI') } + + let(:ignore_files) { defined?(super) ? super() : nil } + + # Memory report with reference to each allocation site + it 'memory report' do + if PlatformHelpers.jruby? || Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.1.0') + skip("'benchmark/memory' not supported") + end + + steps.each do |step| + report = MemoryProfiler.report(ignore_files: ignore_files) do + memory_iterations.times { subject(step) } + end + + report_results(report, step) + end + end + + def report_results(report, step) + puts "Report for step: #{step}" + report.pretty_print + + per_gem_report = lambda do |results| + Hash[results.map { |x| [x[:data], x[:count]] }.sort_by(&:first)] + end + + result = { + total_allocated: report.total_allocated, + total_allocated_memsize: report.total_allocated_memsize, + total_retained: report.total_retained, + total_retained_memsize: report.total_retained_memsize, + allocated_memory_by_gem: per_gem_report[report.allocated_memory_by_gem], + allocated_objects_by_gem: per_gem_report[report.allocated_objects_by_gem], + retained_memory_by_gem: per_gem_report[report.retained_memory_by_gem], + retained_objects_by_gem: per_gem_report[report.retained_objects_by_gem] + } + write_result(result, step) + end + end +end diff --git a/spec/ddtrace/benchmark/transport_benchmark_spec.rb b/spec/ddtrace/benchmark/transport_benchmark_spec.rb new file mode 100644 index 00000000000..139b9471cf0 --- /dev/null +++ b/spec/ddtrace/benchmark/transport_benchmark_spec.rb @@ -0,0 +1,76 @@ +require 'spec_helper' + +require_relative 'support/benchmark_helper' + +require 'socket' + +RSpec.describe 'Microbenchmark Transport' do + context 'with HTTP transport' do + # Create server that responds just like the agent, + # but doesn't consume as many resources, nor introduces external + # noise into the benchmark. + let(:server) { TCPServer.new '127.0.0.1', ENV[Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT].to_i } + + # Sample agent response, collected from a real agent exchange. + AGENT_HTTP_RESPONSE = "HTTP/1.1 200\r\n" \ + "Content-Length: 40\r\n" \ + "Content-Type: application/json\r\n" \ + "Date: Thu, 03 Sep 2020 20:05:54 GMT\r\n" \ + "\r\n" \ + "{\"rate_by_service\":{\"service:,env:\":1}}\n".freeze + + before(:each) do + @server_thread = Thread.new do + previous_conn = nil + loop do + conn = server.accept + conn.print AGENT_HTTP_RESPONSE + conn.flush + + # Closing the connection immediately can sometimes + # be too fast, cause to other side to not be able + # to read the response in time. + # We instead delay closing the connection until the next + # connection request comes in. + previous_conn.close if previous_conn + previous_conn = conn + end + end + end + + after(:each) do + @server_thread.kill + end + + around do |example| + # Set the agent port used by the default HTTP transport + ClimateControl.modify(Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT => available_port.to_s) do + example.run + end + end + + describe 'send_traces' do + include_examples 'benchmark' + + let(:transport) { Datadog::Transport::HTTP.default } + + # Test with with up to 1000 spans being flushed + # in a single method call. This would translate to + # up to 1000 spans per second in a real application. + let(:steps) { [1, 10, 100, 1000] } + + let(:span1) { get_test_traces(1) } + let(:span10) { get_test_traces(10) } + let(:span100) { get_test_traces(100) } + let(:span1000) { get_test_traces(1000) } + let(:span) { { 1 => span1, 10 => span10, 100 => span100, 1000 => span1000 } } + + # Remove objects created during specs from memory results + let(:ignore_files) { %r{(/spec/)} } + + def subject(i) + transport.send_traces(span[i]) + end + end + end +end diff --git a/spec/ddtrace/contrib/http/circuit_breaker_spec.rb b/spec/ddtrace/contrib/http/circuit_breaker_spec.rb index 33b0aa461f6..1104aea6bbc 100644 --- a/spec/ddtrace/contrib/http/circuit_breaker_spec.rb +++ b/spec/ddtrace/contrib/http/circuit_breaker_spec.rb @@ -89,7 +89,7 @@ end context 'a Datadog Net::HTTP transport' do - before { expect(::Net::HTTP).to receive(:start) } + before { expect(::Net::HTTP).to receive(:new) } let(:transport) { Datadog::Transport::HTTP.default } it { is_expected.to be true } diff --git a/spec/ddtrace/span_spec.rb b/spec/ddtrace/span_spec.rb index bedf9e7cbdd..195a2ef2d11 100644 --- a/spec/ddtrace/span_spec.rb +++ b/spec/ddtrace/span_spec.rb @@ -2,6 +2,9 @@ require 'ddtrace/ext/forced_tracing' require 'ddtrace/span' +require 'json' +require 'msgpack' + RSpec.describe Datadog::Span do subject(:span) { described_class.new(tracer, name, context: context, **span_options) } let(:tracer) { get_test_tracer } @@ -536,4 +539,53 @@ expect(span).to have_error_stack(backtrace.join($RS)) end end + + describe '#to_hash' do + subject(:to_hash) { span.to_hash } + let(:span_options) { { trace_id: 12 } } + before { span.span_id = 34 } + + it do + is_expected.to eq( + trace_id: 12, + span_id: 34, + parent_id: 0, + name: 'my.span', + service: nil, + resource: 'my.span', + type: nil, + meta: {}, + metrics: {}, + allocations: 0, + error: 0 + ) + end + + context 'with a finished span' do + before { span.finish } + + it 'includes timing information' do + is_expected.to include( + start: be >= 0, + duration: be >= 0 + ) + end + end + end + + describe '#to_msgpack' do + subject(:to_msgpack) { MessagePack.unpack(MessagePack.pack(span)) } + + it 'correctly performs a serialization round-trip' do + is_expected.to eq(Hash[span.to_hash.map { |k, v| [k.to_s, v] }]) + end + end + + describe '#to_json' do + subject(:to_json) { JSON(JSON.dump(span)) } + + it 'correctly performs a serialization round-trip' do + is_expected.to eq(Hash[span.to_hash.map { |k, v| [k.to_s, v] }]) + end + end end diff --git a/spec/ddtrace/transport/http/adapters/net_spec.rb b/spec/ddtrace/transport/http/adapters/net_spec.rb index 3c5c8f566e2..e1b6489e5c5 100644 --- a/spec/ddtrace/transport/http/adapters/net_spec.rb +++ b/spec/ddtrace/transport/http/adapters/net_spec.rb @@ -9,20 +9,23 @@ let(:port) { double('port') } let(:timeout) { double('timeout') } let(:options) { { timeout: timeout } } + let(:proxy_addr) { nil } # We currently disable proxy for transport HTTP requests shared_context 'HTTP connection stub' do let(:http_connection) { instance_double(::Net::HTTP) } before do - allow(::Net::HTTP).to receive(:start) + allow(::Net::HTTP).to receive(:new) .with( adapter.hostname, adapter.port, - open_timeout: adapter.timeout, - read_timeout: adapter.timeout - ) do |*_args, &block| - block.call(http_connection) - end + proxy_addr + ).and_return(http_connection) + + allow(http_connection).to receive(:open_timeout=).with(adapter.timeout) + allow(http_connection).to receive(:read_timeout=).with(adapter.timeout) + + allow(http_connection).to receive(:start).and_yield(http_connection) end end diff --git a/spec/ddtrace/transport/http/statistics_spec.rb b/spec/ddtrace/transport/http/statistics_spec.rb index eaeaf50ecb5..4146a42452b 100644 --- a/spec/ddtrace/transport/http/statistics_spec.rb +++ b/spec/ddtrace/transport/http/statistics_spec.rb @@ -16,6 +16,8 @@ context 'when the response' do context 'is OK' do + let(:status_code) { 200 } + before do allow(response).to receive(:ok?).and_return(true) allow(response).to receive(:internal_error?).and_return(false) @@ -35,6 +37,8 @@ end context 'is a client error' do + let(:status_code) { 400 } + before do allow(response).to receive(:ok?).and_return(false) allow(response).to receive(:client_error?).and_return(true) @@ -56,6 +60,8 @@ end context 'is a server error' do + let(:status_code) { 500 } + before do allow(response).to receive(:ok?).and_return(false) allow(response).to receive(:client_error?).and_return(false) From f7a83fbd979e92702aa0faebd4de8c42651d704f Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Fri, 4 Sep 2020 15:04:28 -0400 Subject: [PATCH 2/7] Use strings directly for MessagePack keys --- lib/ddtrace/span.rb | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/ddtrace/span.rb b/lib/ddtrace/span.rb index a8f5ffc357e..b43b4bb1993 100644 --- a/lib/ddtrace/span.rb +++ b/lib/ddtrace/span.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require 'time' require 'thread' @@ -269,36 +271,36 @@ def to_msgpack(packer = nil) if !@start_time.nil? && !@end_time.nil? packer.write_map_header(13) # Set header with how many elements in the map - packer.write(:start) + packer.write('start') packer.write((@start_time.to_f * 1e9).to_i) - packer.write(:duration) + packer.write('duration') packer.write(((@end_time - @start_time) * 1e9).to_i) else packer.write_map_header(11) # Set header with how many elements in the map end - packer.write(:span_id) + packer.write('span_id') packer.write(@span_id) - packer.write(:parent_id) + packer.write('parent_id') packer.write(@parent_id) - packer.write(:trace_id) + packer.write('trace_id') packer.write(@trace_id) - packer.write(:name) + packer.write('name') packer.write(@name) - packer.write(:service) + packer.write('service') packer.write(@service) - packer.write(:resource) + packer.write('resource') packer.write(@resource) - packer.write(:type) + packer.write('type') packer.write(@span_type) - packer.write(:meta) + packer.write('meta') packer.write(@meta) - packer.write(:metrics) + packer.write('metrics') packer.write(@metrics) - packer.write(:allocations) + packer.write('allocations') packer.write(allocations) - packer.write(:error) + packer.write('error') packer.write(@status) packer end From 4c8d512b71300071f135fad0098d766cc12e4d0c Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Fri, 4 Sep 2020 16:15:53 -0400 Subject: [PATCH 3/7] Add comment regarding msgpack keys --- lib/ddtrace/span.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/ddtrace/span.rb b/lib/ddtrace/span.rb index b43b4bb1993..859823ca832 100644 --- a/lib/ddtrace/span.rb +++ b/lib/ddtrace/span.rb @@ -280,6 +280,9 @@ def to_msgpack(packer = nil) packer.write_map_header(11) # Set header with how many elements in the map end + # DEV: We use strings as keys here, instead of symbols, as + # DEV: MessagePack will ultimately convert them to strings. + # DEV: By providing strings directly, we skip this indirection operation. packer.write('span_id') packer.write(@span_id) packer.write('parent_id') From a9c667735fe30c927037e8121f15d822e166facc Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Fri, 4 Sep 2020 16:30:43 -0400 Subject: [PATCH 4/7] Fork for parallelism support --- .../benchmark/transport_benchmark_spec.rb | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/spec/ddtrace/benchmark/transport_benchmark_spec.rb b/spec/ddtrace/benchmark/transport_benchmark_spec.rb index 139b9471cf0..1a31e1af73b 100644 --- a/spec/ddtrace/benchmark/transport_benchmark_spec.rb +++ b/spec/ddtrace/benchmark/transport_benchmark_spec.rb @@ -19,27 +19,40 @@ "\r\n" \ "{\"rate_by_service\":{\"service:,env:\":1}}\n".freeze - before(:each) do - @server_thread = Thread.new do - previous_conn = nil - loop do - conn = server.accept - conn.print AGENT_HTTP_RESPONSE - conn.flush + def server_runner + previous_conn = nil + loop do + conn = server.accept + conn.print AGENT_HTTP_RESPONSE + conn.flush - # Closing the connection immediately can sometimes - # be too fast, cause to other side to not be able - # to read the response in time. - # We instead delay closing the connection until the next - # connection request comes in. - previous_conn.close if previous_conn - previous_conn = conn - end + # Closing the connection immediately can sometimes + # be too fast, cause to other side to not be able + # to read the response in time. + # We instead delay closing the connection until the next + # connection request comes in. + previous_conn.close if previous_conn + previous_conn = conn end end - after(:each) do - @server_thread.kill + before do + # Initializes server in a fork, to allow for true concurrency. + # In JRuby, threads are not supported, but true thread concurrency is. + @server_runner = if PlatformHelpers.supports_fork? + fork { server_runner } + else + Thread.new { server_runner } + end + end + + after do + if PlatformHelpers.supports_fork? + Process.kill('TERM', @server_runner) rescue nil + Process.wait(@server_runner) + else + @server_runner.kill + end end around do |example| From 9c5685935b5f561c30e3b4191aed4cd911f35464 Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Thu, 10 Sep 2020 16:14:26 -0400 Subject: [PATCH 5/7] Improve time reporting --- lib/ddtrace/span.rb | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lib/ddtrace/span.rb b/lib/ddtrace/span.rb index 859823ca832..640e5af6ff4 100644 --- a/lib/ddtrace/span.rb +++ b/lib/ddtrace/span.rb @@ -249,8 +249,8 @@ def to_hash } if !@start_time.nil? && !@end_time.nil? - h[:start] = (@start_time.to_f * 1e9).to_i - h[:duration] = ((@end_time - @start_time) * 1e9).to_i + h[:start] = start_time_nano + h[:duration] = duration_nano end h @@ -272,10 +272,10 @@ def to_msgpack(packer = nil) packer.write_map_header(13) # Set header with how many elements in the map packer.write('start') - packer.write((@start_time.to_f * 1e9).to_i) + packer.write(start_time_nano) packer.write('duration') - packer.write(((@end_time - @start_time) * 1e9).to_i) + packer.write(duration_nano) else packer.write_map_header(11) # Set header with how many elements in the map end @@ -363,5 +363,17 @@ def now_allocations GC.stat(:total_allocated_objects) end end + + # Used for serialization + # @return [Integer] in nanoseconds since Epoch + def start_time_nano + @start_time.to_i * 1000000000 + @start_time.nsec + end + + # Used for serialization + # @return [Integer] in nanoseconds since Epoch + def duration_nano + ((@end_time - @start_time) * 1e9).to_i + end end end From b1877c16a345d35a83df2ca7116758441ec7c5fc Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Wed, 16 Sep 2020 16:46:17 -0400 Subject: [PATCH 6/7] Extract minimal agent for reuse --- .../benchmark/support/benchmark_helper.rb | 66 +++++++++++++++++++ .../benchmark/transport_benchmark_spec.rb | 58 +--------------- 2 files changed, 67 insertions(+), 57 deletions(-) diff --git a/spec/ddtrace/benchmark/support/benchmark_helper.rb b/spec/ddtrace/benchmark/support/benchmark_helper.rb index 5f9e723f1d0..014c51354fd 100644 --- a/spec/ddtrace/benchmark/support/benchmark_helper.rb +++ b/spec/ddtrace/benchmark/support/benchmark_helper.rb @@ -174,3 +174,69 @@ def report_results(report, step) end end end + +require 'socket' + +# An "agent" that always responds with a proper OK response, while +# keeping minimum overhead. +# +# The goal is to reduce external performance noise from running a real +# agent process in the system. +# +# It finds a locally available port to listen on, and updates the value of +# {Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT} accordingly. +RSpec.shared_context 'minimal agent' do + let(:agent_server) { TCPServer.new '127.0.0.1', agent_port } + let(:agent_port) { ENV[Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT].to_i } + + # Sample agent response, collected from a real agent exchange. + AGENT_HTTP_RESPONSE = "HTTP/1.1 200\r\n" \ + "Content-Length: 40\r\n" \ + "Content-Type: application/json\r\n" \ + "Date: Thu, 03 Sep 2020 20:05:54 GMT\r\n" \ + "\r\n" \ + "{\"rate_by_service\":{\"service:,env:\":1}}\n".freeze + + def server_runner + previous_conn = nil + loop do + conn = agent_server.accept + conn.print AGENT_HTTP_RESPONSE + conn.flush + + # Closing the connection immediately can sometimes + # be too fast, cause to other side to not be able + # to read the response in time. + # We instead delay closing the connection until the next + # connection request comes in. + previous_conn.close if previous_conn + previous_conn = conn + end + end + + before do + # Initializes server in a fork, to allow for true concurrency. + # In JRuby, threads are not supported, but true thread concurrency is. + @agent_runner = if PlatformHelpers.supports_fork? + fork { server_runner } + else + Thread.new { server_runner } + end + end + + after do + if PlatformHelpers.supports_fork? + Process.kill('TERM', @agent_runner) rescue nil + Process.wait(@agent_runner) + else + @agent_runner.kill + end + end + + around do |example| + # Set the agent port used by the default HTTP transport + ClimateControl.modify(Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT => available_port.to_s) do + example.run + end + end +end diff --git a/spec/ddtrace/benchmark/transport_benchmark_spec.rb b/spec/ddtrace/benchmark/transport_benchmark_spec.rb index 1a31e1af73b..72021479f87 100644 --- a/spec/ddtrace/benchmark/transport_benchmark_spec.rb +++ b/spec/ddtrace/benchmark/transport_benchmark_spec.rb @@ -2,65 +2,9 @@ require_relative 'support/benchmark_helper' -require 'socket' - RSpec.describe 'Microbenchmark Transport' do context 'with HTTP transport' do - # Create server that responds just like the agent, - # but doesn't consume as many resources, nor introduces external - # noise into the benchmark. - let(:server) { TCPServer.new '127.0.0.1', ENV[Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT].to_i } - - # Sample agent response, collected from a real agent exchange. - AGENT_HTTP_RESPONSE = "HTTP/1.1 200\r\n" \ - "Content-Length: 40\r\n" \ - "Content-Type: application/json\r\n" \ - "Date: Thu, 03 Sep 2020 20:05:54 GMT\r\n" \ - "\r\n" \ - "{\"rate_by_service\":{\"service:,env:\":1}}\n".freeze - - def server_runner - previous_conn = nil - loop do - conn = server.accept - conn.print AGENT_HTTP_RESPONSE - conn.flush - - # Closing the connection immediately can sometimes - # be too fast, cause to other side to not be able - # to read the response in time. - # We instead delay closing the connection until the next - # connection request comes in. - previous_conn.close if previous_conn - previous_conn = conn - end - end - - before do - # Initializes server in a fork, to allow for true concurrency. - # In JRuby, threads are not supported, but true thread concurrency is. - @server_runner = if PlatformHelpers.supports_fork? - fork { server_runner } - else - Thread.new { server_runner } - end - end - - after do - if PlatformHelpers.supports_fork? - Process.kill('TERM', @server_runner) rescue nil - Process.wait(@server_runner) - else - @server_runner.kill - end - end - - around do |example| - # Set the agent port used by the default HTTP transport - ClimateControl.modify(Datadog::Ext::Transport::HTTP::ENV_DEFAULT_PORT => available_port.to_s) do - example.run - end - end + include_context 'minimal agent' describe 'send_traces' do include_examples 'benchmark' From 8933df6c02a54c834ce7c2a3361bbf97c9b793be Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Fri, 18 Sep 2020 17:10:01 -0400 Subject: [PATCH 7/7] Add end-to-end benchmarking (#1178) --- spec/ddtrace/benchmark/microbenchmark_spec.rb | 26 +++++++++++++++++++ .../benchmark/support/benchmark_helper.rb | 5 +++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/spec/ddtrace/benchmark/microbenchmark_spec.rb b/spec/ddtrace/benchmark/microbenchmark_spec.rb index 378b0b04913..e0fc6f83390 100644 --- a/spec/ddtrace/benchmark/microbenchmark_spec.rb +++ b/spec/ddtrace/benchmark/microbenchmark_spec.rb @@ -75,4 +75,30 @@ def reset_buffer end end end + + describe 'end-to-end' do + include_context 'minimal agent' + + describe 'nested traces' do + include_examples 'benchmark' + + let(:timing_runtime) { 60 } + let(:memory_iterations) { 1000 } + + let(:steps) { [1, 10, 100] } + + let(:tracer) { Datadog::Tracer.new } + after { tracer.shutdown! } + + let(:name) { 'span'.freeze } + + def trace(i, total) + tracer.trace(name) { trace(i + 1, total) unless i == total } + end + + def subject(i) + trace(1, i) + end + end + end end diff --git a/spec/ddtrace/benchmark/support/benchmark_helper.rb b/spec/ddtrace/benchmark/support/benchmark_helper.rb index 014c51354fd..9dc7c4127b4 100644 --- a/spec/ddtrace/benchmark/support/benchmark_helper.rb +++ b/spec/ddtrace/benchmark/support/benchmark_helper.rb @@ -28,6 +28,9 @@ # the real memory culprits to surface. let(:memory_iterations) { defined?(super) ? super() : 100 } + # How long the program will run when calculating IPS performance, in seconds. + let(:timing_runtime) { defined?(super) ? super() : 5 } + # Outputs human readable information to STDERR. # Most of the benchmarks have nicely formatted reports # that are by default printed to terminal. @@ -62,7 +65,7 @@ def write_result(result, subtype = nil) # Measure execution time it 'timing' do report = Benchmark.ips do |x| - x.config(time: 5, warmup: 0.5) + x.config(time: timing_runtime, warmup: timing_runtime / 10) steps.each do |s| x.report(s) do