From 111a1d7bbd37c0e3ce6376cfbe419c260bc7389f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 08:05:41 -0700 Subject: [PATCH] Flush pending telemetry when ForceFlush or Shutdown are called on a PeriodicReader (#3220) * Flush pending telemetry when ForceFlush called * Test flush of periodic reader * Flush pending telemetry on Shutdown * Fix things * Rename pHolder to p * Add testing for Shutdown * Fix doc for collect method * Fix collectAndExport doc * Fix collectAndExport English * Remove stdoutmetric example expected output * Add changes to changelog * Revert inadvertent change to golangci conf Co-authored-by: Chester Cheung --- CHANGELOG.md | 1 + exporters/stdout/stdoutmetric/example_test.go | 108 ------------------ sdk/metric/periodic_reader.go | 80 ++++++++++--- sdk/metric/periodic_reader_test.go | 53 ++++++++- 4 files changed, 115 insertions(+), 127 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae74e5575cc..25fc8c12371 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `span.SetStatus` has been updated such that calls that lower the status are now no-ops. (#3214) +- Flush pending measurements with the `PeriodicReader` in the `go.opentelemetry.io/otel/sdk/metric` when `ForceFlush` or `Shutdown` are called. (#3220) ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 diff --git a/exporters/stdout/stdoutmetric/example_test.go b/exporters/stdout/stdoutmetric/example_test.go index 5bbf8c6faa0..a6103c8c607 100644 --- a/exporters/stdout/stdoutmetric/example_test.go +++ b/exporters/stdout/stdoutmetric/example_test.go @@ -123,112 +123,4 @@ func Example() { // Ensure the periodic reader is cleaned up by shutting down the sdk. _ = sdk.Shutdown(ctx) - - // Output: - // { - // "Resource": [ - // { - // "Key": "service.name", - // "Value": { - // "Type": "STRING", - // "Value": "stdoutmetric-example" - // } - // } - // ], - // "ScopeMetrics": [ - // { - // "Scope": { - // "Name": "example", - // "Version": "v0.0.1", - // "SchemaURL": "" - // }, - // "Metrics": [ - // { - // "Name": "requests", - // "Description": "Number of requests received", - // "Unit": "1", - // "Data": { - // "DataPoints": [ - // { - // "Attributes": [ - // { - // "Key": "server", - // "Value": { - // "Type": "STRING", - // "Value": "central" - // } - // } - // ], - // "StartTime": "2000-01-01T00:00:00Z", - // "Time": "2000-01-01T00:00:01Z", - // "Value": 5 - // } - // ], - // "Temporality": "DeltaTemporality", - // "IsMonotonic": true - // } - // }, - // { - // "Name": "latency", - // "Description": "Time spend processing received requests", - // "Unit": "ms", - // "Data": { - // "DataPoints": [ - // { - // "Attributes": [ - // { - // "Key": "server", - // "Value": { - // "Type": "STRING", - // "Value": "central" - // } - // } - // ], - // "StartTime": "2000-01-01T00:00:00Z", - // "Time": "2000-01-01T00:00:01Z", - // "Count": 10, - // "Bounds": [ - // 1, - // 5, - // 10 - // ], - // "BucketCounts": [ - // 1, - // 3, - // 6, - // 0 - // ], - // "Sum": 57 - // } - // ], - // "Temporality": "DeltaTemporality" - // } - // }, - // { - // "Name": "temperature", - // "Description": "CPU global temperature", - // "Unit": "cel(1 K)", - // "Data": { - // "DataPoints": [ - // { - // "Attributes": [ - // { - // "Key": "server", - // "Value": { - // "Type": "STRING", - // "Value": "central" - // } - // } - // ], - // "StartTime": "0001-01-01T00:00:00Z", - // "Time": "2000-01-01T00:00:01Z", - // "Value": 32.4 - // } - // ] - // } - // } - // ] - // } - // ] - // } } diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index cd729eff32e..3f705086162 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -115,15 +115,16 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade r := &periodicReader{ timeout: conf.timeout, exporter: exporter, + flushCh: make(chan chan error), cancel: cancel, + done: make(chan struct{}), temporalitySelector: conf.temporalitySelector, aggregationSelector: conf.aggregationSelector, } - r.wg.Add(1) go func() { - defer r.wg.Done() + defer func() { close(r.done) }() r.run(ctx, conf.interval) }() @@ -137,11 +138,12 @@ type periodicReader struct { timeout time.Duration exporter Exporter + flushCh chan chan error temporalitySelector TemporalitySelector aggregationSelector AggregationSelector - wg sync.WaitGroup + done chan struct{} cancel context.CancelFunc shutdownOnce sync.Once } @@ -161,15 +163,13 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { for { select { case <-ticker.C: - m, err := r.Collect(ctx) - if err == nil { - c, cancel := context.WithTimeout(ctx, r.timeout) - err = r.exporter.Export(c, m) - cancel() - } + err := r.collectAndExport(ctx) if err != nil { otel.Handle(err) } + case errCh := <-r.flushCh: + errCh <- r.collectAndExport(ctx) + ticker.Reset(interval) case <-ctx.Done(): return } @@ -195,13 +195,27 @@ func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggre return r.aggregationSelector(kind) } +// collectAndExport gather all metric data related to the periodicReader r from +// the SDK and exports it with r's exporter. +func (r *periodicReader) collectAndExport(ctx context.Context) error { + m, err := r.Collect(ctx) + if err == nil { + err = r.export(ctx, m) + } + return err +} + // Collect gathers and returns all metric data related to the Reader from // the SDK. The returned metric data is not exported to the configured // exporter, it is left to the caller to handle that if desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := r.producer.Load() + return r.collect(ctx, r.producer.Load()) +} + +// collect unwraps p as a produceHolder and returns its produce results. +func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) { if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -218,25 +232,61 @@ func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetric return ph.produce(ctx) } -// ForceFlush flushes the Exporter. +// export exports metric data m using r's exporter. +func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error { + c, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + return r.exporter.Export(c, m) +} + +// ForceFlush flushes pending telemetry. func (r *periodicReader) ForceFlush(ctx context.Context) error { + errCh := make(chan error, 1) + select { + case r.flushCh <- errCh: + select { + case err := <-errCh: + if err != nil { + return err + } + close(errCh) + case <-ctx.Done(): + return ctx.Err() + } + case <-r.done: + return ErrReaderShutdown + case <-ctx.Done(): + return ctx.Err() + } return r.exporter.ForceFlush(ctx) } -// Shutdown stops the export pipeline. +// Shutdown flushes pending telemetry and then stops the export pipeline. func (r *periodicReader) Shutdown(ctx context.Context) error { err := ErrReaderShutdown r.shutdownOnce.Do(func() { // Stop the run loop. r.cancel() - r.wg.Wait() + <-r.done // Any future call to Collect will now return ErrReaderShutdown. - r.producer.Store(produceHolder{ + ph := r.producer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) - err = r.exporter.Shutdown(ctx) + if ph != nil { // Reader was registered. + // Flush pending telemetry. + var m metricdata.ResourceMetrics + m, err = r.collect(ctx, ph) + if err == nil { + err = r.export(ctx, m) + } + } + + sErr := r.exporter.Shutdown(ctx) + if err == nil || err == ErrReaderShutdown { + err = sErr + } }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 8ea2f9ffc91..8c3c0599158 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -98,6 +98,7 @@ func (ts *periodicReaderTestSuite) SetupTest() { } ts.ErrReader = NewPeriodicReader(e) + ts.ErrReader.register(testProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -138,11 +139,13 @@ func (eh chErrorHandler) Handle(err error) { eh.Err <- err } -func TestPeriodicReaderRun(t *testing.T) { +func triggerTicker(t *testing.T) chan time.Time { + t.Helper() + // Override the ticker C chan so tests are not flaky and rely on timing. - defer func(orig func(time.Duration) *time.Ticker) { - newTicker = orig - }(newTicker) + orig := newTicker + t.Cleanup(func() { newTicker = orig }) + // Keep this at size zero so when triggered with a send it will hang until // the select case is selected and the collection loop is started. trigger := make(chan time.Time) @@ -151,6 +154,11 @@ func TestPeriodicReaderRun(t *testing.T) { ticker.C = trigger return ticker } + return trigger +} + +func TestPeriodicReaderRun(t *testing.T) { + trigger := triggerTicker(t) // Register an error handler to validate export errors are passed to // otel.Handle. @@ -177,6 +185,43 @@ func TestPeriodicReaderRun(t *testing.T) { _ = r.Shutdown(context.Background()) } +func TestPeriodicReaderFlushesPending(t *testing.T) { + // Override the ticker so tests are not flaky and rely on timing. + trigger := triggerTicker(t) + t.Cleanup(func() { close(trigger) }) + + expFunc := func(t *testing.T) (exp Exporter, called *bool) { + called = new(bool) + return &fnExporter{ + exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { + // The testProducer produces testMetrics. + assert.Equal(t, testMetrics, m) + *called = true + return assert.AnError + }, + }, called + } + + t.Run("ForceFlush", func(t *testing.T) { + exp, called := expFunc(t) + r := NewPeriodicReader(exp) + r.register(testProducer{}) + assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") + assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") + + // Ensure Reader is allowed clean up attempt. + _ = r.Shutdown(context.Background()) + }) + + t.Run("Shutdown", func(t *testing.T) { + exp, called := expFunc(t) + r := NewPeriodicReader(exp) + r.register(testProducer{}) + assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") + assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") + }) +} + func BenchmarkPeriodicReader(b *testing.B) { b.Run("Collect", benchReaderCollectFunc( NewPeriodicReader(new(fnExporter)),