From 308d0362e6c56cbbd23fa0fda14df5dbd70a5f51 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 11 Nov 2022 09:10:59 -0800 Subject: [PATCH] Only registers callbacks if non-drop aggregation is used (#3408) * Do not return an error for Drop aggs The async instruments currently return an error if and only if there are no aggregators returned from a resolve. Returning no aggregators means the instrument aggregation is drop. Do not include this in the error reporting decision. * Only registers callbacks if non-drop agg is used The instruments passed to RegisterCallback need to have some aggregation defined otherwise it is implied they have a Drop aggregation. Check that at least one instrument passed has an aggregation other than Drop before registering the callback with the pipelines. Also, return an error if the user passed another API implementation of an asynchronous instrument. * Remove unneeded TODO from pipeline * Add changes to changelog * Test callback not called for all drop instruments * Test RegisterCallback returns err for non-SDK inst * Fail gracefully for non-SDK instruments Co-authored-by: Chester Cheung --- CHANGELOG.md | 1 + sdk/metric/instrument_provider.go | 5 ---- sdk/metric/meter.go | 25 ++++++++++++++++++ sdk/metric/meter_test.go | 44 +++++++++++++++++++++++++++++++ sdk/metric/pipeline.go | 1 - 5 files changed, 70 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c21c4726cc..c02ec62168f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) - `Aggregation`s from `go.opentelemetry.io/otel/sdk/metric` with no data are not exported. (#3394, #3436) - Reenabled Attribute Filters in the Metric SDK. (#3396) +- Asynchronous callbacks are only called if they are registered with at least one instrument that does not use drop aggragation. (#3408) - Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432) - Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 89c09b22990..25aad6cfc20 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -15,8 +15,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "fmt" - "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" "go.opentelemetry.io/otel/metric/instrument/asyncint64" @@ -70,9 +68,6 @@ func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []i } aggs, err := p.resolve.Aggregators(key) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } return &instrumentImpl[N]{aggregators: aggs}, err } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 670049037d4..3f9f30106eb 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -73,6 +73,31 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { + for _, inst := range insts { + // Only register if at least one instrument has a non-drop aggregation. + // Otherwise, calling f during collection will be wasted computation. + switch t := inst.(type) { + case *instrumentImpl[int64]: + if len(t.aggregators) > 0 { + return m.registerCallback(f) + } + case *instrumentImpl[float64]: + if len(t.aggregators) > 0 { + return m.registerCallback(f) + } + default: + // Instrument external to the SDK. For example, an instrument from + // the "go.opentelemetry.io/otel/metric/internal/global" package. + // + // Fail gracefully here, assume a valid instrument. + return m.registerCallback(f) + } + } + // All insts use drop aggregation. + return nil +} + +func (m *meter) registerCallback(f func(context.Context)) error { m.pipes.registerCallback(f) return nil } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index ff5da4f7e52..6edc58d17ce 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/metric/view" @@ -480,6 +481,49 @@ func TestMetersProvideScope(t *testing.T) { metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } +func TestRegisterCallbackDropAggregations(t *testing.T) { + aggFn := func(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} + } + r := NewManualReader(WithAggregationSelector(aggFn)) + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("testRegisterCallbackDropAggregations") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true })) + + data, err := r.Collect(context.Background()) + require.NoError(t, err) + + assert.False(t, called, "callback called for all drop instruments") + assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments") +} + func TestAttributeFilter(t *testing.T) { one := 1.0 two := 2.0 diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fb651355498..46ad0806bdf 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -438,7 +438,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []view.View) p return pipes } -// TODO (#3053) Only register callbacks if any instrument matches in a view. func (p pipelines) registerCallback(fn func(context.Context)) { for _, pipe := range p { pipe.addCallback(fn)