From 2e8124da15fe68296a6e57b44a66ef8f85290d14 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Oct 2022 12:02:52 -0700 Subject: [PATCH 1/5] Add the viewer type --- sdk/metric/pipeline.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 83de81ccc61..f185bf121b4 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -51,6 +51,12 @@ type instrumentSync struct { aggregator aggregator } +// viewer is a reader with the applicable views it will use when reading. +type viewer struct { + reader Reader + views []view.View +} + func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { if res == nil { res = resource.Empty() From 3aad56363596a4fc5682851bc3006a2d8408a2ff Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Oct 2022 12:36:12 -0700 Subject: [PATCH 2/5] Use the viewer in pipeline --- sdk/metric/config.go | 13 +- sdk/metric/config_test.go | 4 +- sdk/metric/pipeline.go | 24 ++- sdk/metric/pipeline_registry_test.go | 230 +++++++++++++++------------ sdk/metric/pipeline_test.go | 10 +- sdk/metric/provider.go | 2 +- 6 files changed, 147 insertions(+), 136 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 5b7537f63ed..954bd87e1cb 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -26,7 +26,7 @@ import ( // config contains configuration options for a MeterProvider. type config struct { res *resource.Resource - readers map[Reader][]view.View + viewers []viewer } // readerSignals returns a force-flush and shutdown function for a @@ -35,9 +35,9 @@ type config struct { // single functions. func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) { var fFuncs, sFuncs []func(context.Context) error - for r := range c.readers { - sFuncs = append(sFuncs, r.Shutdown) - fFuncs = append(fFuncs, r.ForceFlush) + for _, v := range c.viewers { + sFuncs = append(sFuncs, v.reader.Shutdown) + fFuncs = append(fFuncs, v.reader.ForceFlush) } return unify(fFuncs), unifyShutdown(sFuncs) @@ -123,10 +123,7 @@ func WithResource(res *resource.Resource) Option { // operations; no data will be exported without a Reader. 
func WithReader(r Reader, views ...view.View) Option { return optionFunc(func(cfg config) config { - if cfg.readers == nil { - cfg.readers = make(map[Reader][]view.View) - } - cfg.readers[r] = views + cfg.viewers = append(cfg.viewers, viewer{reader: r, views: views}) return cfg }) } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index ad88d6b9ff5..d83e14d7bfe 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -127,5 +127,7 @@ func TestWithResource(t *testing.T) { func TestWithReader(t *testing.T) { r := &reader{} c := newConfig([]Option{WithReader(r)}) - assert.Contains(t, c.readers, r) + require.Len(t, c.viewers, 1) + assert.Same(t, r, c.viewers[0].reader) + assert.Len(t, c.viewers[0].views, 0) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f185bf121b4..925e544ca09 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -57,14 +57,13 @@ type viewer struct { views []view.View } -func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { +func newPipeline(res *resource.Resource, v viewer) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ resource: res, - reader: reader, - views: views, + viewer: v, aggregations: make(map[instrumentation.Scope][]instrumentSync), } } @@ -75,10 +74,9 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe // As instruments are created the instrument should be checked if it exists in the // views of a the Reader, and if so each aggregator should be added to the pipeline. type pipeline struct { - resource *resource.Resource + viewer - reader Reader - views []view.View + resource *resource.Resource sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync @@ -407,15 +405,11 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio // measurement. 
type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines { - pipes := make([]*pipeline, 0, len(readers)) - for r, v := range readers { - p := &pipeline{ - resource: res, - reader: r, - views: v, - } - r.register(p) +func newPipelines(res *resource.Resource, viewers []viewer) pipelines { + pipes := make([]*pipeline, 0, len(viewers)) + for _, v := range viewers { + p := newPipeline(res, v) + v.reader.register(p) pipes = append(pipes, p) } return pipes diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 45bc23ccb18..363387408c1 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -73,142 +73,175 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { testcases := []struct { name string - reader Reader - views []view.View + viewer viewer inst view.Instrument wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int wantErr error }{ { - name: "drop should return 0 aggregators", - reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), - views: []view.View{{}}, - inst: instruments[view.SyncCounter], + name: "drop should return 0 aggregators", + viewer: viewer{ + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + views: []view.View{{}}, + }, + inst: instruments[view.SyncCounter], }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncUpDownCounter], wantKind: internal.NewDeltaSum[N](false), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncHistogram], wantKind: internal.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncCounter], wantKind: internal.NewDeltaSum[N](true), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncUpDownCounter], wantKind: internal.NewDeltaSum[N](false), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: 
NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.AsyncGauge], wantKind: internal.NewLastValue[N](), wantLen: 1, }, { - name: "default agg should use reader", - reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), - views: []view.View{defaultAggView}, + name: "default agg should use reader", + viewer: viewer{ + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewDeltaSum[N](true), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncUpDownCounter], wantKind: internal.NewCumulativeSum[N](false), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncHistogram], wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncUpDownCounter], wantKind: internal.NewCumulativeSum[N](false), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.AsyncGauge], wantKind: internal.NewLastValue[N](), wantLen: 1, }, { - name: "reader should set default agg", - reader: NewManualReader(), - views: []view.View{{}}, + name: "reader should set default agg", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 1, }, { - name: "view should overwrite reader", - reader: NewManualReader(), - views: []view.View{changeAggView}, + name: "view should overwrite reader", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{changeAggView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "multiple views should create multiple aggregators", - reader: NewManualReader(), - views: []view.View{{}, renameView}, + name: "multiple views should create multiple aggregators", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{{}, renameView}, + }, inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 2, }, { - name: "reader with invalid aggregation should error", - reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), - views: []view.View{{}}, + name: 
"reader with invalid aggregation should error", + viewer: viewer{ + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + views: []view.View{{}}, + }, inst: instruments[view.SyncCounter], wantErr: errCreatingAggregators, }, { - name: "view with invalid aggregation should error", - reader: NewManualReader(), - views: []view.View{invalidAggView}, + name: "view with invalid aggregation should error", + viewer: viewer{ + reader: NewManualReader(), + views: []view.View{invalidAggView}, + }, inst: instruments[view.SyncCounter], wantErr: errCreatingAggregators, }, @@ -216,7 +249,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, tt.reader, tt.views), c) + i := newInserter(newPipeline(nil, tt.viewer), c) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -229,7 +262,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { func testInvalidInstrumentShouldPanic[N int64 | float64]() { c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c) + v := viewer{reader: NewManualReader()} + i := newInserter(newPipeline(nil, v), c) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -257,59 +291,51 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { testCases := []struct { name string - views map[Reader][]view.View + views []viewer inst view.Instrument wantCount int }{ { - name: "No views have no aggregators", + name: "No Reader implies no aggregators", inst: view.Instrument{Name: "foo"}, }, { - name: "1 reader 1 view gets 1 aggregator", + name: "1 reader default view gets 1 aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - }, + views: []viewer{{ + reader: testRdr, + }}, wantCount: 1, }, { name: "1 reader 2 views gets 2 aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, - }, - }, + views: []viewer{{ + reader: testRdr, + views: []view.View{{}, renameView}, + }}, wantCount: 2, }, { - name: "2 readers 1 view each gets 2 aggregators", + name: "2 readers default view each gets 2 aggregators", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - testRdrHistogram: { - {}, - }, + views: []viewer{ + {reader: testRdr}, + {reader: testRdrHistogram}, }, wantCount: 2, }, { name: "2 reader 2 views each gets 4 aggregators", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, + views: []viewer{ + { + reader: testRdr, + views: []view.View{{}, renameView}, }, - testRdrHistogram: { - {}, - renameView, + { + reader: testRdrHistogram, + views: []view.View{{}, renameView}, }, }, wantCount: 4, @@ -317,12 +343,10 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { { name: "An instrument is duplicated in two views share the same aggregator", inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - {}, - }, - }, + views: []viewer{{ + reader: testRdr, + views: []view.View{{}, {}}, + }}, wantCount: 1, }, } @@ -362,11 +386,11 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want func TestPipelineRegistryResource(t *testing.T) { v, err := 
view.New(view.MatchInstrumentName("bar"), view.WithRename("foo")) require.NoError(t, err) - views := map[Reader][]view.View{ - NewManualReader(): {{}, v}, - } res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, views) + pipes := newPipelines(res, []viewer{{ + reader: NewManualReader(), + views: []view.View{{}, v}, + }}) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -375,11 +399,7 @@ func TestPipelineRegistryResource(t *testing.T) { func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) - views := map[Reader][]view.View{ - testRdrHistogram: { - {}, - }, - } + views := []viewer{{reader: testRdrHistogram}} p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} @@ -421,12 +441,10 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { view.MatchInstrumentName("bar"), view.WithRename("foo"), ) - views := map[Reader][]view.View{ - NewManualReader(): { - {}, - renameView, - }, - } + views := []viewer{{ + reader: NewManualReader(), + views: []view.View{{}, renameView}, + }} fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index e47bb6b5f64..358e6906b77 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -67,7 +67,7 @@ func TestEmptyPipeline(t *testing.T) { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, viewer{}) output, err := pipe.produce(context.Background()) require.NoError(t, err) @@ -92,7 +92,7 @@ func TestNewPipeline(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res, nil, nil) + pipe := newPipeline(res, viewer{}) output, err := pipe.produce(context.Background()) assert.NoError(t, err) @@ -100,7 +100,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrency(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, viewer{}) ctx := context.Background() var wg sync.WaitGroup @@ -153,11 +153,11 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { }{ { name: "NoView", - pipe: newPipeline(nil, reader, nil), + pipe: newPipeline(nil, viewer{reader: reader}), }, { name: "NoMatchingView", - pipe: newPipeline(nil, reader, []view.View{v}), + pipe: newPipeline(nil, viewer{reader: reader, views: []view.View{v}}), }, } diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index ce2e5524398..8d6e3927c28 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { conf := newConfig(options) flush, sdown := conf.readerSignals() return &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers), + pipes: newPipelines(conf.res, conf.viewers), forceFlush: flush, shutdown: sdown, } From 8f3f2ee1d8f1c0f1d665c4422ab52476cd50d5cf Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Oct 2022 12:43:33 -0700 Subject: [PATCH 3/5] Test Reader not required to be comparable --- sdk/metric/reader_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdk/metric/reader_test.go 
b/sdk/metric/reader_test.go index f005ada472f..91a4b76df39 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) { assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik)) } } + +type notComparable [0]func() + +type noCompareReader struct { + notComparable + Reader +} + +func TestReadersNotRequiredToBeComparable(t *testing.T) { + r := noCompareReader{Reader: NewManualReader()} + assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) }) +} From 02dc27e5c4523674732e90b36cb71e1edb9481ee Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Oct 2022 12:46:37 -0700 Subject: [PATCH 4/5] Add changes to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87d13296190..a164766c3b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252) - Prometheus exporter will now cumulatively sum histogram buckets. (#3281) +- Remove comparable requirement for `Reader`s. (#3290) ## [1.11.0/0.32.3] 2022-10-12 From 7fda30f0a2d0e989ef7ff24b905b32656fbad66e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Oct 2022 12:54:30 -0700 Subject: [PATCH 5/5] Ignore lint of unused type --- sdk/metric/reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 91a4b76df39..43f923a1564 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -237,10 +237,10 @@ func TestDefaultTemporalitySelector(t *testing.T) { } } -type notComparable [0]func() +type notComparable [0]func() // nolint:unused // non-comparable type itself is used. type noCompareReader struct { - notComparable + notComparable // nolint:unused // non-comparable type itself is used. Reader }
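
A minimal usage sketch of the configuration shape after this series; it is not part of the patches above. Assumptions: the go.opentelemetry.io/otel/sdk/metric and go.opentelemetry.io/otel/sdk/metric/view import paths as they existed when this series was written, and illustrative instrument names ("bar", "foo") borrowed from the registry tests. The point it shows: WithReader now appends a viewer{reader, views} pair to the config instead of inserting into a map[Reader][]view.View, so the Reader value is never used as a map key and no longer needs to be comparable.

package main

import (
	"context"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/view"
)

func main() {
	// A view that renames a matched instrument, mirroring the renameView
	// used in the registry tests; the instrument names are illustrative.
	v, err := view.New(
		view.MatchInstrumentName("bar"),
		view.WithRename("foo"),
	)
	if err != nil {
		panic(err)
	}

	// WithReader pairs this Reader with its views in a single viewer entry,
	// so registration does not require the Reader to be a valid map key.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader, v),
	)
	defer func() { _ = provider.Shutdown(context.Background()) }()
}

Because registration no longer keys a map by the Reader, a Reader wrapped in a struct that embeds a non-comparable field (as TestReadersNotRequiredToBeComparable does above) can be passed to WithReader without panicking.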