Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional context protections #544

Merged
merged 6 commits into from Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 22 additions & 22 deletions sampler_v2_test.go
Expand Up @@ -72,31 +72,31 @@ func TestSpanRemainsWriteable(t *testing.T) {
defer closer.Close()

span := tracer.StartSpan("op1").(*Span)
assert.True(t, span.isWriteable(), "span is writeable when created")
assert.False(t, span.isSamplingFinalized(), "span is not finalized when created")
assert.True(t, span.context.isWriteable(), "span is writeable when created")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized when created")

span.SetTag("k1", "v1")
assert.True(t, span.isWriteable(), "span is writeable after setting tags")
assert.True(t, span.context.isWriteable(), "span is writeable after setting tags")
assert.Equal(t, opentracing.Tags{"k1": "v1"}, span.Tags())

span.SetOperationName("op2")
assert.False(t, span.isWriteable(), "span is not writeable after sampler returns retryable=true")
assert.True(t, span.isSamplingFinalized(), "span is finalized after sampler returns retryable=true")
assert.False(t, span.context.isWriteable(), "span is not writeable after sampler returns retryable=true")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized after sampler returns retryable=true")
}

func TestSpanSharesSamplingStateWithChildren(t *testing.T) {
tracer, closer := NewTracer("test-service", newRetryableSampler(false), NewNullReporter())
defer closer.Close()

sp1 := tracer.StartSpan("op1").(*Span)
assert.False(t, sp1.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, sp1.context.isSamplingFinalized(), "span is not finalized when created")

sp2 := tracer.StartSpan("op2", opentracing.ChildOf(sp1.Context())).(*Span)
assert.False(t, sp2.isSamplingFinalized(), "child span is not finalized when created")
assert.False(t, sp2.context.isSamplingFinalized(), "child span is not finalized when created")

sp2.SetOperationName("op3")
assert.True(t, sp2.isSamplingFinalized(), "child span is finalized after changing operation name")
assert.True(t, sp1.isSamplingFinalized(), "parent span is also finalized after child is finalized")
assert.True(t, sp2.context.isSamplingFinalized(), "child span is finalized after changing operation name")
assert.True(t, sp1.context.isSamplingFinalized(), "parent span is also finalized after child is finalized")
}

