Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Associate views with MeterProvider instead of Reader #3387

Merged
merged 14 commits into from Oct 31, 2022
12 changes: 12 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,9 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package.
This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387)

### Changed

- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)

### Fixed

- The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369)
- Remove comparable requirement for `Reader`s. (#3387)

## [1.11.1/0.33.0] 2022-10-19

Expand Down
5 changes: 4 additions & 1 deletion example/view/main.go
Expand Up @@ -66,7 +66,10 @@ func main() {
log.Fatal(err)
}

provider := metric.NewMeterProvider(metric.WithReader(exporter, customBucketsView, defaultView))
provider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithView(customBucketsView, defaultView),
)
meter := provider.Meter(meterName)

// Start the prometheus HTTP server and pass the exporter Collector to it
Expand Down
3 changes: 2 additions & 1 deletion exporters/prometheus/exporter_test.go
Expand Up @@ -260,7 +260,8 @@ func TestPrometheusExporter(t *testing.T) {

provider := metric.NewMeterProvider(
metric.WithResource(res),
metric.WithReader(exporter, customBucketsView, defaultView),
metric.WithReader(exporter),
metric.WithView(customBucketsView, defaultView),
)
meter := provider.Meter("testmeter")

Expand Down
34 changes: 22 additions & 12 deletions sdk/metric/config.go
Expand Up @@ -26,7 +26,8 @@ import (
// config contains configuration options for a MeterProvider.
type config struct {
res *resource.Resource
readers map[Reader][]view.View
readers []Reader
views []view.View
}

// readerSignals returns a force-flush and shutdown function for a
Expand All @@ -35,7 +36,7 @@ type config struct {
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
for _, r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
}
Expand Down Expand Up @@ -112,21 +113,30 @@ func WithResource(res *resource.Resource) Option {
})
}

// WithReader associates a Reader with a MeterProvider. Any passed view config
// will be used to associate a view with the Reader. If no views are passed
// the default view will be use for the Reader.
//
// Passing this option multiple times for the same Reader will overwrite. The
// last option passed will be the one used for that Reader.
// WithReader associates Reader r with a MeterProvider.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader, views ...view.View) Option {
func WithReader(r Reader) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
if r == nil {
return cfg
}
cfg.readers[r] = views
cfg.readers = append(cfg.readers, r)
return cfg
})
}

// WithView associates views a MeterProvider.
//
// Views are appended to existing ones in a MeterProvider if this option is
// used multiple times.
//
// By default, if this option is not used, the MeterProvider will use the
// default view.
func WithView(views ...view.View) Option {
return optionFunc(func(cfg config) config {
cfg.views = append(cfg.views, views...)
return cfg
})
}
18 changes: 17 additions & 1 deletion sdk/metric/config_test.go
Expand Up @@ -127,5 +127,21 @@ func TestWithResource(t *testing.T) {
func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
require.Len(t, c.readers, 1)
assert.Same(t, r, c.readers[0])
}

func TestWithView(t *testing.T) {
var views []view.View

v, err := view.New(view.MatchInstrumentKind(view.AsyncCounter), view.WithRename("a"))
require.NoError(t, err)
views = append(views, v)

v, err = view.New(view.MatchInstrumentKind(view.SyncCounter), view.WithRename("b"))
require.NoError(t, err)
views = append(views, v)

c := newConfig([]Option{WithView(views...)})
assert.Equal(t, views, c.views)
}
6 changes: 3 additions & 3 deletions sdk/metric/pipeline.go
Expand Up @@ -416,13 +416,13 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio
// measurement.
type pipelines []*pipeline

