Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush pending telemetry when ForceFlush or Shutdown are called on a PeriodicReader #3220

Merged
merged 17 commits into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- `span.SetStatus` has been updated such that calls that lower the status are now no-ops. (#3214)
- Flush pending measurements with the `PeriodicReader` in the `go.opentelemetry.io/otel/sdk/metric` when `ForceFlush` or `Shutdown` are called. (#3220)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

Expand Down
108 changes: 0 additions & 108 deletions exporters/stdout/stdoutmetric/example_test.go
Expand Up @@ -123,112 +123,4 @@ func Example() {

// Ensure the periodic reader is cleaned up by shutting down the sdk.
_ = sdk.Shutdown(ctx)

// Output:
// {
// "Resource": [
// {
// "Key": "service.name",
// "Value": {
// "Type": "STRING",
// "Value": "stdoutmetric-example"
// }
// }
// ],
// "ScopeMetrics": [
// {
// "Scope": {
// "Name": "example",
// "Version": "v0.0.1",
// "SchemaURL": ""
// },
// "Metrics": [
// {
// "Name": "requests",
// "Description": "Number of requests received",
// "Unit": "1",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "2000-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Value": 5
// }
// ],
// "Temporality": "DeltaTemporality",
// "IsMonotonic": true
// }
// },
// {
// "Name": "latency",
// "Description": "Time spend processing received requests",
// "Unit": "ms",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "2000-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Count": 10,
// "Bounds": [
// 1,
// 5,
// 10
// ],
// "BucketCounts": [
// 1,
// 3,
// 6,
// 0
// ],
// "Sum": 57
// }
// ],
// "Temporality": "DeltaTemporality"
// }
// },
// {
// "Name": "temperature",
// "Description": "CPU global temperature",
// "Unit": "cel(1 K)",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "0001-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Value": 32.4
// }
// ]
// }
// }
// ]
// }
// ]
// }
}
80 changes: 65 additions & 15 deletions sdk/metric/periodic_reader.go
Expand Up @@ -115,15 +115,16 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
r := &periodicReader{
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),

temporalitySelector: conf.temporalitySelector,
aggregationSelector: conf.aggregationSelector,
}

r.wg.Add(1)
go func() {
defer r.wg.Done()
defer func() { close(r.done) }()
r.run(ctx, conf.interval)
}()

Expand All @@ -137,11 +138,12 @@ type periodicReader struct {

timeout time.Duration
exporter Exporter
flushCh chan chan error

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector

wg sync.WaitGroup
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once
}
Expand All @@ -161,15 +163,13 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
for {
select {
case <-ticker.C:
m, err := r.Collect(ctx)
if err == nil {
c, cancel := context.WithTimeout(ctx, r.timeout)
err = r.exporter.Export(c, m)
cancel()
}
err := r.collectAndExport(ctx)
if err != nil {
otel.Handle(err)
}
case errCh := <-r.flushCh:
errCh <- r.collectAndExport(ctx)
ticker.Reset(interval)
case <-ctx.Done():
return
}
Expand All @@ -195,13 +195,27 @@ func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggre
return r.aggregationSelector(kind)
}

// collectAndExport gathers all metric data related to the periodicReader r
// from the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
	metrics, err := r.Collect(ctx)
	if err != nil {
		return err
	}
	return r.export(ctx, metrics)
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
	// Load the registered producer once and delegate; collect reports
	// ErrReaderNotRegistered if no producer has been registered yet.
	return r.collect(ctx, r.producer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -218,25 +232,61 @@ func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetric
return ph.produce(ctx)
}

// ForceFlush flushes the Exporter.
// export exports metric data m using r's exporter, bounding the call with
// r's configured timeout.
func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error {
	timedCtx, cancel := context.WithTimeout(ctx, r.timeout)
	defer cancel()
	return r.exporter.Export(timedCtx, m)
}

// ForceFlush flushes pending telemetry.
//
// It hands a flush request to the run loop (so collection stays serialized
// on a single goroutine) and waits for the result, then flushes the
// exporter itself. It returns ErrReaderShutdown if the reader has been shut
// down, or the context error if ctx expires first.
func (r *periodicReader) ForceFlush(ctx context.Context) error {
	// Buffered so the run loop's send cannot block even if this caller has
	// already given up waiting due to ctx cancellation.
	errCh := make(chan error, 1)
	select {
	case r.flushCh <- errCh:
		select {
		case err := <-errCh:
			if err != nil {
				return err
			}
			close(errCh)
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-r.done:
		// The run loop has exited; the reader was shut down.
		return ErrReaderShutdown
	case <-ctx.Done():
		return ctx.Err()
	}
	// Collection succeeded; also flush anything buffered in the exporter.
	return r.exporter.ForceFlush(ctx)
}

// Shutdown stops the export pipeline.
// Shutdown flushes pending telemetry and then stops the export pipeline.
func (r *periodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Stop the run loop.
r.cancel()
r.wg.Wait()
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
r.producer.Store(produceHolder{
ph := r.producer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

err = r.exporter.Shutdown(ctx)
if ph != nil { // Reader was registered.
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// Flush pending telemetry.
var m metricdata.ResourceMetrics
m, err = r.collect(ctx, ph)
if err == nil {
err = r.export(ctx, m)
}
}

sErr := r.exporter.Shutdown(ctx)
if err == nil || err == ErrReaderShutdown {
err = sErr
}
})
return err
}
53 changes: 49 additions & 4 deletions sdk/metric/periodic_reader_test.go
Expand Up @@ -98,6 +98,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand Down Expand Up @@ -138,11 +139,13 @@ func (eh chErrorHandler) Handle(err error) {
eh.Err <- err
}

func TestPeriodicReaderRun(t *testing.T) {
func triggerTicker(t *testing.T) chan time.Time {
t.Helper()

// Override the ticker C chan so tests are not flaky and rely on timing.
defer func(orig func(time.Duration) *time.Ticker) {
newTicker = orig
}(newTicker)
orig := newTicker
t.Cleanup(func() { newTicker = orig })

// Keep this at size zero so when triggered with a send it will hang until
// the select case is selected and the collection loop is started.
trigger := make(chan time.Time)
Expand All @@ -151,6 +154,11 @@ func TestPeriodicReaderRun(t *testing.T) {
ticker.C = trigger
return ticker
}
return trigger
}

func TestPeriodicReaderRun(t *testing.T) {
trigger := triggerTicker(t)

// Register an error handler to validate export errors are passed to
// otel.Handle.
Expand All @@ -177,6 +185,43 @@ func TestPeriodicReaderRun(t *testing.T) {
_ = r.Shutdown(context.Background())
}

// TestPeriodicReaderFlushesPending verifies that both ForceFlush and
// Shutdown export any telemetry still pending in the SDK, and that the
// exporter's Export error is propagated back to the caller.
func TestPeriodicReaderFlushesPending(t *testing.T) {
	// Override the ticker so tests are not flaky and rely on timing.
	trigger := triggerTicker(t)
	t.Cleanup(func() { close(trigger) })

	// expFunc builds an exporter whose Export asserts it receives the
	// testProducer's data, records that it was called, and returns a
	// sentinel error so the caller's error propagation can be checked.
	expFunc := func(t *testing.T) (exp Exporter, called *bool) {
		called = new(bool)
		return &fnExporter{
			exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
				// The testProducer produces testMetrics.
				assert.Equal(t, testMetrics, m)
				*called = true
				return assert.AnError
			},
		}, called
	}

	t.Run("ForceFlush", func(t *testing.T) {
		exp, called := expFunc(t)
		r := NewPeriodicReader(exp)
		r.register(testProducer{})
		assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
		assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

		// Ensure Reader is allowed clean up attempt.
		_ = r.Shutdown(context.Background())
	})

	t.Run("Shutdown", func(t *testing.T) {
		exp, called := expFunc(t)
		r := NewPeriodicReader(exp)
		r.register(testProducer{})
		assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
		assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
	})
}

func BenchmarkPeriodicReader(b *testing.B) {
b.Run("Collect", benchReaderCollectFunc(
NewPeriodicReader(new(fnExporter)),
Expand Down