Performance investigation #1905

Draft - wants to merge 15 commits into base: main
9 changes: 5 additions & 4 deletions ddtrace/tracer/context_test.go
@@ -9,7 +9,6 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"os"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
@@ -115,8 +114,9 @@ func TestStartSpanFromContextRace(t *testing.T) {
func Test128(t *testing.T) {
_, _, _, stop := startTestTracer(t)
defer stop()
defer func(enabled bool) { TraceID128BitEnabled.Store(enabled) }(TraceID128BitEnabled.Swap(true))
TraceID128BitEnabled.Store(false)

os.Setenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "false")
span, _ := StartSpanFromContext(context.Background(), "http.request")
assert.NotZero(t, span.Context().TraceID())
w3cCtx, ok := span.Context().(ddtrace.SpanContextW3C)
@@ -129,9 +129,10 @@ func Test128(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(0), binary.BigEndian.Uint64(idBytes[:8])) // high 64 bits should be 0
assert.Equal(t, span.Context().TraceID(), binary.BigEndian.Uint64(idBytes[8:]))

// Enable 128 bit trace ids
os.Unsetenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED")
defer func(enabled bool) { TraceID128BitEnabled.Store(enabled) }(TraceID128BitEnabled.Swap(true))

TraceID128BitEnabled.Store(true)
span128, _ := StartSpanFromContext(context.Background(), "http.request")
assert.NotZero(t, span128.Context().TraceID())
w3cCtx, ok = span128.Context().(ddtrace.SpanContextW3C)
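The Swap/defer one-liner above replaces the old Setenv/Unsetenv dance: it installs the new flag value and captures the previous one in a single atomic operation, then restores it when the test returns. A minimal sketch of the idiom, using the package-level TraceID128BitEnabled from this PR:

	defer func(prev bool) {
		TraceID128BitEnabled.Store(prev) // restore the saved value on test exit
	}(TraceID128BitEnabled.Swap(true)) // Swap stores true and returns the old value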
7 changes: 3 additions & 4 deletions ddtrace/tracer/span_test.go
@@ -684,14 +684,14 @@ func TestSpanSamplingPriority(t *testing.T) {
v, ok := span.Metrics[keySamplingPriority]
assert.True(ok)
assert.EqualValues(priority, v)
assert.EqualValues(*span.context.trace.priority, v)
assert.EqualValues(*span.context.trace.priority.Load(), v)

childSpan := tracer.newChildSpan("my.child", span)
v0, ok0 := span.Metrics[keySamplingPriority]
v1, ok1 := childSpan.Metrics[keySamplingPriority]
assert.Equal(ok0, ok1)
assert.Equal(v0, v1)
assert.EqualValues(*childSpan.context.trace.priority, v0)
assert.EqualValues(*childSpan.context.trace.priority.Load(), v0)
}
}

@@ -810,7 +810,6 @@ func TestSpanLog(t *testing.T) {
// Generate 128 bit trace ids, but don't log them. So only the lower
// 64 bits should be logged in decimal form.
// DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED is true by default
// DD_TRACE_128_BIT_TRACEID_LOGGING_ENABLED is false by default
assert := assert.New(t)
tracer, _, _, stop := startTestTracer(t, WithService("tracer.test"), WithEnv("testenv"))
defer stop()
@@ -841,7 +840,7 @@ func TestSpanLog(t *testing.T) {
t.Run("128-bit-logging-with-generation", func(t *testing.T) {
// Logging 128-bit trace ids is enabled, and a 128-bit trace id, so
// a quoted 32 byte hex string should be printed for the dd.trace_id.
t.Setenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "true")
defer func(enabled bool) { TraceID128BitEnabled.Store(enabled) }(TraceID128BitEnabled.Swap(true))
t.Setenv("DD_TRACE_128_BIT_TRACEID_LOGGING_ENABLED", "true")
assert := assert.New(t)
tracer, _, _, stop := startTestTracer(t, WithService("tracer.test"), WithEnv("testenv"))
68 changes: 37 additions & 31 deletions ddtrace/tracer/spancontext.go
@@ -102,6 +102,13 @@
origin string // e.g. "synthetics"
}

// TODO: temporary hack to avoid accessing a mutex-guarded resource in a hot path
var TraceID128BitEnabled atomic.Bool = atomic.Bool{}

Check failure on line 106 in ddtrace/tracer/spancontext.go (GitHub Actions / PR Unit and Integration Tests / lint):
var-declaration: should omit type atomic.Bool from declaration of var TraceID128BitEnabled; it will be inferred from the right-hand side (revive)

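Either omitting the type or dropping the initializer satisfies the revive rule; since the zero value of atomic.Bool is ready to use, the simplest form is a bare declaration (a sketch of the fix, not yet applied in this diff):

	var TraceID128BitEnabled atomic.Bool // zero value is false; init below stores the real default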
func init() {
TraceID128BitEnabled.Store(sharedinternal.BoolEnv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", false))
}
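The flag caches the environment variable once at package init, so the span-creation hot path pays only an atomic load instead of an env lookup or a mutex. A self-contained sketch of the same pattern; boolEnv is a hypothetical stand-in for sharedinternal.BoolEnv:

	package feature

	import (
		"os"
		"strconv"
		"sync/atomic"
	)

	var featureEnabled atomic.Bool

	// boolEnv parses key as a boolean, falling back to def when unset or invalid.
	func boolEnv(key string, def bool) bool {
		v, err := strconv.ParseBool(os.Getenv(key))
		if err != nil {
			return def
		}
		return v
	}

	func init() {
		// One environment read at startup; no env or mutex access afterwards.
		featureEnabled.Store(boolEnv("MY_FEATURE_ENABLED", false))
	}

	func hotPath() {
		if featureEnabled.Load() { // a single atomic load per call
			// ...feature-specific work...
		}
	}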

// newSpanContext creates a new SpanContext to serve as context for the given
// span. If the provided parent is not nil, the context will inherit the trace,
// baggage and other values from it. This method also pushes the span into the
@@ -122,7 +129,7 @@
context.setBaggageItem(k, v)
return true
})
} else if sharedinternal.BoolEnv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", true) {
} else if TraceID128BitEnabled.Load() {
// add 128 bit trace id, if enabled, formatted as big-endian:
// <32-bit unix seconds> <32 bits of zero> <64 random bits>
id128 := time.Duration(span.Start) / time.Second
@@ -178,12 +185,12 @@
}
}

func (c *spanContext) setSamplingPriority(p int, sampler samplernames.SamplerName) {

Check failure on line 188 in ddtrace/tracer/spancontext.go (GitHub Actions / PR Unit and Integration Tests / lint):
unused-parameter: parameter 'sampler' seems to be unused, consider removing or renaming it as _ (revive)
if c.trace == nil {
c.trace = newTrace()
}
if c.trace.setSamplingPriority(p, sampler) {
// the trace's sampling priority was updated: mark this as updated
priority := c.trace.priority.Load()
if priority != nil && *priority != float64(p) {
c.updated = true
}
}
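If sampler must stay in the signature for compatibility, the conventional fix for the lint failure above is the blank identifier (signature sketch only, not applied in this diff):

	func (c *spanContext) setSamplingPriority(p int, _ samplernames.SamplerName) {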
@@ -241,15 +248,15 @@
// priority, the root reference and a buffer of the spans which are part of the
// trace, if these exist.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
tags map[string]string // trace level tags
propagatingTags map[string]string // trace level tags that will be propagated across service boundaries
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
tags map[string]string // trace level tags
propagatingTags map[string]string // trace level tags that will be propagated across service boundaries
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority atomic.Pointer[float64] // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
@@ -277,17 +284,12 @@
return &trace{spans: make([]*span, 0, traceStartSize)}
}

func (t *trace) samplingPriorityLocked() (p int, ok bool) {
if t.priority == nil {
func (t *trace) samplingPriority() (p int, ok bool) {
priority := t.priority.Load()
if priority == nil {
return 0, false
}
return int(*t.priority), true
}

func (t *trace) samplingPriority() (p int, ok bool) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.samplingPriorityLocked()
return int(*priority), true
}

// setSamplingPriority sets the sampling priority and returns true if it was modified.
@@ -322,13 +324,14 @@
if t.locked {
return false
}

updatedPriority := t.priority == nil || *t.priority != float64(p)

if t.priority == nil {
t.priority = new(float64)
}
*t.priority = float64(p)
old := t.priority.Load()
updatedPriority := old == nil || *old != float64(p)
newPriority := float64(p)
t.priority.Store(&newPriority)
_, ok := t.propagatingTags[keyDecisionMaker]
if p > 0 && !ok && sampler != samplernames.Unknown {
// We have a positive priority and the sampling mechanism isn't set.
Expand All @@ -339,7 +342,7 @@
delete(t.propagatingTags, keyDecisionMaker)
}

return updatedPriority
}
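The atomic.Pointer[float64] field follows a publish-only discipline: writers store a freshly allocated value and readers dereference a loaded snapshot, so neither side takes a lock and no goroutine ever writes through a pointer another goroutine may be reading. A minimal self-contained sketch of the pattern; the sampled type is illustrative, not part of the PR:

	type sampled struct {
		priority atomic.Pointer[float64]
	}

	// set publishes a new priority without locking.
	func (s *sampled) set(p float64) {
		v := p
		s.priority.Store(&v) // always a fresh allocation, never an in-place write
	}

	// get returns a consistent snapshot, or ok=false when unset.
	func (s *sampled) get() (v float64, ok bool) {
		ptr := s.priority.Load()
		if ptr == nil {
			return 0, false
		}
		return *ptr, true
	}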

// push pushes a new span into the trace. If the buffer is full, it returns
@@ -409,6 +412,7 @@
return
}
t.finished++

tr, ok := internal.GetGlobalTracer().(*tracer)
if !ok {
return
@@ -420,11 +424,12 @@
if s.Service != "" && !strings.EqualFold(s.Service, tr.config.serviceName) {
s.Meta[keyBaseService] = tr.config.serviceName
}
if s == t.root && t.priority != nil {
priority := t.priority.Load()
if s == t.root && priority != nil {
// after the root has finished we lock down the priority;
// we won't be able to make changes to a span after finishing
// without causing a race condition.
t.root.setMetric(keySamplingPriority, *t.priority)
t.root.setMetric(keySamplingPriority, *priority)
t.locked = true
}
if len(t.spans) > 0 && s == t.spans[0] {
@@ -463,7 +468,8 @@
// TODO: (Support MetricKindDist) Re-enable these when we actually support `MetricKindDist`
//telemetry.GlobalClient.Record(telemetry.NamespaceTracers, telemetry.MetricKindDist, "trace_partial_flush.spans_closed", float64(len(finishedSpans)), nil, true)
//telemetry.GlobalClient.Record(telemetry.NamespaceTracers, telemetry.MetricKindDist, "trace_partial_flush.spans_remaining", float64(len(leftoverSpans)), nil, true)
finishedSpans[0].setMetric(keySamplingPriority, *t.priority)

finishedSpans[0].setMetric(keySamplingPriority, *t.priority.Load())
if s != t.spans[0] {
// Make sure the first span in the chunk has the trace-level tags
t.setTraceTags(finishedSpans[0], tr)
7 changes: 4 additions & 3 deletions ddtrace/tracer/spancontext_test.go
@@ -9,6 +9,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

@@ -660,7 +661,7 @@ func TestNewSpanContext(t *testing.T) {
assert.Equal(ctx.spanID, span.SpanID)
assert.Equal(ctx.TraceID(), span.TraceID)
assert.Equal(ctx.SpanID(), span.SpanID)
assert.Equal(*ctx.trace.priority, 1.)
assert.Equal(*ctx.trace.priority.Load(), 1.)
assert.Equal(ctx.trace.root, span)
assert.Contains(ctx.trace.spans, span)
})
@@ -681,7 +682,7 @@ func TestNewSpanContext(t *testing.T) {
span := StartSpan("some-span", ChildOf(ctx))
assert.EqualValues(uint64(1), sctx.traceID.Lower())
assert.EqualValues(2, sctx.spanID)
assert.EqualValues(3, *sctx.trace.priority)
assert.EqualValues(3, *sctx.trace.priority.Load())
assert.Equal(sctx.trace.root, span)
})
}
@@ -704,7 +705,7 @@ func TestSpanContextParent(t *testing.T) {
hasBaggage: 1,
trace: &trace{
spans: []*span{newBasicSpan("abc")},
priority: func() *float64 { v := new(float64); *v = 2; return v }(),
priority: func() atomic.Pointer[float64] { v := 2.0; p := atomic.Pointer[float64]{}; p.Store(&v); return p }(),
},
},
"sampling_decision": {
4 changes: 2 additions & 2 deletions ddtrace/tracer/textmap_test.go
@@ -789,7 +789,7 @@ func TestEnvVars(t *testing.T) {
// assert.Equal(test.traceID128, id128FromSpan(assert, ctx)) // add when 128-bit trace id support is enabled
if len(tc.out) > 2 {
require.NotNil(t, sctx.trace)
assert.Equal(float64(tc.out[2]), *sctx.trace.priority)
assert.Equal(float64(tc.out[2]), *sctx.trace.priority.Load())
}
})
}
@@ -1687,7 +1687,7 @@ func TestEnvVars(t *testing.T) {
assert.Equal(tc.out[0], sctx.traceID.Lower())
assert.Equal(tc.out[1], sctx.spanID)
assert.Equal(tc.origin, sctx.origin)
assert.Equal(tc.priority, *sctx.trace.priority)
assert.Equal(tc.priority, *sctx.trace.priority.Load())

headers := TextMapCarrier(map[string]string{})
err = tracer.Inject(ctx, headers)
29 changes: 29 additions & 0 deletions ddtrace/tracer/tracer.go
@@ -508,6 +508,35 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
span.setMetric(keySamplingPriority, float64(p))
}
if context.span != nil {
// Performance note:
// During span creation, the use of mutexes and other synchronization
// primitives should be minimized.
//
// Taking this mutex forces state to be synchronized across CPUs. In the
// vast majority of applications, spans are created from many goroutines
// running on multiple CPU cores.
//
// Even for a read lock like the one below, which will mostly not contend
// with a writer holding the lock, the three operations involved may in
// the worst case require three separate loads from memory or the
// last-level cache (LLC).
//
// The LLC is shared across all cores in a socket (the latest AMD consumer
// CPUs can have two LLCs). If multiple CPUs access the same memory
// segment at the same time, their execution is effectively serialized at
// two or more points, turning this innocuous memory read into a
// performance bottleneck in highly tuned code.
// The other part of the bottleneck is that further CPU optimizations,
// such as prefetching and out-of-order execution, are also thrown out the
// window: in most cases the CPU sits idle waiting for the memory
// operation to complete. Systems without SMT (hyperthreading), such as
// ARM machines or the benchmarking platform, make the problem more
// visible, because with SMT the sibling core can still work at full
// potential while the first sibling is stalled.
//
// While this lock is necessary in the current span-creation architecture,
// any further access to the parent span should happen under a single lock
// (in other words, locking should be minimized).
//
// Even copying most of the span struct (YMMV for strings and maps, which
// are unbounded in size and fragmented across the heap) will be faster
// than acquiring the lock more than once.
// local parent, inherit service
context.span.RLock()
span.Service = context.span.Service
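The note above argues that even an uncontended read lock is expensive once many cores hammer the same cache line. A hypothetical micro-benchmark, not part of the PR, that makes the effect measurable (run with go test -bench=. -cpu=1,8 and compare):

	package main

	import (
		"sync"
		"sync/atomic"
		"testing"
	)

	var (
		mu       sync.RWMutex
		flagMu   bool
		flagAtom atomic.Bool
	)

	// BenchmarkRWMutexRead reads a flag under a shared lock. Every
	// RLock/RUnlock pair writes the lock word, bouncing its cache line
	// between cores at higher -cpu values.
	func BenchmarkRWMutexRead(b *testing.B) {
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				mu.RLock()
				_ = flagMu
				mu.RUnlock()
			}
		})
	}

	// BenchmarkAtomicRead is a plain atomic load: no write traffic, so
	// the cache line stays in the shared state across cores.
	func BenchmarkAtomicRead(b *testing.B) {
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				_ = flagAtom.Load()
			}
		})
	}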
14 changes: 6 additions & 8 deletions ddtrace/tracer/tracer_test.go
@@ -715,8 +715,7 @@ func TestTracerStartSpanOptions128(t *testing.T) {
defer internal.SetGlobalTracer(&internal.NoopTracer{})
t.Run("64-bit-trace-id", func(t *testing.T) {
assert := assert.New(t)
os.Setenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "false")
defer os.Unsetenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED")
defer func(enabled bool) { TraceID128BitEnabled.Store(enabled) }(TraceID128BitEnabled.Swap(false))
opts := []StartSpanOption{
WithSpanID(987654),
}
@@ -865,7 +864,7 @@ func TestPropagationDefaults(t *testing.T) {
assert.Equal(ctx.traceID, pctx.traceID)
assert.Equal(ctx.spanID, pctx.spanID)
assert.Equal(ctx.baggage, pctx.baggage)
assert.Equal(*ctx.trace.priority, -1.)
assert.Equal(*ctx.trace.priority.Load(), -1.)

// ensure a child can be created
child := tracer.StartSpan("db.query", ChildOf(propagated)).(*span)
Expand All @@ -874,7 +873,7 @@ func TestPropagationDefaults(t *testing.T) {
assert.NotEqual(uint64(0), child.SpanID)
assert.Equal(root.SpanID, child.ParentID)
assert.Equal(root.TraceID, child.ParentID)
assert.Equal(*child.context.trace.priority, -1.)
assert.Equal(*child.context.trace.priority.Load(), -1.)
}

func TestTracerSamplingPriorityPropagation(t *testing.T) {
@@ -886,8 +885,8 @@ func TestTracerSamplingPriorityPropagation(t *testing.T) {
assert.EqualValues(2, root.Metrics[keySamplingPriority])
assert.Equal("-4", root.context.trace.propagatingTags[keyDecisionMaker])
assert.EqualValues(2, child.Metrics[keySamplingPriority])
assert.EqualValues(2., *root.context.trace.priority)
assert.EqualValues(2., *child.context.trace.priority)
assert.EqualValues(2., *root.context.trace.priority.Load())
assert.EqualValues(2., *child.context.trace.priority.Load())
}

func TestTracerSamplingPriorityEmptySpanCtx(t *testing.T) {
@@ -1046,8 +1045,7 @@ func TestNewSpanChild(t *testing.T) {
func testNewSpanChild(t *testing.T, is128 bool) {
t.Run(fmt.Sprintf("TestNewChildSpan(is128=%t)", is128), func(*testing.T) {
if !is128 {
os.Setenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "false")
defer os.Unsetenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED")
defer func(enabled bool) { TraceID128BitEnabled.Store(enabled) }(TraceID128BitEnabled.Swap(false))
}
assert := assert.New(t)

19 changes: 10 additions & 9 deletions internal/log/log.go
@@ -13,6 +13,7 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
@@ -39,11 +40,15 @@
}

var (
mu sync.RWMutex // guards below fields
level = LevelWarn
level atomic.Int32 = atomic.Int32{}

Check failure on line 43 in internal/log/log.go (GitHub Actions / PR Unit and Integration Tests / lint):
var-declaration: should omit type atomic.Int32 from declaration of var level; it will be inferred from the right-hand side (revive)

mu sync.RWMutex // guards logger instance
logger Logger = &defaultLogger{l: log.New(os.Stderr, "", log.LstdFlags)}
)

func init() {
level.Store(int32(LevelWarn))
}

// UseLogger sets l as the active logger and returns a function to restore the
// previous logger. The return value is mostly useful when testing.
func UseLogger(l Logger) (undo func()) {
@@ -59,18 +64,14 @@

// SetLevel sets the given lvl for logging.
func SetLevel(lvl Level) {
mu.Lock()
defer mu.Unlock()
level = lvl
level.Store(int32(lvl))
}

// DebugEnabled returns true if debug log messages are enabled. This can be used in extremely
// hot code paths to avoid allocating the ...interface{} argument.
func DebugEnabled() bool {
mu.RLock()
lvl := level
mu.RUnlock()
return lvl == LevelDebug
lvl := level.Load()
return lvl == int32(LevelDebug)
}

// Debug prints the given message if the level is LevelDebug.
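DebugEnabled exists so hot paths can skip building the message entirely: the guard costs one atomic load, whereas an unconditional log.Debug call allocates its variadic ...interface{} arguments even when debug logging is off. A hypothetical call site (flushReport and its arguments are illustrative, not part of the tracer):

	func flushReport(spans int, elapsed time.Duration) {
		if log.DebugEnabled() { // single atomic load of the level
			log.Debug("flushed %d spans in %s", spans, elapsed)
		}
	}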