From 33cdc7ce45c7406cdcea6e44a53f457998f101b3 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 10 Oct 2022 18:04:06 -0700 Subject: [PATCH] Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 1 + sdk/trace/provider.go | 30 ++++++++-------------- sdk/trace/provider_test.go | 51 +++++++++++++++++++------------------- sdk/trace/span.go | 15 ++++++----- sdk/trace/tracer.go | 2 +- 5 files changed, 46 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fd689d606f..8a1a150c72e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Use default view if instrument does not match any registered view of a reader. (#3224, #3237) - The OpenCensus bridge no longer sends empty batches of metrics. (#3263) +- `sdktrace.TraceProvider.Shutdown` and `sdktrace.TraceProvider.ForceFlush` to not return error when no processor register. (#3268) ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index 292ea5481bc..7452778d29b 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -116,9 +116,9 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider { spanLimits: o.spanLimits, resource: o.resource, } - global.Info("TracerProvider created", "config", o) + tp.spanProcessors.Store(spanProcessorStates{}) for _, sp := range o.processors { tp.RegisterSpanProcessor(sp) } @@ -163,14 +163,8 @@ func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() newSPS := spanProcessorStates{} - if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok { - newSPS = append(newSPS, old...) - } - newSpanSync := &spanProcessorState{ - sp: s, - state: &sync.Once{}, - } - newSPS = append(newSPS, newSpanSync) + newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...) + newSPS = append(newSPS, &spanProcessorState{sp: s, state: &sync.Once{}}) p.spanProcessors.Store(newSPS) } @@ -178,11 +172,11 @@ func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() - spss := spanProcessorStates{} - old, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok || len(old) == 0 { + old := p.spanProcessors.Load().(spanProcessorStates) + if len(old) == 0 { return } + spss := spanProcessorStates{} spss = append(spss, old...) // stop the span processor if it is started and remove it from the list @@ -213,10 +207,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { // ForceFlush immediately exports all spans that have not yet been exported for // all the registered span processors. func (p *TracerProvider) ForceFlush(ctx context.Context) error { - spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok { - return fmt.Errorf("failed to load span processors") - } + spss := p.spanProcessors.Load().(spanProcessorStates) if len(spss) == 0 { return nil } @@ -237,10 +228,11 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error { // Shutdown shuts down the span processors in the order they were registered. func (p *TracerProvider) Shutdown(ctx context.Context) error { - spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok { - return fmt.Errorf("failed to load span processors") + spss := p.spanProcessors.Load().(spanProcessorStates) + if len(spss) == 0 { + return nil } + var retErr error for _, sps := range spss { select { diff --git a/sdk/trace/provider_test.go b/sdk/trace/provider_test.go index 6ec3df6794d..2cf19aaa029 100644 --- a/sdk/trace/provider_test.go +++ b/sdk/trace/provider_test.go @@ -28,41 +28,45 @@ import ( "go.opentelemetry.io/otel/trace" ) -type basicSpanProcesor struct { - running bool +type basicSpanProcessor struct { + flushed bool + closed bool injectShutdownError error } -func (t *basicSpanProcesor) Shutdown(context.Context) error { - t.running = false +func (t *basicSpanProcessor) Shutdown(context.Context) error { + t.closed = true return t.injectShutdownError } -func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {} -func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {} -func (t *basicSpanProcesor) ForceFlush(context.Context) error { +func (t *basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} +func (t *basicSpanProcessor) OnEnd(ReadOnlySpan) {} +func (t *basicSpanProcessor) ForceFlush(context.Context) error { + t.flushed = true return nil } +func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) { + stp := NewTracerProvider() + assert.NoError(t, stp.ForceFlush(context.Background())) + assert.NoError(t, stp.Shutdown(context.Background())) +} + func TestShutdownTraceProvider(t *testing.T) { stp := NewTracerProvider() - sp := &basicSpanProcesor{} + sp := &basicSpanProcessor{} stp.RegisterSpanProcessor(sp) - sp.running = true - - _ = stp.Shutdown(context.Background()) - - if sp.running { - t.Errorf("Error shutdown basicSpanProcesor\n") - } + assert.NoError(t, stp.ForceFlush(context.Background())) + assert.True(t, sp.flushed, "error ForceFlush basicSpanProcessor") + assert.NoError(t, stp.Shutdown(context.Background())) + assert.True(t, sp.closed, "error Shutdown basicSpanProcessor") } func TestFailedProcessorShutdown(t *testing.T) { stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") - sp := &basicSpanProcesor{ - running: true, + sp := &basicSpanProcessor{ injectShutdownError: spErr, } stp.RegisterSpanProcessor(sp) @@ -76,12 +80,10 @@ func TestFailedProcessorsShutdown(t *testing.T) { stp := NewTracerProvider() spErr1 := errors.New("basic span processor shutdown failure1") spErr2 := errors.New("basic span processor shutdown failure2") - sp1 := &basicSpanProcesor{ - running: true, + sp1 := &basicSpanProcessor{ injectShutdownError: spErr1, } - sp2 := &basicSpanProcesor{ - running: true, + sp2 := &basicSpanProcessor{ injectShutdownError: spErr2, } stp.RegisterSpanProcessor(sp1) @@ -90,16 +92,15 @@ func TestFailedProcessorsShutdown(t *testing.T) { err := stp.Shutdown(context.Background()) assert.Error(t, err) assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2") - assert.False(t, sp1.running) - assert.False(t, sp2.running) + assert.True(t, sp1.closed) + assert.True(t, sp2.closed) } func TestFailedProcessorShutdownInUnregister(t *testing.T) { handler.Reset() stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") - sp := &basicSpanProcesor{ - running: true, + sp := &basicSpanProcessor{ injectShutdownError: spErr, } stp.RegisterSpanProcessor(sp) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 9760923f702..c7cf8e94ec9 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -423,14 +423,13 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { } s.mu.Unlock() - if sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates); ok { - if len(sps) == 0 { - return - } - snap := s.snapshot() - for _, sp := range sps { - sp.sp.OnEnd(snap) - } + sps := s.tracer.provider.spanProcessors.Load().(spanProcessorStates) + if len(sps) == 0 { + return + } + snap := s.snapshot() + for _, sp := range sps { + sp.sp.OnEnd(snap) } } diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index 7b11fc465c6..f17d924b89e 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS s := tr.newSpan(ctx, name, &config) if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() { - sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates) + sps := tr.provider.spanProcessors.Load().(spanProcessorStates) for _, sp := range sps { sp.sp.OnStart(ctx, rw) }