Skip to content

Commit

Permalink
Flush pending telemetry when ForceFlush or Shutdown are called on a P…
Browse files Browse the repository at this point in the history
…eriodicReader (#3220)

* Flush pending telemetry when ForceFlush called

* Test flush of periodic reader

* Flush pending telemetry on Shutdown

* Fix things

* Rename pHolder to p

* Add testing for Shutdown

* Fix doc for collect method

* Fix collectAndExport doc

* Fix collectAndExport English

* Remove stdoutmetric example expected output

* Add changes to changelog

* Revert inadvertent change to golangci conf

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
MrAlias and hanyuancheung committed Sep 27, 2022
1 parent 8c6e6c4 commit 111a1d7
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- `span.SetStatus` has been updated such that calls that lower the status are now no-ops. (#3214)
- Flush pending measurements with the `PeriodicReader` in the `go.opentelemetry.io/otel/sdk/metric` when `ForceFlush` or `Shutdown` are called. (#3220)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

Expand Down
108 changes: 0 additions & 108 deletions exporters/stdout/stdoutmetric/example_test.go
Expand Up @@ -123,112 +123,4 @@ func Example() {

// Ensure the periodic reader is cleaned up by shutting down the sdk.
_ = sdk.Shutdown(ctx)

// Output:
// {
// "Resource": [
// {
// "Key": "service.name",
// "Value": {
// "Type": "STRING",
// "Value": "stdoutmetric-example"
// }
// }
// ],
// "ScopeMetrics": [
// {
// "Scope": {
// "Name": "example",
// "Version": "v0.0.1",
// "SchemaURL": ""
// },
// "Metrics": [
// {
// "Name": "requests",
// "Description": "Number of requests received",
// "Unit": "1",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "2000-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Value": 5
// }
// ],
// "Temporality": "DeltaTemporality",
// "IsMonotonic": true
// }
// },
// {
// "Name": "latency",
// "Description": "Time spend processing received requests",
// "Unit": "ms",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "2000-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Count": 10,
// "Bounds": [
// 1,
// 5,
// 10
// ],
// "BucketCounts": [
// 1,
// 3,
// 6,
// 0
// ],
// "Sum": 57
// }
// ],
// "Temporality": "DeltaTemporality"
// }
// },
// {
// "Name": "temperature",
// "Description": "CPU global temperature",
// "Unit": "cel(1 K)",
// "Data": {
// "DataPoints": [
// {
// "Attributes": [
// {
// "Key": "server",
// "Value": {
// "Type": "STRING",
// "Value": "central"
// }
// }
// ],
// "StartTime": "0001-01-01T00:00:00Z",
// "Time": "2000-01-01T00:00:01Z",
// "Value": 32.4
// }
// ]
// }
// }
// ]
// }
// ]
// }
}
80 changes: 65 additions & 15 deletions sdk/metric/periodic_reader.go
Expand Up @@ -115,15 +115,16 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
r := &periodicReader{
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),

temporalitySelector: conf.temporalitySelector,
aggregationSelector: conf.aggregationSelector,
}

r.wg.Add(1)
go func() {
defer r.wg.Done()
defer func() { close(r.done) }()
r.run(ctx, conf.interval)
}()

Expand All @@ -137,11 +138,12 @@ type periodicReader struct {

timeout time.Duration
exporter Exporter
flushCh chan chan error

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector

wg sync.WaitGroup
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once
}
Expand All @@ -161,15 +163,13 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
for {
select {
case <-ticker.C:
m, err := r.Collect(ctx)
if err == nil {
c, cancel := context.WithTimeout(ctx, r.timeout)
err = r.exporter.Export(c, m)
cancel()
}
err := r.collectAndExport(ctx)
if err != nil {
otel.Handle(err)
}
case errCh := <-r.flushCh:
errCh <- r.collectAndExport(ctx)
ticker.Reset(interval)
case <-ctx.Done():
return
}
Expand All @@ -195,13 +195,27 @@ func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggre
return r.aggregationSelector(kind)
}

// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
m, err := r.Collect(ctx)
if err == nil {
err = r.export(ctx, m)
}
return err
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := r.producer.Load()
return r.collect(ctx, r.producer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -218,25 +232,61 @@ func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetric
return ph.produce(ctx)
}

// ForceFlush flushes the Exporter.
// export exports metric data m using r's exporter.
func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error {
c, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.exporter.Export(c, m)
}

// ForceFlush flushes pending telemetry.
func (r *periodicReader) ForceFlush(ctx context.Context) error {
errCh := make(chan error, 1)
select {
case r.flushCh <- errCh:
select {
case err := <-errCh:
if err != nil {
return err
}
close(errCh)
case <-ctx.Done():
return ctx.Err()
}
case <-r.done:
return ErrReaderShutdown
case <-ctx.Done():
return ctx.Err()
}
return r.exporter.ForceFlush(ctx)
}

// Shutdown stops the export pipeline.
// Shutdown flushes pending telemetry and then stops the export pipeline.
func (r *periodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Stop the run loop.
r.cancel()
r.wg.Wait()
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
r.producer.Store(produceHolder{
ph := r.producer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

err = r.exporter.Shutdown(ctx)
if ph != nil { // Reader was registered.
// Flush pending telemetry.
var m metricdata.ResourceMetrics
m, err = r.collect(ctx, ph)
if err == nil {
err = r.export(ctx, m)
}
}

sErr := r.exporter.Shutdown(ctx)
if err == nil || err == ErrReaderShutdown {
err = sErr
}
})
return err
}
53 changes: 49 additions & 4 deletions sdk/metric/periodic_reader_test.go
Expand Up @@ -98,6 +98,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand Down Expand Up @@ -138,11 +139,13 @@ func (eh chErrorHandler) Handle(err error) {
eh.Err <- err
}

func TestPeriodicReaderRun(t *testing.T) {
func triggerTicker(t *testing.T) chan time.Time {
t.Helper()

// Override the ticker C chan so tests are not flaky and rely on timing.
defer func(orig func(time.Duration) *time.Ticker) {
newTicker = orig
}(newTicker)
orig := newTicker
t.Cleanup(func() { newTicker = orig })

// Keep this at size zero so when triggered with a send it will hang until
// the select case is selected and the collection loop is started.
trigger := make(chan time.Time)
Expand All @@ -151,6 +154,11 @@ func TestPeriodicReaderRun(t *testing.T) {
ticker.C = trigger
return ticker
}
return trigger
}

func TestPeriodicReaderRun(t *testing.T) {
trigger := triggerTicker(t)

// Register an error handler to validate export errors are passed to
// otel.Handle.
Expand All @@ -177,6 +185,43 @@ func TestPeriodicReaderRun(t *testing.T) {
_ = r.Shutdown(context.Background())
}

func TestPeriodicReaderFlushesPending(t *testing.T) {
// Override the ticker so tests are not flaky and rely on timing.
trigger := triggerTicker(t)
t.Cleanup(func() { close(trigger) })

expFunc := func(t *testing.T) (exp Exporter, called *bool) {
called = new(bool)
return &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
*called = true
return assert.AnError
},
}, called
}

t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(context.Background())
})

t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
}

func BenchmarkPeriodicReader(b *testing.B) {
b.Run("Collect", benchReaderCollectFunc(
NewPeriodicReader(new(fnExporter)),
Expand Down

0 comments on commit 111a1d7

Please sign in to comment.