Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default view if instrument does not match any pipeline view #3237

Merged
merged 10 commits into from Oct 7, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`.
This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235)

### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

### Changed
Expand Down
4 changes: 0 additions & 4 deletions sdk/metric/config.go
Expand Up @@ -126,10 +126,6 @@ func WithReader(r Reader, views ...view.View) Option {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
}
if len(views) == 0 {
views = []view.View{{}}
}

cfg.readers[r] = views
return cfg
})
Expand Down
60 changes: 41 additions & 19 deletions sdk/metric/pipeline.go
Expand Up @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
// Instrument inserts instrument inst with instUnit returning the Aggregators
// that need to be updated with measurments for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var matched bool
seen := map[instrumentID]struct{}{}
var aggs []internal.Aggregator[N]
errs := &multierror{wrapped: errCreatingAggregators}
for _, v := range i.pipeline.views {
inst, match := v.TransformInstrument(inst)
if !match {
continue
}
matched = true

id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := seen[id]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
if _, ok := seen[id]; ok {
continue
}

Expand All @@ -224,25 +216,55 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
if agg == nil { // Drop aggregator.
continue
}
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg)
if err != nil {
errs.append(err)
// Do not return the aggregator to be updated if the pipeline will
// never produce from it.
continue
}
// TODO (#3011): If filtering is done at the instrument level add here.
// This is where the aggregator and the view are both in scope.
aggs = append(aggs, agg)
seen[id] = struct{}{}
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg)
}

if !matched { // Apply implicit default view if no explicit matched.
a, err := i.aggregator(inst)
if err != nil {
errs.append(err)
}
if a != nil {
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
if err != nil {
// Do not return the aggregator to be updated if the pipeline
// will never produce from it.
errs.append(err)
} else {
aggs = append(aggs, a)
}
}
}
// TODO(#3224): handle when no views match. Default should be reader
// aggregation returned.

return aggs, errs.errorOrNil()
}

// aggregator returns the Aggregator for an instrument configuration. If the
// instrument defines an unknown aggregation, an error is returned.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
// TODO (#3011): If filtering is done by the Aggregator it should be passed
// here.
switch inst.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
return nil, fmt.Errorf(
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
inst.Kind, inst.Aggregation, err,
)
}

var (
temporality = i.pipeline.reader.temporality(inst.Kind)
monotonic bool
Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/pipeline_test.go
Expand Up @@ -25,7 +25,10 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -211,3 +214,61 @@ func TestPipelineConcurrency(t *testing.T) {
}
wg.Wait()
}

func TestDefaultViewImplicit(t *testing.T) {
t.Run("Int64", testDefaultViewImplicit[int64]())
t.Run("Float64", testDefaultViewImplicit[float64]())
}

func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
inst := view.Instrument{
Scope: instrumentation.Scope{Name: "testing/lib"},
Name: "requests",
Description: "count of requests received",
Kind: view.SyncCounter,
Aggregation: aggregation.Sum{},
}
return func(t *testing.T) {
reader := NewManualReader()
v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar"))
require.NoError(t, err)

tests := []struct {
name string
pipe *pipeline
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []view.View{v}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
i := newInserter[N](test.pipe)
got, err := i.Instrument(inst, unit.Dimensionless)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")

out, err := test.pipe.produce(context.Background())
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: inst.Name,
Description: inst.Description,
Unit: unit.Dimensionless,
Data: metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}