diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c21c4726cc..c02ec62168f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) - `Aggregation`s from `go.opentelemetry.io/otel/sdk/metric` with no data are not exported. (#3394, #3436) - Reenabled Attribute Filters in the Metric SDK. (#3396) +- Asynchronous callbacks are only called if they are registered with at least one instrument that does not use drop aggragation. (#3408) - Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432) - Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 89c09b22990..25aad6cfc20 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -15,8 +15,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "fmt" - "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" "go.opentelemetry.io/otel/metric/instrument/asyncint64" @@ -70,9 +68,6 @@ func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []i } aggs, err := p.resolve.Aggregators(key) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } return &instrumentImpl[N]{aggregators: aggs}, err } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 670049037d4..3f9f30106eb 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -73,6 +73,31 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { + for _, inst := range insts { + // Only register if at least one instrument has a non-drop aggregation. + // Otherwise, calling f during collection will be wasted computation. + switch t := inst.(type) { + case *instrumentImpl[int64]: + if len(t.aggregators) > 0 { + return m.registerCallback(f) + } + case *instrumentImpl[float64]: + if len(t.aggregators) > 0 { + return m.registerCallback(f) + } + default: + // Instrument external to the SDK. For example, an instrument from + // the "go.opentelemetry.io/otel/metric/internal/global" package. + // + // Fail gracefully here, assume a valid instrument. + return m.registerCallback(f) + } + } + // All insts use drop aggregation. + return nil +} + +func (m *meter) registerCallback(f func(context.Context)) error { m.pipes.registerCallback(f) return nil } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index ff5da4f7e52..6edc58d17ce 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/metric/view" @@ -480,6 +481,49 @@ func TestMetersProvideScope(t *testing.T) { metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } +func TestRegisterCallbackDropAggregations(t *testing.T) { + aggFn := func(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} + } + r := NewManualReader(WithAggregationSelector(aggFn)) + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("testRegisterCallbackDropAggregations") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true })) + + data, err := r.Collect(context.Background()) + require.NoError(t, err) + + assert.False(t, called, "callback called for all drop instruments") + assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments") +} + func TestAttributeFilter(t *testing.T) { one := 1.0 two := 2.0 diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fb651355498..46ad0806bdf 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -438,7 +438,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []view.View) p return pipes } -// TODO (#3053) Only register callbacks if any instrument matches in a view. func (p pipelines) registerCallback(fn func(context.Context)) { for _, pipe := range p { pipe.addCallback(fn)