From c236cdc81c316973a456d757f743cb0c763d4cda Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 12:51:35 -0700 Subject: [PATCH 1/7] Do not return an error for Drop aggs The async instruments currently return an error if and only if there are no aggregators returned from a resolve. Returning no aggregators means the instrument aggregation is drop. Do not include this in the error reporting decision. --- sdk/metric/instrument_provider.go | 87 +++++-------------------------- 1 file changed, 12 insertions(+), 75 deletions(-) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 8640b58e52e..79a257a2251 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -15,8 +15,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "fmt" - "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" "go.opentelemetry.io/otel/metric/instrument/asyncint64" @@ -43,13 +41,7 @@ func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asy Description: cfg.Description(), Kind: view.AsyncCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } // UpDownCounter creates an instrument for recording changes of a value. @@ -62,12 +54,7 @@ func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option Description: cfg.Description(), Kind: view.AsyncUpDownCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } // Gauge creates an instrument for recording the current value. @@ -80,12 +67,7 @@ func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (async Description: cfg.Description(), Kind: view.AsyncGauge, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } type asyncFloat64Provider struct { @@ -105,12 +87,7 @@ func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (a Description: cfg.Description(), Kind: view.AsyncCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } // UpDownCounter creates an instrument for recording changes of a value. @@ -123,12 +100,7 @@ func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Opti Description: cfg.Description(), Kind: view.AsyncUpDownCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } // Gauge creates an instrument for recording the current value. @@ -141,12 +113,7 @@ func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asy Description: cfg.Description(), Kind: view.AsyncGauge, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } type syncInt64Provider struct { @@ -166,12 +133,7 @@ func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (sync Description: cfg.Description(), Kind: view.SyncCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } // UpDownCounter creates an instrument for recording changes of a value. @@ -184,12 +146,7 @@ func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) Description: cfg.Description(), Kind: view.SyncUpDownCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } // Histogram creates an instrument for recording the current value. @@ -202,12 +159,7 @@ func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (sy Description: cfg.Description(), Kind: view.SyncHistogram, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return &instrumentImpl[int64]{aggregators: aggs}, err } type syncFloat64Provider struct { @@ -227,12 +179,7 @@ func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (sy Description: cfg.Description(), Kind: view.SyncCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } // UpDownCounter creates an instrument for recording changes of a value. @@ -245,12 +192,7 @@ func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Optio Description: cfg.Description(), Kind: view.SyncUpDownCounter, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } // Histogram creates an instrument for recording the current value. @@ -263,10 +205,5 @@ func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) ( Description: cfg.Description(), Kind: view.SyncHistogram, }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return &instrumentImpl[float64]{aggregators: aggs}, err } From e3b28f2c16eb2e3302d2a30be41d9e134f327b13 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 12:53:15 -0700 Subject: [PATCH 2/7] Only registers callbacks if non-drop agg is used The instruments passed to RegisterCallback need to have some aggregation defined otherwise it is implied they have a Drop aggregation. Check that at least one instrument passed has an aggregation other than Drop before registering the callback with the pipelines. Also, return an error if the user passed another API implementation of an asynchronous instrument. --- sdk/metric/meter.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 80007b1465e..cc65164ad2c 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" @@ -77,7 +78,23 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { - m.pipes.registerCallback(f) + for _, inst := range insts { + switch t := inst.(type) { + case *instrumentImpl[int64]: + if len(t.aggregators) > 0 { + m.pipes.registerCallback(f) + return nil + } + case *instrumentImpl[float64]: + if len(t.aggregators) > 0 { + m.pipes.registerCallback(f) + return nil + } + default: + return errors.New("invalid asynchronous instrument") + } + } + // Only instrument using drop aggregation passed. return nil } From 9fa997adf959856b7c481dcb6e69b05c0711a0cf Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 12:55:41 -0700 Subject: [PATCH 3/7] Remove unneeded TODO from pipeline --- sdk/metric/pipeline.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index db70003b1d5..cd9e1ca2cf6 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -430,7 +430,6 @@ func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipeli return pipes } -// TODO (#3053) Only register callbacks if any instrument matches in a view. func (p pipelines) registerCallback(fn func(context.Context)) { for _, pipe := range p { pipe.addCallback(fn) From 28f6e9995fed48ffe4fb175e817f80e17c6e9680 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 13:20:10 -0700 Subject: [PATCH 4/7] Add changes to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a75865c5ba5..2292b19ed74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389) - Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398) - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) +- Asynchronous callbacks are only called if they are registered with at least one instrument that does not use drop aggragation. (#3408) ## [1.11.1/0.33.0] 2022-10-19 From 2a997e0ecc417653404e04b1fcf41ce09a2fae48 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 13:42:06 -0700 Subject: [PATCH 5/7] Test callback not called for all drop instruments --- sdk/metric/meter_test.go | 45 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 808c946bb40..108c5e78dd9 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -26,8 +26,10 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -474,3 +476,46 @@ func TestMetersProvideScope(t *testing.T) { assert.NoError(t, err) metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } + +func TestRegisterCallbackDropAggregations(t *testing.T) { + aggFn := func(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} + } + r := NewManualReader(WithAggregationSelector(aggFn)) + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("testRegisterCallbackDropAggregations") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true })) + + data, err := r.Collect(context.Background()) + require.NoError(t, err) + + assert.False(t, called, "callback called for all drop instruments") + assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments") +} From ab25a236c704b9571ef730778553fd5a3ef6eda9 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 28 Oct 2022 13:48:20 -0700 Subject: [PATCH 6/7] Test RegisterCallback returns err for non-SDK inst --- sdk/metric/meter_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 108c5e78dd9..75bdb5adce6 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -519,3 +519,17 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { assert.False(t, called, "callback called for all drop instruments") assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments") } + +func TestRegisterCallbackErrorForNonSDKInstrument(t *testing.T) { + type alien struct{ instrument.Asynchronous } + + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("TestRegisterCallbackErrorForNonSDKInstrument") + + err := m.RegisterCallback([]instrument.Asynchronous{&alien{}}, func(context.Context) { + panic("should not be registered") + }) + assert.Error(t, err) + assert.NotPanics(t, func() { _, _ = r.Collect(context.Background()) }) +} From 509a4af5546f2ccb31a3ede9012bb36652f254ea Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 31 Oct 2022 13:53:42 -0700 Subject: [PATCH 7/7] Fail gracefully for non-SDK instruments --- sdk/metric/meter.go | 22 +++++++++++++++------- sdk/metric/meter_test.go | 14 -------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index cc65164ad2c..3e882e12fb6 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,7 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "errors" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" @@ -79,22 +78,31 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { for _, inst := range insts { + // Only register if at least one instrument has a non-drop aggregation. + // Otherwise, calling f during collection will be wasted computation. switch t := inst.(type) { case *instrumentImpl[int64]: if len(t.aggregators) > 0 { - m.pipes.registerCallback(f) - return nil + return m.registerCallback(f) } case *instrumentImpl[float64]: if len(t.aggregators) > 0 { - m.pipes.registerCallback(f) - return nil + return m.registerCallback(f) } default: - return errors.New("invalid asynchronous instrument") + // Instrument external to the SDK. For example, an instrument from + // the "go.opentelemetry.io/otel/metric/internal/global" package. + // + // Fail gracefully here, assume a valid instrument. + return m.registerCallback(f) } } - // Only instrument using drop aggregation passed. + // All insts use drop aggregation. + return nil +} + +func (m *meter) registerCallback(f func(context.Context)) error { + m.pipes.registerCallback(f) return nil } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 75bdb5adce6..108c5e78dd9 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -519,17 +519,3 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { assert.False(t, called, "callback called for all drop instruments") assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments") } - -func TestRegisterCallbackErrorForNonSDKInstrument(t *testing.T) { - type alien struct{ instrument.Asynchronous } - - r := NewManualReader() - mp := NewMeterProvider(WithReader(r)) - m := mp.Meter("TestRegisterCallbackErrorForNonSDKInstrument") - - err := m.RegisterCallback([]instrument.Asynchronous{&alien{}}, func(context.Context) { - panic("should not be registered") - }) - assert.Error(t, err) - assert.NotPanics(t, func() { _, _ = r.Collect(context.Background()) }) -}