Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] ddtrace/tracer: tidy up Tracer interface #2633

Merged
merged 1 commit into from May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions ddtrace/mocktracer/mocktracer.go
Expand Up @@ -190,7 +190,6 @@ func (t *mocktracer) TracerConf() tracer.TracerConf {
return tracer.TracerConf{}
}

func (t *mocktracer) SubmitStats(*tracer.Span) {}
func (t *mocktracer) SubmitAbandonedSpan(*tracer.Span, bool) {}
func (t *mocktracer) SubmitChunk(_ any) {}
func (t *mocktracer) Flush() {}
func (t *mocktracer) Submit(*tracer.Span) {}
func (t *mocktracer) SubmitChunk(*tracer.Chunk) {}
func (t *mocktracer) Flush() {}
7 changes: 3 additions & 4 deletions ddtrace/tracer/globaltracer_test.go
Expand Up @@ -30,10 +30,9 @@ func (*raceTestTracer) TracerConf() TracerConf {
return TracerConf{}
}

func (*raceTestTracer) SubmitStats(*Span) {}
func (*raceTestTracer) SubmitAbandonedSpan(*Span, bool) {}
func (*raceTestTracer) SubmitChunk(any) {}
func (*raceTestTracer) Flush() {}
func (*raceTestTracer) Submit(*Span) {}
func (*raceTestTracer) SubmitChunk(*Chunk) {}
func (*raceTestTracer) Flush() {}

