Skip to content

Commit

Permalink
Associate views with MeterProvider instead of Reader (#3387)
Browse files Browse the repository at this point in the history
* Split WithView from WithReader

* Accept readers and views params in newPipelines

* Update MeterProvider pipes init

* Fix WithView comment

* Fix view example MeterProvider option

* Fix With{View,Reader} option in prom exporter test

* Test Reader not required to be comparable

* Add changes to changelog

* Fix changelog option name
  • Loading branch information
MrAlias committed Oct 31, 2022
1 parent c8a13d6 commit d1aca7b
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 86 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,9 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package.
This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387)

### Changed

- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)

### Fixed

- The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369)
- Remove comparable requirement for `Reader`s. (#3387)
- Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389)
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
Expand Down
5 changes: 4 additions & 1 deletion example/view/main.go
Expand Up @@ -66,7 +66,10 @@ func main() {
log.Fatal(err)
}

provider := metric.NewMeterProvider(metric.WithReader(exporter, customBucketsView, defaultView))
provider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithView(customBucketsView, defaultView),
)
meter := provider.Meter(meterName)

// Start the prometheus HTTP server and pass the exporter Collector to it
Expand Down
3 changes: 2 additions & 1 deletion exporters/prometheus/exporter_test.go
Expand Up @@ -260,7 +260,8 @@ func TestPrometheusExporter(t *testing.T) {

provider := metric.NewMeterProvider(
metric.WithResource(res),
metric.WithReader(exporter, customBucketsView, defaultView),
metric.WithReader(exporter),
metric.WithView(customBucketsView, defaultView),
)
meter := provider.Meter("testmeter")

Expand Down
34 changes: 22 additions & 12 deletions sdk/metric/config.go
Expand Up @@ -26,7 +26,8 @@ import (
// config contains configuration options for a MeterProvider.
type config struct {
res *resource.Resource
readers map[Reader][]view.View
readers []Reader
views []view.View
}

// readerSignals returns a force-flush and shutdown function for a
Expand All @@ -35,7 +36,7 @@ type config struct {
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
for _, r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
}
Expand Down Expand Up @@ -112,21 +113,30 @@ func WithResource(res *resource.Resource) Option {
})
}

// WithReader associates a Reader with a MeterProvider. Any passed view config
// will be used to associate a view with the Reader. If no views are passed
// the default view will be use for the Reader.
//
// Passing this option multiple times for the same Reader will overwrite. The
// last option passed will be the one used for that Reader.
// WithReader associates Reader r with a MeterProvider.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader, views ...view.View) Option {
func WithReader(r Reader) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
if r == nil {
return cfg
}
cfg.readers[r] = views
cfg.readers = append(cfg.readers, r)
return cfg
})
}

// WithView associates views a MeterProvider.
//
// Views are appended to existing ones in a MeterProvider if this option is
// used multiple times.
//
// By default, if this option is not used, the MeterProvider will use the
// default view.
func WithView(views ...view.View) Option {
return optionFunc(func(cfg config) config {
cfg.views = append(cfg.views, views...)
return cfg
})
}
18 changes: 17 additions & 1 deletion sdk/metric/config_test.go
Expand Up @@ -127,5 +127,21 @@ func TestWithResource(t *testing.T) {
func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
require.Len(t, c.readers, 1)
assert.Same(t, r, c.readers[0])
}

func TestWithView(t *testing.T) {
var views []view.View

v, err := view.New(view.MatchInstrumentKind(view.AsyncCounter), view.WithRename("a"))
require.NoError(t, err)
views = append(views, v)

v, err = view.New(view.MatchInstrumentKind(view.SyncCounter), view.WithRename("b"))
require.NoError(t, err)
views = append(views, v)

c := newConfig([]Option{WithView(views...)})
assert.Equal(t, views, c.views)
}
6 changes: 3 additions & 3 deletions sdk/metric/pipeline.go
Expand Up @@ -416,13 +416,13 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio
// measurement.
type pipelines []*pipeline

