[v2] ddtrace/tracer: tidy up Tracer interface (#2633)
darccio committed May 8, 2024
1 parent 75c8781 commit f0eee6a
Showing 7 changed files with 78 additions and 77 deletions.
ddtrace/mocktracer/mocktracer.go (7 changes: 3 additions & 4 deletions)
@@ -190,7 +190,6 @@ func (t *mocktracer) TracerConf() tracer.TracerConf {
return tracer.TracerConf{}
}

-func (t *mocktracer) SubmitStats(*tracer.Span) {}
-func (t *mocktracer) SubmitAbandonedSpan(*tracer.Span, bool) {}
-func (t *mocktracer) SubmitChunk(_ any) {}
-func (t *mocktracer) Flush() {}
+func (t *mocktracer) Submit(*tracer.Span) {}
+func (t *mocktracer) SubmitChunk(*tracer.Chunk) {}
+func (t *mocktracer) Flush() {}
ddtrace/tracer/globaltracer_test.go (7 changes: 3 additions & 4 deletions)
@@ -30,10 +30,9 @@ func (*raceTestTracer) TracerConf() TracerConf {
return TracerConf{}
}

-func (*raceTestTracer) SubmitStats(*Span) {}
-func (*raceTestTracer) SubmitAbandonedSpan(*Span, bool) {}
-func (*raceTestTracer) SubmitChunk(any) {}
-func (*raceTestTracer) Flush() {}
+func (*raceTestTracer) Submit(*Span) {}
+func (*raceTestTracer) SubmitChunk(*Chunk) {}
+func (*raceTestTracer) Flush() {}

func TestGlobalTracer(t *testing.T) {
// at module initialization, the tracer must be seet
ddtrace/tracer/noop.go (9 changes: 3 additions & 6 deletions)
@@ -29,13 +29,10 @@ func (NoopTracer) Inject(_ *SpanContext, _ interface{}) error { return nil }
// Stop implements Tracer.
func (NoopTracer) Stop() {}

-// TODO(kjn v2): These should be removed. They are here temporarily to facilitate
-// the shift to the v2 API.
func (NoopTracer) TracerConf() TracerConf {
return TracerConf{}
}

-func (NoopTracer) SubmitStats(*Span) {}
-func (NoopTracer) SubmitAbandonedSpan(*Span, bool) {}
-func (NoopTracer) SubmitChunk(any) {}
-func (NoopTracer) Flush() {}
+func (NoopTracer) Submit(*Span) {}
+func (NoopTracer) SubmitChunk(*Chunk) {}
+func (NoopTracer) Flush() {}
ddtrace/tracer/span.go (18 changes: 4 additions & 14 deletions)
@@ -625,20 +625,10 @@ func (s *Span) finish(finishTime int64) {
keep := true
if t := GetGlobalTracer(); t != nil {
tc := t.TracerConf()
-if !tc.Disabled {
-// we have an active tracer
-if tc.CanComputeStats && shouldComputeStats(s) {
-// the agent supports computed stats
-t.SubmitStats(s)
-}
-if tc.CanDropP0s {
-// the agent supports dropping p0's in the client
-keep = shouldKeep(s)
-}
-if tc.DebugAbandonedSpans {
-// the tracer supports debugging abandoned spans
-t.SubmitAbandonedSpan(s, true)
-}
+t.Submit(s)
+if tc.CanDropP0s {
+// the agent supports dropping p0's in the client
+keep = shouldKeep(s)
}
}
if keep {
ddtrace/tracer/spancontext.go (15 changes: 6 additions & 9 deletions)
@@ -498,7 +498,7 @@ func (t *trace) finishedOne(s *Span) {
return
}
tc := tr.TracerConf()
-setPeerService(s, tr)
+setPeerService(s, tc.PeerServiceDefaults, tc.PeerServiceMappings)

// attach the _dd.base_service tag only when the globally configured service name is different from the
// span service name.
@@ -529,7 +529,7 @@

if len(t.spans) == t.finished { // perform a full flush of all spans
t.finishChunk(tr, &Chunk{
-Spans: t.spans,
+spans: t.spans,
willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
t.spans = nil
@@ -560,29 +560,26 @@
t.setTraceTags(finishedSpans[0])
}
t.finishChunk(tr, &Chunk{
-Spans: finishedSpans,
+spans: finishedSpans,
willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
t.spans = leftoverSpans
}

func (t *trace) finishChunk(tr Tracer, ch *Chunk) {
-//atomic.AddUint32(&tr.spansFinished, uint32(len(ch.spans)))
-//tr.pushChunk(ch)
tr.SubmitChunk(ch)
t.finished = 0 // important, because a buffer can be used for several flushes
}

// setPeerService sets the peer.service, _dd.peer.service.source, and _dd.peer.service.remapped_from
// tags as applicable for the given span.
-func setPeerService(s *Span, t Tracer) {
-tc := t.TracerConf()
+func setPeerService(s *Span, peerServiceDefaults bool, peerServiceMappings map[string]string) {
if _, ok := s.meta[ext.PeerService]; ok { // peer.service already set on the span
s.setMeta(keyPeerServiceSource, ext.PeerService)
} else { // no peer.service currently set
spanKind := s.meta[ext.SpanKind]
isOutboundRequest := spanKind == ext.SpanKindClient || spanKind == ext.SpanKindProducer
-shouldSetDefaultPeerService := isOutboundRequest && tc.PeerServiceDefaults
+shouldSetDefaultPeerService := isOutboundRequest && peerServiceDefaults
if !shouldSetDefaultPeerService {
return
}
@@ -595,7 +592,7 @@ func setPeerService(s *Span, t Tracer) {
}
// Overwrite existing peer.service value if remapped by the user
ps := s.meta[ext.PeerService]
-if to, ok := tc.PeerServiceMappings[ps]; ok {
+if to, ok := peerServiceMappings[ps]; ok {
s.setMeta(keyPeerServiceRemappedFrom, ps)
s.setMeta(ext.PeerService, to)
}
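The setPeerService change above decouples peer-service tagging from the Tracer interface: the function now receives the two configuration values it actually uses instead of the whole tracer. A minimal sketch of the new call shape inside the tracer package; the helper name tagPeer, the span s, the tracer tr, and the "legacy-db" remap are made up for illustration, while setPeerService, TracerConf, PeerServiceDefaults, and PeerServiceMappings come from this diff:

// Sketch (package tracer): snapshot the configuration once, then pass plain
// values; setPeerService no longer needs access to the Tracer itself.
func tagPeer(tr Tracer, s *Span) {
    tc := tr.TracerConf()
    // In real callers the mappings come from tc.PeerServiceMappings; a literal
    // map is shown here only to make the parameter concrete.
    setPeerService(s, tc.PeerServiceDefaults, map[string]string{
        "legacy-db": "orders-db", // hypothetical user-configured remap
    })
}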
ddtrace/tracer/tracer.go (89 changes: 54 additions & 35 deletions)
@@ -62,21 +62,23 @@ type Tracer interface {
// Inject injects a span context into the given carrier.
Inject(context *SpanContext, carrier interface{}) error

-// Stop stops the tracer. Calls to Stop should be idempotent.
-Stop()
+// Submit submits a span to the tracer.
+Submit(s *Span)

+// SubmitChunk submits a trace chunk to the tracer.
+SubmitChunk(c *Chunk)

-// TODO(kjn v2): These can be removed / consolidated. These are
-// here temporarily as we figure out a sensible API.
+// TracerConf returns a snapshot of the current configuration of the tracer.
TracerConf() TracerConf

-SubmitStats(*Span)
-SubmitAbandonedSpan(*Span, bool)
-SubmitChunk(any) // This is a horrible signature. This will eventually become SubmitChunk(Chunk)
-Flush() // Synchronous flushing
+// Flush flushes any buffered traces. Flush is in effect only if a tracer
+// is started. Users do not have to call Flush in order to ensure that
+// traces reach Datadog. It is a convenience method dedicated to specific
+// use cases.
+Flush()

-// TODO(kjn v2): Not sure if this belongs in the tracer.
-// May be better to have a separate stats counting package / type.
-// Signal(Event)
+// Stop stops the tracer. Calls to Stop should be idempotent.
+Stop()
}

var _ Tracer = (*tracer)(nil)
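Taken as a whole, the reordered method set now reads in the order a trace moves through the tracer: submit spans, submit chunks, consult the configuration snapshot, flush, stop. A short caller-side sketch using only identifiers that appear in this commit; the function name submitAndFlush and the finished slice are placeholders, and error handling is omitted:

// Sketch (package tracer): drive the tidied interface through the global tracer.
func submitAndFlush(finished []*Span) {
    tr := GetGlobalTracer()
    if tr == nil || tr.TracerConf().Disabled {
        return
    }
    for _, s := range finished {
        tr.Submit(s) // per-span work: stats computation, abandoned-span debugging
    }
    tr.SubmitChunk(NewChunk(finished, true)) // true: the chunk will be sent to the agent
    tr.Flush()                               // synchronous flush, e.g. before a short-lived process exits
}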
@@ -400,7 +402,7 @@ func (t *tracer) worker(tick <-chan time.Time) {
select {
case trace := <-t.out:
t.sampleChunk(trace)
-t.traceWriter.add(trace.Spans)
+t.traceWriter.add(trace.spans)
case <-tick:
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1)
t.traceWriter.flush()
@@ -423,7 +425,7 @@
select {
case trace := <-t.out:
t.sampleChunk(trace)
-t.traceWriter.add(trace.Spans)
+t.traceWriter.add(trace.spans)
default:
break loop
}
@@ -433,55 +435,53 @@ func (t *tracer) worker(tick <-chan time.Time) {
}
}

-// Chunk holds information about a trace Chunk to be flushed, including its spans.
-// The Chunk may be a fully finished local trace Chunk, or only a portion of the local trace Chunk in the case of
+// Chunk holds information about a trace chunk to be flushed, including its spans.
+// The chunk may be a fully finished local trace chunk, or only a portion of the local trace chunk in the case of
// partial flushing.
type Chunk struct {
-// TODO:(kjn v2) Should probably not be public, or be a different type.
-Spans []*Span
+spans []*Span
willSend bool // willSend indicates whether the trace will be sent to the agent.
}

+func NewChunk(spans []*Span, willSend bool) *Chunk {
+return &Chunk{
+spans: spans,
+willSend: willSend,
+}
+}

// sampleChunk applies single-span sampling to the provided trace.
func (t *tracer) sampleChunk(c *Chunk) {
-if len(c.Spans) > 0 {
-if p, ok := c.Spans[0].context.SamplingPriority(); ok && p > 0 {
+if len(c.spans) > 0 {
+if p, ok := c.spans[0].context.SamplingPriority(); ok && p > 0 {
// The trace is kept, no need to run single span sampling rules.
return
}
}
var kept []*Span
if t.rulesSampling.HasSpanRules() {
// Apply sampling rules to individual spans in the trace.
-for _, span := range c.Spans {
+for _, span := range c.spans {
if t.rulesSampling.SampleSpan(span) {
kept = append(kept, span)
}
}
-if len(kept) > 0 && len(kept) < len(c.Spans) {
+if len(kept) > 0 && len(kept) < len(c.spans) {
// Some spans in the trace were kept, so a partial trace will be sent.
tracerstats.Signal(tracerstats.PartialTraces, 1)
}
}
if len(kept) == 0 {
tracerstats.Signal(tracerstats.DroppedP0Traces, 1)
}
-tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.Spans)-len(kept)))
+tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.spans)-len(kept)))
if !c.willSend {
-c.Spans = kept
+c.spans = kept
}
}

-func (t *tracer) SubmitChunk(c any) {
-// TODO(kjn v2): This will be unified with pushChunk, and only one function will exist.
-// This is here right now because of the lack of appropriate exported types.

-ch := c.(*Chunk) // TODO(kjn v2): This can panic. Once the appropriate types are moved, this assertion will be removed.
-t.pushChunk(ch)
-}

func (t *tracer) pushChunk(trace *Chunk) {
-tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.Spans)))
+tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.spans)))
select {
case <-t.stop:
return
@@ -490,7 +490,7 @@ func (t *tracer) pushChunk(trace *Chunk) {
select {
case t.out <- trace:
default:
log.Error("payload queue full, dropping %d traces", len(trace.Spans))
log.Error("payload queue full, dropping %d traces", len(trace.spans))
}
}

@@ -776,7 +785,7 @@ func (t *tracer) TracerConf() TracerConf {
}
}

-func (t *tracer) SubmitStats(s *Span) {
+func (t *tracer) Submit(s *Span) {
+tc := t.TracerConf()
+if !tc.Disabled {
+// we have an active tracer
+if tc.CanComputeStats && shouldComputeStats(s) {
+// the agent supports computed stats
+t.submitStats(s)
+}
+if tc.DebugAbandonedSpans {
+// the tracer supports debugging abandoned spans
+t.submitAbandonedSpan(s, true)
+}
+}
+}

+func (t *tracer) submitStats(s *Span) {
select {
case t.stats.In <- newAggregableSpan(s, t.obfuscator):
// ok
@@ -785,7 +800,7 @@ func (t *tracer) SubmitStats(s *Span) {
}
}

-func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) {
+func (t *tracer) submitAbandonedSpan(s *Span, finished bool) {
select {
case t.abandonedSpansDebugger.In <- newAbandonedSpanCandidate(s, finished):
// ok
@@ -794,6 +809,10 @@ func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) {
}
}

+func (t *tracer) SubmitChunk(c *Chunk) {
+t.pushChunk(c)
+}

// sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent.
const sampleRateMetricKey = "_sample_rate"

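With SubmitStats and SubmitAbandonedSpan folded behind Submit (and demoted to the unexported submitStats and submitAbandonedSpan helpers), the submission-related surface an alternative Tracer has to provide shrinks to the five methods in the interface above. A stub in the style of the noop.go and globaltracer_test.go updates in this commit; stubTracer is a hypothetical name, and the rest of the interface (StartSpan, Extract, Inject, ...), which this commit leaves unchanged, is omitted:

// Sketch (package tracer): the submission-path methods of a stub implementation.
type stubTracer struct{}

func (stubTracer) Submit(*Span)           {}
func (stubTracer) SubmitChunk(*Chunk)     {}
func (stubTracer) TracerConf() TracerConf { return TracerConf{} }
func (stubTracer) Flush()                 {}
func (stubTracer) Stop()                  {}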
ddtrace/tracer/tracer_test.go (10 changes: 5 additions & 5 deletions)
@@ -196,7 +196,7 @@ func TestTracerStart(t *testing.T) {
Start()

// ensure at least one worker started and handles requests
-GetGlobalTracer().(*tracer).pushChunk(&Chunk{Spans: []*Span{}})
+GetGlobalTracer().(*tracer).pushChunk(&Chunk{spans: []*Span{}})

Stop()
Stop()
@@ -208,7 +208,7 @@ func TestTracerStart(t *testing.T) {
tr, _, _, stop, err := startTestTracer(t)
assert.Nil(t, err)
defer stop()
-tr.pushChunk(&Chunk{Spans: []*Span{}}) // blocks until worker is started
+tr.pushChunk(&Chunk{spans: []*Span{}}) // blocks until worker is started
select {
case <-tr.stop:
t.Fatal("stopped channel should be open")
@@ -1615,16 +1615,16 @@ func TestPushTrace(t *testing.T) {
resource: "/foo",
},
}
-tracer.pushChunk(&Chunk{Spans: trace})
+tracer.pushChunk(&Chunk{spans: trace})

assert.Len(tracer.out, 1)

t0 := <-tracer.out
-assert.Equal(&Chunk{Spans: trace}, t0)
+assert.Equal(&Chunk{spans: trace}, t0)

many := payloadQueueSize + 2
for i := 0; i < many; i++ {
-tracer.pushChunk(&Chunk{Spans: make([]*Span, i)})
+tracer.pushChunk(&Chunk{spans: make([]*Span, i)})
}
assert.Len(tracer.out, payloadQueueSize)
log.Flush()