func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
func newPipelines(res *resource.Resource, readers []Reader, views []view.View) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for r, v := range readers {
for _, r := range readers {
p := &pipeline{
resource: res,
reader: r,
views: v,
views: views,
}
r.register(p)
pipes = append(pipes, p)
Expand Down
99 changes: 32 additions & 67 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -257,7 +257,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

testCases := []struct {
name string
views map[Reader][]view.View
readers []Reader
views []view.View
inst view.Instrument
wantCount int
}{
Expand All @@ -266,72 +267,46 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
inst: view.Instrument{Name: "foo"},
},
{
name: "1 reader 1 view gets 1 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
},
},
name: "1 reader 1 view gets 1 aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}},
wantCount: 1,
},
{
name: "1 reader 2 views gets 2 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
},
},
name: "1 reader 2 views gets 2 aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}, renameView},
wantCount: 2,
},
{
name: "2 readers 1 view each gets 2 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
},
testRdrHistogram: {
{},
},
},
name: "2 readers 1 view each gets 2 aggregators",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr, testRdrHistogram},
views: []view.View{{}},
wantCount: 2,
},
{
name: "2 reader 2 views each gets 4 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
},
testRdrHistogram: {
{},
renameView,
},
},
name: "2 reader 2 views each gets 4 aggregators",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr, testRdrHistogram},
views: []view.View{{}, renameView},
wantCount: 4,
},
{
name: "An instrument is duplicated in two views share the same aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
{},
},
},
name: "An instrument is duplicated in two views share the same aggregator",
inst: view.Instrument{Name: "foo"},
readers: []Reader{testRdr},
views: []view.View{{}, {}},
wantCount: 1,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
p := newPipelines(resource.Empty(), tt.views)
p := newPipelines(resource.Empty(), tt.readers, tt.views)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
p = newPipelines(resource.Empty(), tt.views)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
})
}
Expand Down Expand Up @@ -362,11 +337,10 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want
func TestPipelineRegistryResource(t *testing.T) {
v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
require.NoError(t, err)
views := map[Reader][]view.View{
NewManualReader(): {{}, v},
}
readers := []Reader{NewManualReader()}
views := []view.View{{}, v}
res := resource.NewSchemaless(attribute.String("key", "val"))
pipes := newPipelines(res, views)
pipes := newPipelines(res, readers, views)
for _, p := range pipes {
assert.True(t, res.Equal(p.resource), "resource not set")
}
Expand All @@ -375,12 +349,9 @@ func TestPipelineRegistryResource(t *testing.T) {
func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))

views := map[Reader][]view.View{
testRdrHistogram: {
{},
},
}
p := newPipelines(resource.Empty(), views)
readers := []Reader{testRdrHistogram}
views := []view.View{{}}
p := newPipelines(resource.Empty(), readers, views)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

vc := cache[string, instrumentID]{}
Expand All @@ -389,8 +360,6 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
assert.Error(t, err)
assert.Len(t, intAggs, 0)

p = newPipelines(resource.Empty(), views)

rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
floatAggs, err := rf.Aggregators(inst, unit.Dimensionless)
assert.Error(t, err)
Expand Down Expand Up @@ -421,17 +390,13 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
view.MatchInstrumentName("bar"),
view.WithRename("foo"),
)
views := map[Reader][]view.View{
NewManualReader(): {
{},
renameView,
},
}
readers := []Reader{NewManualReader()}
views := []view.View{{}, renameView}

fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}

p := newPipelines(resource.Empty(), views)
p := newPipelines(resource.Empty(), readers, views)

vc := cache[string, instrumentID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Expand Up @@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
conf := newConfig(options)
flush, sdown := conf.readerSignals()
return &MeterProvider{
pipes: newPipelines(conf.res, conf.readers),
pipes: newPipelines(conf.res, conf.readers, conf.views),
forceFlush: flush,
shutdown: sdown,
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/metric/reader_test.go
Expand Up @@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) {
assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik))
}
}

type notComparable [0]func() // nolint:unused // non-comparable type itself is used.

type noCompareReader struct {
notComparable // nolint:unused // non-comparable type itself is used.
Reader
}

func TestReadersNotRequiredToBeComparable(t *testing.T) {
r := noCompareReader{Reader: NewManualReader()}
assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) })
}