diff --git a/CHANGELOG.md b/CHANGELOG.md index f8f29e5c979..c8303c7e38d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed +- Remove comparable requirement for `Reader`s. (#3290) - The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369) ## [1.11.1/0.33.0] 2022-10-19 diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 5b7537f63ed..954bd87e1cb 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -26,7 +26,7 @@ import ( // config contains configuration options for a MeterProvider. type config struct { res *resource.Resource - readers map[Reader][]view.View + viewers []viewer } // readerSignals returns a force-flush and shutdown function for a @@ -35,9 +35,9 @@ type config struct { // single functions. func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) { var fFuncs, sFuncs []func(context.Context) error - for r := range c.readers { - sFuncs = append(sFuncs, r.Shutdown) - fFuncs = append(fFuncs, r.ForceFlush) + for _, v := range c.viewers { + sFuncs = append(sFuncs, v.reader.Shutdown) + fFuncs = append(fFuncs, v.reader.ForceFlush) } return unify(fFuncs), unifyShutdown(sFuncs) @@ -123,10 +123,7 @@ func WithResource(res *resource.Resource) Option { // operations; no data will be exported without a Reader. func WithReader(r Reader, views ...view.View) Option { return optionFunc(func(cfg config) config { - if cfg.readers == nil { - cfg.readers = make(map[Reader][]view.View) - } - cfg.readers[r] = views + cfg.viewers = append(cfg.viewers, viewer{reader: r, views: views}) return cfg }) } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index ad88d6b9ff5..d83e14d7bfe 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -127,5 +127,7 @@ func TestWithResource(t *testing.T) { func TestWithReader(t *testing.T) { r := &reader{} c := newConfig([]Option{WithReader(r)}) - assert.Contains(t, c.readers, r) + require.Len(t, c.viewers, 1) + assert.Same(t, r, c.viewers[0].reader) + assert.Len(t, c.viewers[0].views, 0) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index db70003b1d5..5c8786b37e2 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -51,14 +51,19 @@ type instrumentSync struct { aggregator aggregator } -func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { +// viewer is a reader with the applicable views it will use when reading. +type viewer struct { + reader Reader + views []view.View +} + +func newPipeline(res *resource.Resource, v viewer) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ resource: res, - reader: reader, - views: views, + viewer: v, aggregations: make(map[instrumentation.Scope][]instrumentSync), } } @@ -69,10 +74,9 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe // As instruments are created the instrument should be checked if it exists in the // views of a the Reader, and if so each aggregator should be added to the pipeline. type pipeline struct { - resource *resource.Resource + viewer - reader Reader - views []view.View + resource *resource.Resource sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync @@ -416,15 +420,11 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines { - pipes := make([]*pipeline, 0, len(readers)) - for r, v := range readers { - p := &pipeline{ - resource: res, - reader: r, - views: v, - } - r.register(p) +func newPipelines(res *resource.Resource, viewers []viewer) pipelines { + pipes := make([]*pipeline, 0, len(viewers)) + for _, v := range viewers { + p := newPipeline(res, v) + v.reader.register(p) pipes = append(pipes, p) } return pipes diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index eb27da9824e..b3fe0c864c0 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -73,142 +73,175 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { testcases := []struct { name string - reader Reader - views []view.View + viewer viewer inst view.Instrument wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int wantErr error }{ { - name: "drop should return 0 aggregators", - reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), - views: []view.View{{}}, - inst: instruments[view.SyncCounter], + name: "drop should return 0 aggregators", + viewer: viewer{ + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + views: []view.View{{}}, + }, + inst: instruments[view.SyncCounter], }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncUpDownCounter], wantKind: internal.NewDeltaSum[N](false), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncHistogram], wantKind: internal.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncCounter], wantKind: internal.NewPrecomputedDeltaSum[N](true), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncUpDownCounter], wantKind: internal.NewPrecomputedDeltaSum[N](false), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncGauge], wantKind: internal.NewLastValue[N](), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewDeltaSum[N](true), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncUpDownCounter], wantKind: internal.NewCumulativeSum[N](false), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncHistogram], wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncCounter], wantKind: internal.NewPrecomputedCumulativeSum[N](true), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncUpDownCounter], wantKind: internal.NewPrecomputedCumulativeSum[N](false), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncGauge], wantKind: internal.NewLastValue[N](), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 1, }, { - name: "view should overwrite reader", - reader: NewManualReader(), - views: []view.View{changeAggView}, + name: "view should overwrite reader", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{changeAggView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "multiple views should create multiple aggregators", - reader: NewManualReader(), - views: []view.View{{}, renameView}, + name: "multiple views should create multiple aggregators", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}, renameView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 2, }, { - name: "reader with invalid aggregation should error", - reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), - views: []view.View{{}}, + name: "reader with invalid aggregation should error", + viewer: viewer{ + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + views: []view.View{{}}, + }, inst: instruments[view.SyncCounter], wantErr: errCreatingAggregators, }, { - name: "view with invalid aggregation should error", - reader: NewManualReader(), - views: []view.View{invalidAggView}, + name: "view with invalid aggregation should error", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{invalidAggView}, + }, inst: instruments[view.SyncCounter], wantErr: errCreatingAggregators, }, @@ -216,7 +249,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, tt.reader, tt.views), c) + i := newInserter(newPipeline(nil, tt.viewer), c) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -229,7 +262,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { func testInvalidInstrumentShouldPanic[N int64 | float64]() { c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c) + v := viewer{reader: NewManualReader()} + i := newInserter(newPipeline(nil, v), c) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -257,59 +291,51 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { testCases := []struct { name string - views map[Reader][]view.View + views []viewer inst view.Instrument wantCount int }{ { - name: "No views have no aggregators", + name: "No Reader implies no aggregators", inst: view.Instrument{Name: "foo"}, }, { - name: "1 reader 1 view gets 1 aggregator", + name: "1 reader default view gets 1 aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - }, + views: []viewer{{ + reader: testRdr, + }}, wantCount: 1, }, { name: "1 reader 2 views gets 2 aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, - }, - }, + views: []viewer{{ + reader: testRdr, + views: []view.View{{}, renameView}, + }}, wantCount: 2, }, { - name: "2 readers 1 view each gets 2 aggregators", + name: "2 readers default view each gets 2 aggregators", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - testRdrHistogram: { - {}, - }, + views: []viewer{ + {reader: testRdr}, + {reader: testRdrHistogram}, }, wantCount: 2, }, { name: "2 reader 2 views each gets 4 aggregators", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, + views: []viewer{ + { + reader: testRdr, + views: []view.View{{}, renameView}, }, - testRdrHistogram: { - {}, - renameView, + { + reader: testRdrHistogram, + views: []view.View{{}, renameView}, }, }, wantCount: 4, @@ -317,12 +343,10 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { { name: "An instrument is duplicated in two views share the same aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - {}, - }, - }, + views: []viewer{{ + reader: testRdr, + views: []view.View{{}, {}}, + }}, wantCount: 1, }, } @@ -362,11 +386,11 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want func TestPipelineRegistryResource(t *testing.T) { v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo")) require.NoError(t, err) - views := map[Reader][]view.View{ - NewManualReader(): {{}, v}, - } res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, views) + pipes := newPipelines(res, []viewer{{ + reader: NewManualReader(), + views: []view.View{{}, v}, + }}) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -375,11 +399,7 @@ func TestPipelineRegistryResource(t *testing.T) { func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) - views := map[Reader][]view.View{ - testRdrHistogram: { - {}, - }, - } + views := []viewer{{reader: testRdrHistogram}} p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} @@ -421,12 +441,10 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { view.MatchInstrumentName("bar"), view.WithRename("foo"), ) - views := map[Reader][]view.View{ - NewManualReader(): { - {}, - renameView, - }, - } + views := []viewer{{ + reader: NewManualReader(), + views: []view.View{{}, renameView}, + }} fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index e47bb6b5f64..358e6906b77 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -67,7 +67,7 @@ func TestEmptyPipeline(t *testing.T) { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, viewer{}) output, err := pipe.produce(context.Background()) require.NoError(t, err) @@ -92,7 +92,7 @@ func TestNewPipeline(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res, nil, nil) + pipe := newPipeline(res, viewer{}) output, err := pipe.produce(context.Background()) assert.NoError(t, err) @@ -100,7 +100,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrency(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, viewer{}) ctx := context.Background() var wg sync.WaitGroup @@ -153,11 +153,11 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { }{ { name: "NoView", - pipe: newPipeline(nil, reader, nil), + pipe: newPipeline(nil, viewer{reader: reader}), }, { name: "NoMatchingView", - pipe: newPipeline(nil, reader, []view.View{v}), + pipe: newPipeline(nil, viewer{reader: reader, views: []view.View{v}}), }, } diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index ce2e5524398..8d6e3927c28 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { conf := newConfig(options) flush, sdown := conf.readerSignals() return &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers), + pipes: newPipelines(conf.res, conf.viewers), forceFlush: flush, shutdown: sdown, } diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index f005ada472f..43f923a1564 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) { assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik)) } } + +type notComparable [0]func() // nolint:unused // non-comparable type itself is used. + +type noCompareReader struct { + notComparable // nolint:unused // non-comparable type itself is used. + Reader +} + +func TestReadersNotRequiredToBeComparable(t *testing.T) { + r := noCompareReader{Reader: NewManualReader()} + assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) }) +}