func TestGlobalTracer(t *testing.T) {
// at module initialization, the tracer must be seet
Expand Down
9 changes: 3 additions & 6 deletions ddtrace/tracer/noop.go
Expand Up @@ -29,13 +29,10 @@ func (NoopTracer) Inject(_ *SpanContext, _ interface{}) error { return nil }
// Stop implements Tracer.
func (NoopTracer) Stop() {}

// TODO(kjn v2): These should be removed. They are here temporarily to facilitate
// the shift to the v2 API.
func (NoopTracer) TracerConf() TracerConf {
return TracerConf{}
}

func (NoopTracer) SubmitStats(*Span) {}
func (NoopTracer) SubmitAbandonedSpan(*Span, bool) {}
func (NoopTracer) SubmitChunk(any) {}
func (NoopTracer) Flush() {}
func (NoopTracer) Submit(*Span) {}
func (NoopTracer) SubmitChunk(*Chunk) {}
func (NoopTracer) Flush() {}
18 changes: 4 additions & 14 deletions ddtrace/tracer/span.go
Expand Up @@ -642,20 +642,10 @@ func (s *Span) finish(finishTime int64) {
keep := true
if t := GetGlobalTracer(); t != nil {
tc := t.TracerConf()
if !tc.Disabled {
// we have an active tracer
if tc.CanComputeStats && shouldComputeStats(s) {
// the agent supports computed stats
t.SubmitStats(s)
}
if tc.CanDropP0s {
// the agent supports dropping p0's in the client
keep = shouldKeep(s)
}
if tc.DebugAbandonedSpans {
// the tracer supports debugging abandoned spans
t.SubmitAbandonedSpan(s, true)
}
t.Submit(s)
if tc.CanDropP0s {
// the agent supports dropping p0's in the client
keep = shouldKeep(s)
}
}
if keep {
Expand Down
15 changes: 6 additions & 9 deletions ddtrace/tracer/spancontext.go
Expand Up @@ -482,7 +482,7 @@ func (t *trace) finishedOne(s *Span) {
return
}
tc := tr.TracerConf()
setPeerService(s, tr)
setPeerService(s, tc.PeerServiceDefaults, tc.PeerServiceMappings)

// attach the _dd.base_service tag only when the globally configured service name is different from the
// span service name.
Expand Down Expand Up @@ -513,7 +513,7 @@ func (t *trace) finishedOne(s *Span) {

if len(t.spans) == t.finished { // perform a full flush of all spans
t.finishChunk(tr, &Chunk{
Spans: t.spans,
spans: t.spans,
willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
t.spans = nil
Expand Down Expand Up @@ -544,29 +544,26 @@ func (t *trace) finishedOne(s *Span) {
t.setTraceTags(finishedSpans[0])
}
t.finishChunk(tr, &Chunk{
Spans: finishedSpans,
spans: finishedSpans,
willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
t.spans = leftoverSpans
}

func (t *trace) finishChunk(tr Tracer, ch *Chunk) {
//atomic.AddUint32(&tr.spansFinished, uint32(len(ch.spans)))
//tr.pushChunk(ch)
tr.SubmitChunk(ch)
t.finished = 0 // important, because a buffer can be used for several flushes
}

// setPeerService sets the peer.service, _dd.peer.service.source, and _dd.peer.service.remapped_from
// tags as applicable for the given span.
func setPeerService(s *Span, t Tracer) {
tc := t.TracerConf()
func setPeerService(s *Span, peerServiceDefaults bool, peerServiceMappings map[string]string) {
if _, ok := s.meta[ext.PeerService]; ok { // peer.service already set on the span
s.setMeta(keyPeerServiceSource, ext.PeerService)
} else { // no peer.service currently set
spanKind := s.meta[ext.SpanKind]
isOutboundRequest := spanKind == ext.SpanKindClient || spanKind == ext.SpanKindProducer
shouldSetDefaultPeerService := isOutboundRequest && tc.PeerServiceDefaults
shouldSetDefaultPeerService := isOutboundRequest && peerServiceDefaults
if !shouldSetDefaultPeerService {
return
}
Expand All @@ -579,7 +576,7 @@ func setPeerService(s *Span, t Tracer) {
}
// Overwrite existing peer.service value if remapped by the user
ps := s.meta[ext.PeerService]
if to, ok := tc.PeerServiceMappings[ps]; ok {
if to, ok := peerServiceMappings[ps]; ok {
s.setMeta(keyPeerServiceRemappedFrom, ps)
s.setMeta(ext.PeerService, to)
}
Expand Down
89 changes: 54 additions & 35 deletions ddtrace/tracer/tracer.go
Expand Up @@ -62,21 +62,23 @@ type Tracer interface {
// Inject injects a span context into the given carrier.
Inject(context *SpanContext, carrier interface{}) error

// Stop stops the tracer. Calls to Stop should be idempotent.
Stop()
// Submit submits a span to the tracer.
Submit(s *Span)

// SubmitChunk submits a trace chunk to the tracer.
SubmitChunk(c *Chunk)

// TODO(kjn v2): These can be removed / consolidated. These are
// here temporarily as we figure out a sensible API.
// TracerConf returns a snapshot of the current configuration of the tracer.
TracerConf() TracerConf

SubmitStats(*Span)
SubmitAbandonedSpan(*Span, bool)
SubmitChunk(any) // This is a horrible signature. This will eventually become SubmitChunk(Chunk)
Flush() // Synchronous flushing
// Flush flushes any buffered traces. Flush is in effect only if a tracer
// is started. Users do not have to call Flush in order to ensure that
// traces reach Datadog. It is a convenience method dedicated to specific
// use cases.
Flush()

// TODO(kjn v2): Not sure if this belongs in the tracer.
// May be better to have a separate stats counting package / type.
// Signal(Event)
// Stop stops the tracer. Calls to Stop should be idempotent.
Stop()
}

var _ Tracer = (*tracer)(nil)
Expand Down Expand Up @@ -411,7 +413,7 @@ func (t *tracer) worker(tick <-chan time.Time) {
select {
case trace := <-t.out:
t.sampleChunk(trace)
t.traceWriter.add(trace.Spans)
t.traceWriter.add(trace.spans)
case <-tick:
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1)
t.traceWriter.flush()
Expand All @@ -434,7 +436,7 @@ func (t *tracer) worker(tick <-chan time.Time) {
select {
case trace := <-t.out:
t.sampleChunk(trace)
t.traceWriter.add(trace.Spans)
t.traceWriter.add(trace.spans)
default:
break loop
}
Expand All @@ -444,55 +446,53 @@ func (t *tracer) worker(tick <-chan time.Time) {
}
}

// Chunk holds information about a trace Chunk to be flushed, including its spans.
// The Chunk may be a fully finished local trace Chunk, or only a portion of the local trace Chunk in the case of
// Chunk holds information about a trace chunk to be flushed, including its spans.
// The chunk may be a fully finished local trace chunk, or only a portion of the local trace chunk in the case of
// partial flushing.
type Chunk struct {
// TODO:(kjn v2) Should probably not be public, or be a different type.
Spans []*Span
spans []*Span
willSend bool // willSend indicates whether the trace will be sent to the agent.
}

func NewChunk(spans []*Span, willSend bool) *Chunk {
return &Chunk{
spans: spans,
willSend: willSend,
}
}

// sampleChunk applies single-span sampling to the provided trace.
func (t *tracer) sampleChunk(c *Chunk) {
if len(c.Spans) > 0 {
if p, ok := c.Spans[0].context.SamplingPriority(); ok && p > 0 {
if len(c.spans) > 0 {
if p, ok := c.spans[0].context.SamplingPriority(); ok && p > 0 {
// The trace is kept, no need to run single span sampling rules.
return
}
}
var kept []*Span
if t.rulesSampling.HasSpanRules() {
// Apply sampling rules to individual spans in the trace.
for _, span := range c.Spans {
for _, span := range c.spans {
if t.rulesSampling.SampleSpan(span) {
kept = append(kept, span)
}
}
if len(kept) > 0 && len(kept) < len(c.Spans) {
if len(kept) > 0 && len(kept) < len(c.spans) {
// Some spans in the trace were kept, so a partial trace will be sent.
tracerstats.Signal(tracerstats.PartialTraces, 1)
}
}
if len(kept) == 0 {
tracerstats.Signal(tracerstats.DroppedP0Traces, 1)
}
tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.Spans)-len(kept)))
tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.spans)-len(kept)))
if !c.willSend {
c.Spans = kept
c.spans = kept
}
}

func (t *tracer) SubmitChunk(c any) {
// TODO(kjn v2): This will be unified with pushChunk, and only one function will exist.
// This is here right now because of the lack of appropriate exported types.

ch := c.(*Chunk) // TODO(kjn v2): This can panic. Once the appropriate types are moved, this assertion will be removed.
t.pushChunk(ch)
}

func (t *tracer) pushChunk(trace *Chunk) {
tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.Spans)))
tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.spans)))
select {
case <-t.stop:
return
Expand All @@ -501,7 +501,7 @@ func (t *tracer) pushChunk(trace *Chunk) {
select {
case t.out <- trace:
default:
log.Error("payload queue full, dropping %d traces", len(trace.Spans))
log.Error("payload queue full, dropping %d traces", len(trace.spans))
}
}

Expand Down Expand Up @@ -790,7 +790,22 @@ func (t *tracer) TracerConf() TracerConf {
}
}

func (t *tracer) SubmitStats(s *Span) {
func (t *tracer) Submit(s *Span) {
tc := t.TracerConf()
if !tc.Disabled {
// we have an active tracer
if tc.CanComputeStats && shouldComputeStats(s) {
// the agent supports computed stats
t.submitStats(s)
}
if tc.DebugAbandonedSpans {
// the tracer supports debugging abandoned spans
t.submitAbandonedSpan(s, true)
}
}
}

func (t *tracer) submitStats(s *Span) {
select {
case t.stats.In <- newAggregableSpan(s, t.obfuscator):
// ok
Expand All @@ -799,7 +814,7 @@ func (t *tracer) SubmitStats(s *Span) {
}
}

func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) {
func (t *tracer) submitAbandonedSpan(s *Span, finished bool) {
select {
case t.abandonedSpansDebugger.In <- newAbandonedSpanCandidate(s, finished):
// ok
Expand All @@ -808,6 +823,10 @@ func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) {
}
}

func (t *tracer) SubmitChunk(c *Chunk) {
t.pushChunk(c)
}

// sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent.
const sampleRateMetricKey = "_sample_rate"

Expand Down
10 changes: 5 additions & 5 deletions ddtrace/tracer/tracer_test.go
Expand Up @@ -196,7 +196,7 @@ func TestTracerStart(t *testing.T) {
Start()

// ensure at least one worker started and handles requests
GetGlobalTracer().(*tracer).pushChunk(&Chunk{Spans: []*Span{}})
GetGlobalTracer().(*tracer).pushChunk(&Chunk{spans: []*Span{}})

Stop()
Stop()
Expand All @@ -208,7 +208,7 @@ func TestTracerStart(t *testing.T) {
tr, _, _, stop, err := startTestTracer(t)
assert.Nil(t, err)
defer stop()
tr.pushChunk(&Chunk{Spans: []*Span{}}) // blocks until worker is started
tr.pushChunk(&Chunk{spans: []*Span{}}) // blocks until worker is started
select {
case <-tr.stop:
t.Fatal("stopped channel should be open")
Expand Down Expand Up @@ -1650,16 +1650,16 @@ func TestPushTrace(t *testing.T) {
resource: "/foo",
},
}
tracer.pushChunk(&Chunk{Spans: trace})
tracer.pushChunk(&Chunk{spans: trace})

assert.Len(tracer.out, 1)

t0 := <-tracer.out
assert.Equal(&Chunk{Spans: trace}, t0)
assert.Equal(&Chunk{spans: trace}, t0)

many := payloadQueueSize + 2
for i := 0; i < many; i++ {
tracer.pushChunk(&Chunk{Spans: make([]*Span, i)})
tracer.pushChunk(&Chunk{spans: make([]*Span, i)})
}
assert.Len(tracer.out, payloadQueueSize)
log.Flush()
Expand Down