From d1aca7b16719792a517f590063aa0ca44aced447 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 31 Oct 2022 07:55:54 -0700 Subject: [PATCH] Associate views with MeterProvider instead of Reader (#3387) * Split WithView from WithReader * Accept readers and views params in newPipelines * Update MeterProvider pipes init * Fix WithView comment * Fix view example MeterProvider option * Fix With{View,Reader} option in prom exporter test * Test Reader not required to be comparable * Add changes to changelog * Fix changelog option name --- CHANGELOG.md | 12 ++++ example/view/main.go | 5 +- exporters/prometheus/exporter_test.go | 3 +- sdk/metric/config.go | 34 +++++---- sdk/metric/config_test.go | 18 ++++- sdk/metric/pipeline.go | 6 +- sdk/metric/pipeline_registry_test.go | 99 +++++++++------------------ sdk/metric/provider.go | 2 +- sdk/metric/reader_test.go | 12 ++++ 9 files changed, 105 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a75865c5ba5..43c083fba4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package. + This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387) + +### Changed + +- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`. + Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option. + The views registered with the `MeterProvider` apply to all `Reader`s. (#3387) + ### Fixed - The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369) +- Remove comparable requirement for `Reader`s. (#3387) - Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389) - Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398) - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) diff --git a/example/view/main.go b/example/view/main.go index c8f1b246590..bafbe3c8920 100644 --- a/example/view/main.go +++ b/example/view/main.go @@ -66,7 +66,10 @@ func main() { log.Fatal(err) } - provider := metric.NewMeterProvider(metric.WithReader(exporter, customBucketsView, defaultView)) + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithView(customBucketsView, defaultView), + ) meter := provider.Meter(meterName) // Start the prometheus HTTP server and pass the exporter Collector to it diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 27441e74ea1..a21458158c9 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -260,7 +260,8 @@ func TestPrometheusExporter(t *testing.T) { provider := metric.NewMeterProvider( metric.WithResource(res), - metric.WithReader(exporter, customBucketsView, defaultView), + metric.WithReader(exporter), + metric.WithView(customBucketsView, defaultView), ) meter := provider.Meter("testmeter") diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 5b7537f63ed..ec1e06ab43b 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -26,7 +26,8 @@ import ( // config contains configuration options for a MeterProvider. type config struct { res *resource.Resource - readers map[Reader][]view.View + readers []Reader + views []view.View } // readerSignals returns a force-flush and shutdown function for a @@ -35,7 +36,7 @@ type config struct { // single functions. func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) { var fFuncs, sFuncs []func(context.Context) error - for r := range c.readers { + for _, r := range c.readers { sFuncs = append(sFuncs, r.Shutdown) fFuncs = append(fFuncs, r.ForceFlush) } @@ -112,21 +113,30 @@ func WithResource(res *resource.Resource) Option { }) } -// WithReader associates a Reader with a MeterProvider. Any passed view config -// will be used to associate a view with the Reader. If no views are passed -// the default view will be use for the Reader. -// -// Passing this option multiple times for the same Reader will overwrite. The -// last option passed will be the one used for that Reader. +// WithReader associates Reader r with a MeterProvider. // // By default, if this option is not used, the MeterProvider will perform no // operations; no data will be exported without a Reader. -func WithReader(r Reader, views ...view.View) Option { +func WithReader(r Reader) Option { return optionFunc(func(cfg config) config { - if cfg.readers == nil { - cfg.readers = make(map[Reader][]view.View) + if r == nil { + return cfg } - cfg.readers[r] = views + cfg.readers = append(cfg.readers, r) + return cfg + }) +} + +// WithView associates views a MeterProvider. +// +// Views are appended to existing ones in a MeterProvider if this option is +// used multiple times. +// +// By default, if this option is not used, the MeterProvider will use the +// default view. +func WithView(views ...view.View) Option { + return optionFunc(func(cfg config) config { + cfg.views = append(cfg.views, views...) return cfg }) } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index ad88d6b9ff5..3c3659a46e7 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -127,5 +127,21 @@ func TestWithResource(t *testing.T) { func TestWithReader(t *testing.T) { r := &reader{} c := newConfig([]Option{WithReader(r)}) - assert.Contains(t, c.readers, r) + require.Len(t, c.readers, 1) + assert.Same(t, r, c.readers[0]) +} + +func TestWithView(t *testing.T) { + var views []view.View + + v, err := view.New(view.MatchInstrumentKind(view.AsyncCounter), view.WithRename("a")) + require.NoError(t, err) + views = append(views, v) + + v, err = view.New(view.MatchInstrumentKind(view.SyncCounter), view.WithRename("b")) + require.NoError(t, err) + views = append(views, v) + + c := newConfig([]Option{WithView(views...)}) + assert.Equal(t, views, c.views) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index db70003b1d5..74a6cb713b1 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -416,13 +416,13 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines { +func newPipelines(res *resource.Resource, readers []Reader, views []view.View) pipelines { pipes := make([]*pipeline, 0, len(readers)) - for r, v := range readers { + for _, r := range readers { p := &pipeline{ resource: res, reader: r, - views: v, + views: views, } r.register(p) pipes = append(pipes, p) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index eb27da9824e..a6ad6f81d23 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -257,7 +257,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { testCases := []struct { name string - views map[Reader][]view.View + readers []Reader + views []view.View inst view.Instrument wantCount int }{ @@ -266,72 +267,46 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { inst: view.Instrument{Name: "foo"}, }, { - name: "1 reader 1 view gets 1 aggregator", - inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - }, + name: "1 reader 1 view gets 1 aggregator", + inst: view.Instrument{Name: "foo"}, + readers: []Reader{testRdr}, + views: []view.View{{}}, wantCount: 1, }, { - name: "1 reader 2 views gets 2 aggregator", - inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, - }, - }, + name: "1 reader 2 views gets 2 aggregator", + inst: view.Instrument{Name: "foo"}, + readers: []Reader{testRdr}, + views: []view.View{{}, renameView}, wantCount: 2, }, { - name: "2 readers 1 view each gets 2 aggregators", - inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - }, - testRdrHistogram: { - {}, - }, - }, + name: "2 readers 1 view each gets 2 aggregators", + inst: view.Instrument{Name: "foo"}, + readers: []Reader{testRdr, testRdrHistogram}, + views: []view.View{{}}, wantCount: 2, }, { - name: "2 reader 2 views each gets 4 aggregators", - inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - renameView, - }, - testRdrHistogram: { - {}, - renameView, - }, - }, + name: "2 reader 2 views each gets 4 aggregators", + inst: view.Instrument{Name: "foo"}, + readers: []Reader{testRdr, testRdrHistogram}, + views: []view.View{{}, renameView}, wantCount: 4, }, { - name: "An instrument is duplicated in two views share the same aggregator", - inst: view.Instrument{Name: "foo"}, - views: map[Reader][]view.View{ - testRdr: { - {}, - {}, - }, - }, + name: "An instrument is duplicated in two views share the same aggregator", + inst: view.Instrument{Name: "foo"}, + readers: []Reader{testRdr}, + views: []view.View{{}, {}}, wantCount: 1, }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - p := newPipelines(resource.Empty(), tt.views) + p := newPipelines(resource.Empty(), tt.readers, tt.views) testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) - p = newPipelines(resource.Empty(), tt.views) testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) }) } @@ -362,11 +337,10 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want func TestPipelineRegistryResource(t *testing.T) { v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo")) require.NoError(t, err) - views := map[Reader][]view.View{ - NewManualReader(): {{}, v}, - } + readers := []Reader{NewManualReader()} + views := []view.View{{}, v} res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, views) + pipes := newPipelines(res, readers, views) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -375,12 +349,9 @@ func TestPipelineRegistryResource(t *testing.T) { func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) - views := map[Reader][]view.View{ - testRdrHistogram: { - {}, - }, - } - p := newPipelines(resource.Empty(), views) + readers := []Reader{testRdrHistogram} + views := []view.View{{}} + p := newPipelines(resource.Empty(), readers, views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} vc := cache[string, instrumentID]{} @@ -389,8 +360,6 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { assert.Error(t, err) assert.Len(t, intAggs, 0) - p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) @@ -421,17 +390,13 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { view.MatchInstrumentName("bar"), view.WithRename("foo"), ) - views := map[Reader][]view.View{ - NewManualReader(): { - {}, - renameView, - }, - } + readers := []Reader{NewManualReader()} + views := []view.View{{}, renameView} fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} - p := newPipelines(resource.Empty(), views) + p := newPipelines(resource.Empty(), readers, views) vc := cache[string, instrumentID]{} ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index ce2e5524398..b90ae4ff445 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { conf := newConfig(options) flush, sdown := conf.readerSignals() return &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers), + pipes: newPipelines(conf.res, conf.readers, conf.views), forceFlush: flush, shutdown: sdown, } diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index f005ada472f..43f923a1564 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) { assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik)) } } + +type notComparable [0]func() // nolint:unused // non-comparable type itself is used. + +type noCompareReader struct { + notComparable // nolint:unused // non-comparable type itself is used. + Reader +} + +func TestReadersNotRequiredToBeComparable(t *testing.T) { + r := noCompareReader{Reader: NewManualReader()} + assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) }) +}