Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register #3268

Merged
merged 4 commits into from Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Changed

- `sdktrace.TraceProvider.Shutdown` and `sdktrace.TraceProvider.ForceFlush` to not return error when no processor register. (#3268)

## [1.11.0/0.32.3] 2022-10-12

### Added
Expand Down
41 changes: 17 additions & 24 deletions sdk/trace/provider.go
Expand Up @@ -116,12 +116,13 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
spanLimits: o.spanLimits,
resource: o.resource,
}

global.Info("TracerProvider created", "config", o)

spss := spanProcessorStates{}
for _, sp := range o.processors {
tp.RegisterSpanProcessor(sp)
spss = append(spss, newSpanProcessorState(sp))
}
tp.spanProcessors.Store(spss)

return tp
}
Expand Down Expand Up @@ -159,44 +160,38 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
}

// RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors.
func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) {
func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
newSPS := spanProcessorStates{}
if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok {
newSPS = append(newSPS, old...)
}
newSpanSync := &spanProcessorState{
sp: s,
state: &sync.Once{},
}
newSPS = append(newSPS, newSpanSync)
newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...)
newSPS = append(newSPS, newSpanProcessorState(sp))
p.spanProcessors.Store(newSPS)
}

// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors.
func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
spss := spanProcessorStates{}
old, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok || len(old) == 0 {
old := p.spanProcessors.Load().(spanProcessorStates)
if len(old) == 0 {
return
}
spss := spanProcessorStates{}
spss = append(spss, old...)

// stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState
var idx int
for i, sps := range spss {
if sps.sp == s {
if sps.sp == sp {
stopOnce = sps
idx = i
}
}
if stopOnce != nil {
stopOnce.state.Do(func() {
if err := s.Shutdown(context.Background()); err != nil {
if err := sp.Shutdown(context.Background()); err != nil {
otel.Handle(err)
}
})
Expand All @@ -213,10 +208,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
// ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
}
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}
Expand All @@ -237,10 +229,11 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {

// Shutdown shuts down the span processors in the order they were registered.
func (p *TracerProvider) Shutdown(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}

var retErr error
for _, sps := range spss {
select {
Expand Down
51 changes: 26 additions & 25 deletions sdk/trace/provider_test.go
Expand Up @@ -28,41 +28,45 @@ import (
"go.opentelemetry.io/otel/trace"
)

type basicSpanProcesor struct {
running bool
type basicSpanProcessor struct {
flushed bool
closed bool
injectShutdownError error
}

func (t *basicSpanProcesor) Shutdown(context.Context) error {
t.running = false
func (t *basicSpanProcessor) Shutdown(context.Context) error {
t.closed = true
return t.injectShutdownError
}

func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcesor) ForceFlush(context.Context) error {
func (t *basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcessor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcessor) ForceFlush(context.Context) error {
t.flushed = true
return nil
}

func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) {
stp := NewTracerProvider()
assert.NoError(t, stp.ForceFlush(context.Background()))
assert.NoError(t, stp.Shutdown(context.Background()))
}

func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider()
sp := &basicSpanProcesor{}
sp := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp)

sp.running = true

_ = stp.Shutdown(context.Background())

if sp.running {
t.Errorf("Error shutdown basicSpanProcesor\n")
}
assert.NoError(t, stp.ForceFlush(context.Background()))
assert.True(t, sp.flushed, "error ForceFlush basicSpanProcessor")
assert.NoError(t, stp.Shutdown(context.Background()))
assert.True(t, sp.closed, "error Shutdown basicSpanProcessor")
}

func TestFailedProcessorShutdown(t *testing.T) {
stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{
running: true,
sp := &basicSpanProcessor{
injectShutdownError: spErr,
}
stp.RegisterSpanProcessor(sp)
Expand All @@ -76,12 +80,10 @@ func TestFailedProcessorsShutdown(t *testing.T) {
stp := NewTracerProvider()
spErr1 := errors.New("basic span processor shutdown failure1")
spErr2 := errors.New("basic span processor shutdown failure2")
sp1 := &basicSpanProcesor{
running: true,
sp1 := &basicSpanProcessor{
injectShutdownError: spErr1,
}
sp2 := &basicSpanProcesor{
running: true,
sp2 := &basicSpanProcessor{
injectShutdownError: spErr2,
}
stp.RegisterSpanProcessor(sp1)
Expand All @@ -90,16 +92,15 @@ func TestFailedProcessorsShutdown(t *testing.T) {
err := stp.Shutdown(context.Background())
assert.Error(t, err)
assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2")
assert.False(t, sp1.running)
assert.False(t, sp2.running)
assert.True(t, sp1.closed)
assert.True(t, sp2.closed)
}

func TestFailedProcessorShutdownInUnregister(t *testing.T) {
handler.Reset()
stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{
running: true,
sp := &basicSpanProcessor{
injectShutdownError: spErr,
}
stp.RegisterSpanProcessor(sp)
Expand Down
15 changes: 7 additions & 8 deletions sdk/trace/span.go
Expand Up @@ -423,14 +423,13 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
}
s.mu.Unlock()

if sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates); ok {
if len(sps) == 0 {
return
}
snap := s.snapshot()
for _, sp := range sps {
sp.sp.OnEnd(snap)
}
sps := s.tracer.provider.spanProcessors.Load().(spanProcessorStates)
if len(sps) == 0 {
return
}
snap := s.snapshot()
for _, sp := range sps {
sp.sp.OnEnd(snap)
}
}

Expand Down
5 changes: 5 additions & 0 deletions sdk/trace/span_processor.go
Expand Up @@ -64,4 +64,9 @@ type spanProcessorState struct {
sp SpanProcessor
state *sync.Once
}

func newSpanProcessorState(sp SpanProcessor) *spanProcessorState {
return &spanProcessorState{sp: sp, state: &sync.Once{}}
}

type spanProcessorStates []*spanProcessorState
2 changes: 1 addition & 1 deletion sdk/trace/tracer.go
Expand Up @@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS

s := tr.newSpan(ctx, name, &config)
if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() {
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but this may also have caused a panic (someone else if really interested can investigate).

sps := tr.provider.spanProcessors.Load().(spanProcessorStates)
for _, sp := range sps {
sp.sp.OnStart(ctx, rw)
}
Expand Down