func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
func newPipelines(res *resource.Resource, readers []Reader, views []view.View) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for r, v := range readers {
for _, r := range readers {
p := &pipeline{
resource: res,
reader: r,
views: v,
views: views,
}
r.register(p)
pipes = append(pipes, p)
Expand Down
99 changes: 32 additions & 67 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -257,7 +257,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

testCases := []struct {
name string
views map[Reader][]view.View
readers []Reader
views []view.View
inst view.Instrument
wantCount int
}{
Expand All @@ -266,72 +267,46 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
inst: view.Instrument{Name: "foo"},
},
{
name: "1 reader 1 view gets 1 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
},
},
name: "1 reader 1 view gets 1 aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}},
wantCount: 1,
},
{
name: "1 reader 2 views gets 2 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
},
},
name: "1 reader 2 views gets 2 aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}, renameView},
wantCount: 2,
},
{
name: "2 readers 1 view each gets 2 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
},
testRdrHistogram: {
{},
},
},
name: "2 readers 1 view each gets 2 aggregators",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr, testRdrHistogram},
views: []view.View{{}},
wantCount: 2,
},
{
name: "2 reader 2 views each gets 4 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
},
testRdrHistogram: {
{},
renameView,
},
},
name: "2 reader 2 views each gets 4 aggregators",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr, testRdrHistogram},
views: []view.View{{}, renameView},
wantCount: 4,
},
{
name: "An instrument is duplicated in two views share the same aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
{},
},
},
name: "An instrument is duplicated in two views share the same aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}, {}},
wantCount: 1,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
p := newPipelines(resource.Empty(), tt.views)
p := newPipelines(resource.Empty(), tt.readers, tt.views)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
p = newPipelines(resource.Empty(), tt.views)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
})
}
Expand Down Expand Up @@ -362,11 +337,10 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want
func TestPipelineRegistryResource(t *testing.T) {
v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
require.NoError(t, err)
views := map[Reader][]view.View{
NewManualReader(): {{}, v},
}
readers := []Reader{NewManualReader()}
views := []view.View{{}, v}
res := resource.NewSchemaless(attribute.String("key", "val"))
pipes := newPipelines(res, views)
pipes := newPipelines(res, readers, views)
for _, p := range pipes {
assert.True(t, res.Equal(p.resource), "resource not set")
}
Expand All @@ -375,12 +349,9 @@ func TestPipelineRegistryResource(t *testing.T) {
func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))

views := map[Reader][]view.View{
testRdrHistogram: {
{},
},
}
p := newPipelines(resource.Empty(), views)
readers := []Reader{testRdrHistogram}
views := []view.View{{}}
p := newPipelines(resource.Empty(), readers, views)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

vc := cache[string, instrumentID]{}
Expand All @@ -389,8 +360,6 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
assert.Error(t, err)
assert.Len(t, intAggs, 0)

p = newPipelines(resource.Empty(), views)

rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
floatAggs, err := rf.Aggregators(inst, unit.Dimensionless)
assert.Error(t, err)
Expand Down Expand Up @@ -421,17 +390,13 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
view.MatchInstrumentName("bar"),
view.WithRename("foo"),
)
views := map[Reader][]view.View{
NewManualReader(): {
{},
renameView,
},
}
readers := []Reader{NewManualReader()}
views := []view.View{{}, renameView}

fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}

p := newPipelines(resource.Empty(), views)
p := newPipelines(resource.Empty(), readers, views)

vc := cache[string, instrumentID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Expand Up @@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
conf := newConfig(options)
flush, sdown := conf.readerSignals()
return &MeterProvider{
pipes: newPipelines(conf.res, conf.readers),
pipes: newPipelines(conf.res, conf.readers, conf.views),
forceFlush: flush,
shutdown: sdown,
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/metric/reader_test.go
Expand Up @@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) {
assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik))
}
}

type notComparable [0]func() // nolint:unused // non-comparable type itself is used.

type noCompareReader struct {
notComparable // nolint:unused // non-comparable type itself is used.
Reader
}

func TestReadersNotRequiredToBeComparable(t *testing.T) {
r := noCompareReader{Reader: NewManualReader()}
assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) })
}

0 comments on commit d1aca7b

Please sign in to comment.