Skip to content

Commit

Permalink
Lock RemotelyControlledSampler.sampler on callbacks (#543)
Browse files Browse the repository at this point in the history
* Make access to RemotelyControlledSampler.sampler serialized

Signed-off-by: Dima Kozlov <hummerd@mail.ru>

* Add test for race detection in RemotelyControlledSampler

Signed-off-by: Dima Kozlov <hummerd@mail.ru>
  • Loading branch information
Dima committed Oct 9, 2020
1 parent f3d00f4 commit 9a2c34e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 5 deletions.
17 changes: 12 additions & 5 deletions sampler_remote.go
Expand Up @@ -95,22 +95,30 @@ func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (boo

// OnCreateSpan implements OnCreateSpan of SamplerV2.
func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision {
return s.Sampler().OnCreateSpan(span)
s.RLock()
defer s.RUnlock()
return s.sampler.OnCreateSpan(span)
}

// OnSetOperationName implements OnSetOperationName of SamplerV2.
func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
return s.Sampler().OnSetOperationName(span, operationName)
s.RLock()
defer s.RUnlock()
return s.sampler.OnSetOperationName(span, operationName)
}

// OnSetTag implements OnSetTag of SamplerV2.
func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
return s.Sampler().OnSetTag(span, key, value)
s.RLock()
defer s.RUnlock()
return s.sampler.OnSetTag(span, key, value)
}

// OnFinishSpan implements OnFinishSpan of SamplerV2.
func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision {
return s.Sampler().OnFinishSpan(span)
s.RLock()
defer s.RUnlock()
return s.sampler.OnFinishSpan(span)
}

// Close implements Close() of Sampler.
Expand Down Expand Up @@ -157,7 +165,6 @@ func (s *RemotelyControlledSampler) Sampler() SamplerV2 {
defer s.RUnlock()
return s.sampler
}

func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) {
s.Lock()
defer s.Unlock()
Expand Down
94 changes: 94 additions & 0 deletions sampler_remote_test.go
Expand Up @@ -22,13 +22,107 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
mTestutils "github.com/uber/jaeger-lib/metrics/metricstest"

"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/testutils"
"github.com/uber/jaeger-client-go/thrift-gen/sampling"
)

func TestRemotelyControlledSampler_updateRace(t *testing.T) {
m := &Metrics{
SamplerRetrieved: metrics.NullCounter,
SamplerUpdated: metrics.NullCounter,
}
initSampler, _ := NewProbabilisticSampler(0.123)
logger := log.NullLogger
fetcher := &testSamplingStrategyFetcher{response: []byte("probabilistic")}
parser := new(testSamplingStrategyParser)
updaters := []SamplerUpdater{new(ProbabilisticSamplerUpdater)}
sampler := NewRemotelyControlledSampler(
"test",
SamplerOptions.Metrics(m),
SamplerOptions.MaxOperations(42),
SamplerOptions.OperationNameLateBinding(true),
SamplerOptions.InitialSampler(initSampler),
SamplerOptions.Logger(logger),
SamplerOptions.SamplingServerURL("my url"),
SamplerOptions.SamplingRefreshInterval(time.Millisecond),
SamplerOptions.SamplingStrategyFetcher(fetcher),
SamplerOptions.SamplingStrategyParser(parser),
SamplerOptions.Updaters(updaters...),
)

s := makeSpan(1, "test")
end := make(chan struct{})

accessor := func(f func()) {
for {
select {
case <-end:
return
default:
f()
}
}
}

go accessor(func() {
sampler.UpdateSampler()
})

go accessor(func() {
sampler.IsSampled(TraceID{Low: 1}, "test")
})

go accessor(func() {
sampler.OnCreateSpan(s)
})

go accessor(func() {
sampler.OnSetTag(s, "test", 1)
})

go accessor(func() {
sampler.OnFinishSpan(s)
})

go accessor(func() {
sampler.OnSetOperationName(s, "test")
})

time.Sleep(100 * time.Millisecond)
close(end)
sampler.Close()
}

type testSamplingStrategyFetcher struct {
response []byte
}

func (c *testSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
return []byte(c.response), nil
}

type testSamplingStrategyParser struct {
}

func (p *testSamplingStrategyParser) Parse(response []byte) (interface{}, error) {
strategy := new(sampling.SamplingStrategyResponse)

switch string(response) {
case "probabilistic":
strategy.StrategyType = sampling.SamplingStrategyType_PROBABILISTIC
strategy.ProbabilisticSampling = &sampling.ProbabilisticSamplingStrategy{
SamplingRate: 0.85,
}
return strategy, nil
}

return nil, errors.New("unknown strategy test request")
}

func TestRemoteSamplerOptions(t *testing.T) {
m := new(Metrics)
initSampler, _ := NewProbabilisticSampler(0.123)
Expand Down

0 comments on commit 9a2c34e

Please sign in to comment.