func TestSamplingIsFinalizedOnSamplingPriorityTag(t *testing.T) {
Expand Down Expand Up @@ -125,10 +125,10 @@ func TestSamplingIsFinalizedOnSamplingPriorityTag(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
span := tracer.StartSpan("op1").(*Span)
assert.False(t, span.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized when created")

ext.SamplingPriority.Set(span, tt.priority)
assert.True(t, span.isSamplingFinalized(), "span is finalized after sampling.priority tag")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized after sampling.priority tag")
assert.Equal(t, tt.expectedSampled, span.context.IsSampled(), "span is sampled after sampling.priority tag")
assert.Equal(t, tt.expectedTags, span.Tags(), "sampling.priority tag in the span")
})
Expand All @@ -149,7 +149,7 @@ func TestSamplingIsFinalizedWithV1Samplers(t *testing.T) {
defer closer.Close()

span := tracer.StartSpan("op1").(*Span)
assert.True(t, span.isSamplingFinalized(), "span is finalized when created with V1 sampler")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized when created with V1 sampler")
})
}
}
Expand All @@ -159,12 +159,12 @@ func TestSamplingIsNotFinalizedWhenContextIsInjected(t *testing.T) {
defer closer.Close()

span := tracer.StartSpan("op1").(*Span)
assert.False(t, span.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized when created")

err := tracer.Inject(span.Context(), opentracing.TextMap, opentracing.TextMapCarrier{})
require.NoError(t, err)

assert.False(t, span.isSamplingFinalized(), "span is not finalized after context is serialized")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized after context is serialized")
}

func TestSamplingIsFinalizedInChildSpanOfRemoteParent(t *testing.T) {
Expand All @@ -174,7 +174,7 @@ func TestSamplingIsFinalizedInChildSpanOfRemoteParent(t *testing.T) {
defer closer.Close()

span := tracer.StartSpan("parent").(*Span)
assert.False(t, span.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized when created")
assert.Equal(t, sampled, span.context.IsSampled())

carrier := opentracing.TextMapCarrier{}
Expand All @@ -184,7 +184,7 @@ func TestSamplingIsFinalizedInChildSpanOfRemoteParent(t *testing.T) {
parentContext, err := tracer.Extract(opentracing.TextMap, carrier)
require.NoError(t, err)
childSpan := tracer.StartSpan("child", opentracing.ChildOf(parentContext)).(*Span)
assert.True(t, childSpan.isSamplingFinalized(), "child span is finalized")
assert.True(t, childSpan.context.isSamplingFinalized(), "child span is finalized")
assert.Equal(t, sampled, childSpan.context.IsSampled())
})
}
Expand All @@ -195,30 +195,30 @@ func TestSpanIsWriteableIfSampledOrNotFinalized(t *testing.T) {
defer closer.Close()

span := tracer.StartSpan("span").(*Span)
assert.False(t, span.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, span.context.isSamplingFinalized(), "span is not finalized when created")
assert.False(t, span.context.IsSampled(), "span is not sampled")
assert.True(t, span.isWriteable(), "span is writeable")
assert.True(t, span.context.isWriteable(), "span is writeable")

tracer.(*Tracer).sampler = NewConstSampler(true)
span = tracer.StartSpan("span").(*Span)
assert.True(t, span.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.IsSampled(), "span is sampled")
assert.True(t, span.isWriteable(), "span is writeable")
assert.True(t, span.context.isWriteable(), "span is writeable")
}

func TestSetOperationOverridesOperationOnSampledSpan(t *testing.T) {
tracer, closer := NewTracer("test-service", NewConstSampler(true), NewNullReporter())
defer closer.Close()

span := tracer.StartSpan("op1").(*Span)
assert.True(t, span.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.IsSampled(), "span is sampled")
assert.Equal(t, "op1", span.OperationName())
assert.Equal(t, makeSamplerTags("const", true), span.tags)

span.tags = []Tag{} // easier to check that tags are not re-added
span.SetOperationName("op2")
assert.True(t, span.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.isSamplingFinalized(), "span is finalized when created")
assert.True(t, span.context.IsSampled(), "span is sampled")
assert.Equal(t, "op2", span.OperationName())
assert.Equal(t, []Tag{}, span.tags, "sampling tags are not re-added")
Expand Down
71 changes: 40 additions & 31 deletions span.go
Expand Up @@ -85,8 +85,9 @@ func NewTag(key string, value interface{}) Tag {
func (s *Span) SetOperationName(operationName string) opentracing.Span {
s.Lock()
s.operationName = operationName
ctx := s.context
s.Unlock()
if !s.isSamplingFinalized() {
if !ctx.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetOperationName(s, operationName)
s.applySamplingDecision(decision, true)
}
Expand All @@ -100,15 +101,25 @@ func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
}

func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
var ctx SpanContext
var operationName string
if lock {
ctx = s.SpanContext()
operationName = s.OperationName()
} else {
ctx = s.context
operationName = s.operationName
}

s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) {
return s
}
if !s.isSamplingFinalized() {
if !ctx.isSamplingFinalized() {
decision := s.tracer.sampler.OnSetTag(s, key, value)
s.applySamplingDecision(decision, lock)
}
if s.isWriteable() {
if ctx.isWriteable() {
if lock {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -340,12 +351,13 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
s.observer.OnFinish(options)
s.Lock()
s.duration = options.FinishTime.Sub(s.startTime)
ctx := s.context
s.Unlock()
if !s.isSamplingFinalized() {
if !ctx.isSamplingFinalized() {
decision := s.tracer.sampler.OnFinishSpan(s)
s.applySamplingDecision(decision, true)
}
if s.context.IsSampled() {
if ctx.IsSampled() {
s.Lock()
s.fixLogsIfDropped()
if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
Expand Down Expand Up @@ -426,11 +438,18 @@ func (s *Span) serviceName() string {
}

func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
var ctx SpanContext
if lock {
ctx = s.SpanContext()
} else {
ctx = s.context
}

if !decision.Retryable {
s.context.samplingState.setFinal()
ctx.samplingState.setFinal()
}
if decision.Sample {
s.context.samplingState.setSampled()
ctx.samplingState.setSampled()
if len(decision.Tags) > 0 {
if lock {
s.Lock()
Expand All @@ -443,44 +462,34 @@ func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
}
}

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (s *Span) isWriteable() bool {
state := s.context.samplingState
return !state.isFinal() || state.isSampled()
}

func (s *Span) isSamplingFinalized() bool {
return s.context.samplingState.isFinal()
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// The behavior of setSamplingPriority is surprising
// If noDebugFlagOnForcedSampling is set
// setSamplingPriority(span, 1) always sets only flagSampled
// setSamplingPriority(..., 1) always sets only flagSampled
// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
// setSamplingPriority(span, 1) sets both flagSampled and flagDebug
// setSamplingPriority(..., 1) sets both flagSampled and flagDebug
// However,
// setSamplingPriority(span, 0) always only resets flagSampled
// setSamplingPriority(..., 0) always only resets flagSampled
//
// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
// This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can
// leave flagDebug set
func setSamplingPriority(s *Span, value interface{}) bool {
func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool {
val, ok := value.(uint16)
if !ok {
return false
}
if val == 0 {
s.context.samplingState.unsetSampled()
s.context.samplingState.setFinal()
state.unsetSampled()
state.setFinal()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.samplingState.setSampled()
s.context.samplingState.setFinal()
if tracer.options.noDebugFlagOnForcedSampling {
state.setSampled()
state.setFinal()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.samplingState.setDebugAndSampled()
s.context.samplingState.setFinal()
} else if tracer.isDebugAllowed(operationName) {
state.setDebugAndSampled()
state.setFinal()
return true
}
return false
Expand Down
10 changes: 10 additions & 0 deletions span_context.go
Expand Up @@ -271,6 +271,16 @@ func (c SpanContext) Flags() byte {
return c.samplingState.flags()
}

// Span can be written to if it is sampled or the sampling decision has not been finalized.
func (c SpanContext) isWriteable() bool {
state := c.samplingState
return !state.isFinal() || state.isSampled()
}

func (c SpanContext) isSamplingFinalized() bool {
return c.samplingState.isFinal()
}

// NewSpanContext creates a new instance of SpanContext
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext {
samplingState := &samplingState{}
Expand Down
16 changes: 15 additions & 1 deletion span_test.go
Expand Up @@ -363,7 +363,6 @@ func TestSpan_References(t *testing.T) {
}

func TestSpanContextRaces(t *testing.T) {
t.Skip("Skipped: test will panic with -race, see https://github.com/jaegertracing/jaeger-client-go/issues/526")
tracer, closer := NewTracer("test", NewConstSampler(true), NewNullReporter())
defer closer.Close()

Expand Down Expand Up @@ -395,6 +394,21 @@ func TestSpanContextRaces(t *testing.T) {
go accessor(func() {
span.BaggageItem("k")
})
go accessor(func() {
ext.SamplingPriority.Set(span, 0)
})
go accessor(func() {
EnableFirehose(span)
})
go accessor(func() {
span.SpanContext().samplingState.setFlag(flagDebug)
})
go accessor(func() {
span.SetOperationName("test")
})
go accessor(func() {
span.FinishWithOptions(opentracing.FinishOptions{})
})
time.Sleep(100 * time.Millisecond)
close(end)
}
12 changes: 7 additions & 5 deletions tracer.go
Expand Up @@ -320,7 +320,7 @@ func (t *Tracer) startSpanWithOptions(
sp.references = references
sp.firstInProcess = rpcServer || sp.context.parentID == 0

if !sp.isSamplingFinalized() {
if !sp.context.isSamplingFinalized() {
decision := t.sampler.OnCreateSpan(sp)
sp.applySamplingDecision(decision, false)
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func (t *Tracer) newSpan() *Span {
// calling client-side span, but obviously the server side span is
// no longer a root span of the trace.
func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
if !sp.isSamplingFinalized() {
if !sp.context.isSamplingFinalized() {
t.metrics.SpansStartedDelayedSampling.Inc(1)
if newTrace {
t.metrics.TracesStartedDelayedSampling.Inc(1)
Expand All @@ -437,9 +437,11 @@ func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
}

func (t *Tracer) reportSpan(sp *Span) {
if !sp.isSamplingFinalized() {
ctx := sp.SpanContext()

if !ctx.isSamplingFinalized() {
t.metrics.SpansFinishedDelayedSampling.Inc(1)
} else if sp.context.IsSampled() {
} else if ctx.IsSampled() {
t.metrics.SpansFinishedSampled.Inc(1)
} else {
t.metrics.SpansFinishedNotSampled.Inc(1)
Expand All @@ -448,7 +450,7 @@ func (t *Tracer) reportSpan(sp *Span) {
// Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
// and then Release() it when no longer needed.
// Otherwise, the span may be reused for another trace and its data may be overwritten.
if sp.context.IsSampled() {
if ctx.IsSampled() {
t.reporter.Report(sp)
}

Expand Down