diff --git a/CHANGELOG.md b/CHANGELOG.md index 65c829728c0..72c4f7cd732 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389) - Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398) - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) +- Reenabled Attribute Filters in the Metric SDK. (#3396) - Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432) ## [1.11.1/0.33.0] 2022-10-19 diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 808c946bb40..7cc7e307466 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -474,3 +475,356 @@ func TestMetersProvideScope(t *testing.T) { assert.NoError(t, err) metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } + +func TestAttributeFilter(t *testing.T) { + one := 1.0 + two := 2.0 + testcases := []struct { + name string + register func(t *testing.T, mtr metric.Meter) error + wantMetric metricdata.Metrics + }{ + { + name: "AsyncFloat64Counter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncFloat64().Counter("afcounter") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "afcounter", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 2.0, // TODO (#3439): This should be 3.0. + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + { + name: "AsyncFloat64UpDownCounter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncFloat64().UpDownCounter("afupdowncounter") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "afupdowncounter", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 2.0, // TODO (#3439): This should be 3.0. + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + }, + { + name: "AsyncFloat64Gauge", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncFloat64().Gauge("afgauge") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "afgauge", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 2.0, + }, + }, + }, + }, + }, + { + name: "AsyncInt64Counter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncInt64().Counter("aicounter") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "aicounter", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 20, // TODO (#3439): This should be 30. + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + { + name: "AsyncInt64UpDownCounter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncInt64().UpDownCounter("aiupdowncounter") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "aiupdowncounter", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 20, // TODO (#3439): This should be 30. + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + }, + { + name: "AsyncInt64Gauge", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.AsyncInt64().Gauge("aigauge") + if err != nil { + return err + } + return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + }) + }, + wantMetric: metricdata.Metrics{ + Name: "aigauge", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 20, + }, + }, + }, + }, + }, + { + name: "SyncFloat64Counter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncFloat64().Counter("sfcounter") + if err != nil { + return err + } + + ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "sfcounter", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 3.0, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + { + name: "SyncFloat64UpDownCounter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncFloat64().UpDownCounter("sfupdowncounter") + if err != nil { + return err + } + + ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "sfupdowncounter", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 3.0, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + }, + { + name: "SyncFloat64Histogram", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncFloat64().Histogram("sfhistogram") + if err != nil { + return err + } + + ctr.Record(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Record(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "sfhistogram", + Data: metricdata.Histogram{ + DataPoints: []metricdata.HistogramDataPoint{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Count: 2, + Min: &one, + Max: &two, + Sum: 3.0, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + }, + { + name: "SyncInt64Counter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncInt64().Counter("sicounter") + if err != nil { + return err + } + + ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "sicounter", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 30, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + }, + { + name: "SyncInt64UpDownCounter", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncInt64().UpDownCounter("siupdowncounter") + if err != nil { + return err + } + + ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "siupdowncounter", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Value: 30, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + }, + { + name: "SyncInt64Histogram", + register: func(t *testing.T, mtr metric.Meter) error { + ctr, err := mtr.SyncInt64().Histogram("sihistogram") + if err != nil { + return err + } + + ctr.Record(context.Background(), 1, attribute.String("foo", "bar"), attribute.Int("version", 1)) + ctr.Record(context.Background(), 2, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil + }, + wantMetric: metricdata.Metrics{ + Name: "sihistogram", + Data: metricdata.Histogram{ + DataPoints: []metricdata.HistogramDataPoint{ + { + Attributes: attribute.NewSet(attribute.String("foo", "bar")), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Count: 2, + Min: &one, + Max: &two, + Sum: 3.0, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + v, err := view.New( + view.MatchInstrumentName("*"), + view.WithFilterAttributes(attribute.Key("foo")), + ) + require.NoError(t, err) + rdr := NewManualReader() + mtr := NewMeterProvider( + WithReader(rdr), + WithView(v), + ).Meter("TestAttributeFilter") + + err = tt.register(t, mtr) + require.NoError(t, err) + + m, err := rdr.Collect(context.Background()) + assert.NoError(t, err) + + require.Len(t, m.ScopeMetrics, 1) + require.Len(t, m.ScopeMetrics[0].Metrics, 1) + + metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) + }) + } +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 74a6cb713b1..a3a7d069983 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -21,6 +21,7 @@ import ( "strings" "sync" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -203,7 +204,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in } matched = true - agg, err := i.cachedAggregator(inst, instUnit) + agg, err := i.cachedAggregator(inst, instUnit, v.AttributeFilter()) if err != nil { errs.append(err) } @@ -223,7 +224,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in } // Apply implicit default view if no explicit matched. - agg, err := i.cachedAggregator(inst, instUnit) + agg, err := i.cachedAggregator(inst, instUnit, nil) if err != nil { errs.append(err) } @@ -247,7 +248,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in // // If the instrument defines an unknown or incompatible aggregation, an error // is returned. -func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) { +func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit, filter func(attribute.Set) attribute.Set) (internal.Aggregator[N], error) { switch inst.Aggregation.(type) { case nil, aggregation.Default: // Undefined, nil, means to use the default from the reader. @@ -273,6 +274,10 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter if agg == nil { // Drop aggregator. return nil, nil } + if filter != nil { + agg = internal.NewFilter(agg, filter) + } + i.pipeline.addSync(inst.Scope, instrumentSync{ name: inst.Name, description: inst.Description,