From eb4369afea457399c3fd28065a1f3763cb34bc9d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 08:28:55 -0700 Subject: [PATCH 01/14] Add views field to pipeline Redundant maps tracking readers to views and readers to pipelines exist in the pipelineRegistry. Unify these maps by tracing views in pipelines. --- sdk/metric/pipeline.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 6783aef5d0f..8d05ac30386 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -61,6 +61,8 @@ func newPipeline(res *resource.Resource) *pipeline { type pipeline struct { resource *resource.Resource + views []view.View + sync.Mutex aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue callbacks []func(context.Context) @@ -158,19 +160,17 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve // new Aggregators from a pipelineRegistry. type pipelineRegistry struct { - views map[Reader][]view.View pipelines map[Reader]*pipeline } -func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry { - pipelines := map[Reader]*pipeline{} - for rdr := range views { - pipe := &pipeline{resource: res} - rdr.register(pipe) - pipelines[rdr] = pipe +func newPipelineRegistries(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { + pipelines := make(map[Reader]*pipeline, len(readers)) + for r, v := range readers { + pipe := &pipeline{resource: res, views: v} + r.register(pipe) + pipelines[r] = pipe } return &pipelineRegistry{ - views: views, pipelines: pipelines, } } @@ -189,9 +189,8 @@ func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instr var aggs []internal.Aggregator[N] errs := &multierror{} - for rdr, views := range reg.views { - pipe := reg.pipelines[rdr] - rdrAggs, err := createAggregatorsForReader[N](rdr, views, inst) + for rdr, pipe := range reg.pipelines { + rdrAggs, err := createAggregatorsForReader[N](rdr, pipe.views, inst) if err != nil { errs.append(err) } From 48905fca1a7ec5d13fa749c982b9fe009e2a18c8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 08:30:24 -0700 Subject: [PATCH 02/14] Rename newPipelineRegistries->newPipelineRegistry --- sdk/metric/pipeline.go | 2 +- sdk/metric/pipeline_registry_test.go | 12 ++++++------ sdk/metric/provider.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 8d05ac30386..00b2b0d12ae 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -163,7 +163,7 @@ type pipelineRegistry struct { pipelines map[Reader]*pipeline } -func newPipelineRegistries(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { +func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { pipelines := make(map[Reader]*pipeline, len(readers)) for r, v := range readers { pipe := &pipeline{resource: res, views: v} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 1182eeca06a..401bbf8de95 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -323,9 +323,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reg := newPipelineRegistries(resource.Empty(), tt.views) + reg := newPipelineRegistry(resource.Empty(), tt.views) testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount) - reg = newPipelineRegistries(resource.Empty(), tt.views) + reg = newPipelineRegistry(resource.Empty(), tt.views) testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount) }) } @@ -356,7 +356,7 @@ func TestPipelineRegistryResource(t *testing.T) { NewManualReader(): {{}, v}, } res := resource.NewSchemaless(attribute.String("key", "val")) - reg := newPipelineRegistries(res, views) + reg := newPipelineRegistry(res, views) for _, p := range reg.pipelines { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -370,14 +370,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { {}, }, } - reg := newPipelineRegistries(resource.Empty(), views) + reg := newPipelineRegistry(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) - reg = newPipelineRegistries(resource.Empty(), views) + reg = newPipelineRegistry(resource.Empty(), views) floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless) assert.Error(t, err) @@ -399,7 +399,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} - reg := newPipelineRegistries(resource.Empty(), views) + reg := newPipelineRegistry(resource.Empty(), views) intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless) assert.NoError(t, err) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index d3a940bce58..9a84d7385d1 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -48,7 +48,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() - registry := newPipelineRegistries(conf.res, conf.readers) + registry := newPipelineRegistry(conf.res, conf.readers) return &MeterProvider{ res: conf.res, From 46a4ef733334c90b9f86de88adfe68af209157c4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 08:48:12 -0700 Subject: [PATCH 03/14] Add Reader as field to pipeline --- sdk/metric/pipeline.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 00b2b0d12ae..78f24a2e950 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -61,7 +61,8 @@ func newPipeline(res *resource.Resource) *pipeline { type pipeline struct { resource *resource.Resource - views []view.View + reader Reader + views []view.View sync.Mutex aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue @@ -160,19 +161,21 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve // new Aggregators from a pipelineRegistry. type pipelineRegistry struct { - pipelines map[Reader]*pipeline + pipelines []*pipeline } func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { - pipelines := make(map[Reader]*pipeline, len(readers)) + pipelines := make([]*pipeline, 0, len(readers)) for r, v := range readers { - pipe := &pipeline{resource: res, views: v} + pipe := &pipeline{ + resource: res, + reader: r, + views: v, + } r.register(pipe) - pipelines[r] = pipe - } - return &pipelineRegistry{ - pipelines: pipelines, + pipelines = append(pipelines, pipe) } + return &pipelineRegistry{pipelines} } // TODO (#3053) Only register callbacks if any instrument matches in a view. @@ -189,8 +192,8 @@ func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instr var aggs []internal.Aggregator[N] errs := &multierror{} - for rdr, pipe := range reg.pipelines { - rdrAggs, err := createAggregatorsForReader[N](rdr, pipe.views, inst) + for _, pipe := range reg.pipelines { + rdrAggs, err := createAggregatorsForReader[N](pipe.reader, pipe.views, inst) if err != nil { errs.append(err) } From 9c08765d03e62d5f702eec99c2b09044b14a7fa3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 09:14:44 -0700 Subject: [PATCH 04/14] Replace createAggregators with resolver facilitator --- sdk/metric/instrument_provider.go | 32 ++++++++++++++-------------- sdk/metric/meter.go | 8 +++---- sdk/metric/pipeline.go | 19 ++++++++++++----- sdk/metric/pipeline_registry_test.go | 26 +++++++++++++--------- 4 files changed, 50 insertions(+), 35 deletions(-) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index ad215a853b2..1074baac78f 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -28,7 +28,7 @@ import ( type asyncInt64Provider struct { scope instrumentation.Scope - registry *pipelineRegistry + resolver *resolver[int64] } var _ asyncint64.InstrumentProvider = asyncInt64Provider{} @@ -37,7 +37,7 @@ var _ asyncint64.InstrumentProvider = asyncInt64Provider{} func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -56,7 +56,7 @@ func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asy func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -74,7 +74,7 @@ func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -90,7 +90,7 @@ func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (async type asyncFloat64Provider struct { scope instrumentation.Scope - registry *pipelineRegistry + resolver *resolver[float64] } var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} @@ -99,7 +99,7 @@ var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -117,7 +117,7 @@ func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (a func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -135,7 +135,7 @@ func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Opti func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -151,7 +151,7 @@ func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asy type syncInt64Provider struct { scope instrumentation.Scope - registry *pipelineRegistry + resolver *resolver[int64] } var _ syncint64.InstrumentProvider = syncInt64Provider{} @@ -160,7 +160,7 @@ var _ syncint64.InstrumentProvider = syncInt64Provider{} func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -178,7 +178,7 @@ func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (sync func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -196,7 +196,7 @@ func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -212,7 +212,7 @@ func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (sy type syncFloat64Provider struct { scope instrumentation.Scope - registry *pipelineRegistry + resolver *resolver[float64] } var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} @@ -221,7 +221,7 @@ var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -239,7 +239,7 @@ func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (sy func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -257,7 +257,7 @@ func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Optio func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolver.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index c38777b69d5..d5d897c1907 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -106,12 +106,12 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, registry: m.registry} + return asyncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.registry)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, registry: m.registry} + return asyncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.registry)} } // RegisterCallback registers the function f to be called when any of the @@ -123,10 +123,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, registry: m.registry} + return syncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.registry)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, registry: m.registry} + return syncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.registry)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 78f24a2e950..1b640663050 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -185,14 +185,23 @@ func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { } } -// createAggregators will create all backing aggregators for an instrument. -// It will return an error if an instrument is registered more than once. -// Note: There may be returned aggregators with an error. -func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +// resolver resolves Aggregators an instrument needs to aggregate measurments +// with while updating all pipelines that need to pull from those aggregations. +type resolver[N int64 | float64] struct { + reg *pipelineRegistry +} + +func newResolver[N int64 | float64](p *pipelineRegistry) *resolver[N] { + return &resolver[N]{p} +} + +// Aggregators returns the Aggregators instrument inst needs to update when it +// makes a measurment. +func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} - for _, pipe := range reg.pipelines { + for _, pipe := range r.reg.pipelines { rdrAggs, err := createAggregatorsForReader[N](pipe.reader, pipe.views, inst) if err != nil { errs.append(err) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 401bbf8de95..fe0351c175c 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -331,10 +331,11 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { } } -func testPipelineRegistryCreateIntAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveIntAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - aggs, err := createAggregators[int64](reg, inst, unit.Dimensionless) + r := newResolver[int64](reg) + aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -343,7 +344,8 @@ func testPipelineRegistryCreateIntAggregators(t *testing.T, reg *pipelineRegistr func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - aggs, err := createAggregators[float64](reg, inst, unit.Dimensionless) + r := newResolver[float64](reg) + aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -373,13 +375,15 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { reg := newPipelineRegistry(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless) + ri := newResolver[int64](reg) + intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) reg = newPipelineRegistry(resource.Empty(), views) - floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless) + rf := newResolver[float64](reg) + floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) } @@ -401,26 +405,28 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { reg := newPipelineRegistry(resource.Empty(), views) - intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless) + ri := newResolver[int64](reg) + intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) // The Rename view should error, because it creates a foo instrument. - intAggs, err = createAggregators[int64](reg, barInst, unit.Dimensionless) + intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - floatAggs, err := createAggregators[float64](reg, fooInst, unit.Dimensionless) + rf := newResolver[float64](reg) + floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} - _, err = createAggregators[float64](reg, fooInst, unit.Dimensionless) + _, err = rf.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) - floatAggs, err = createAggregators[float64](reg, barInst, unit.Dimensionless) + floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 2) } From ba1433d1f46fc617513e38ada394e9b94dff5912 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:07:08 -0700 Subject: [PATCH 05/14] Replace create agg funcs with inserter facilitator --- sdk/metric/pipeline.go | 265 +++++++++++++++------------ sdk/metric/pipeline_registry_test.go | 14 +- sdk/metric/pipeline_test.go | 8 +- 3 files changed, 159 insertions(+), 128 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 1b640663050..5726cf8a8bd 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -30,9 +30,25 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +var ( + errCreatingAggregators = errors.New("could not create all aggregators") + errIncompatibleAggregation = errors.New("incompatible aggregation") + errUnknownAggregation = errors.New("unrecognized aggregation") +) + type aggregator interface { Aggregation() metricdata.Aggregation } + +// instrumentID is used to identify multiple instruments being mapped to the +// same aggregator. e.g. using an exact match view with a name=* view. You +// can't use a view.Instrument here because not all Aggregators are comparable. +type instrumentID struct { + scope instrumentation.Scope + name string + description string +} + type instrumentKey struct { name string unit unit.Unit @@ -43,12 +59,14 @@ type instrumentValue struct { aggregator aggregator } -func newPipeline(res *resource.Resource) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ resource: res, + reader: reader, + views: views, aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue), } } @@ -158,114 +176,38 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } -// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve -// new Aggregators from a pipelineRegistry. -type pipelineRegistry struct { - pipelines []*pipeline +// inserter inserts new instruments into a pipeline. +type inserter[N int64 | float64] struct { + pipeline *pipeline } -func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { - pipelines := make([]*pipeline, 0, len(readers)) - for r, v := range readers { - pipe := &pipeline{ - resource: res, - reader: r, - views: v, - } - r.register(pipe) - pipelines = append(pipelines, pipe) - } - return &pipelineRegistry{pipelines} +func newInserter[N int64 | float64](p *pipeline) *inserter[N] { + return &inserter[N]{p} } -// TODO (#3053) Only register callbacks if any instrument matches in a view. -func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { - for _, pipe := range reg.pipelines { - pipe.addCallback(fn) - } -} - -// resolver resolves Aggregators an instrument needs to aggregate measurments -// with while updating all pipelines that need to pull from those aggregations. -type resolver[N int64 | float64] struct { - reg *pipelineRegistry -} - -func newResolver[N int64 | float64](p *pipelineRegistry) *resolver[N] { - return &resolver[N]{p} -} - -// Aggregators returns the Aggregators instrument inst needs to update when it -// makes a measurment. -func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +// Instrument inserts instrument inst with instUnit returning the Aggregators +// that need to be updated with measurments for that instrument. +func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] - - errs := &multierror{} - for _, pipe := range r.reg.pipelines { - rdrAggs, err := createAggregatorsForReader[N](pipe.reader, pipe.views, inst) - if err != nil { - errs.append(err) - } - for inst, agg := range rdrAggs { - err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg) - if err != nil { - errs.append(err) - } - aggs = append(aggs, agg) - } - } - return aggs, errs.errorOrNil() -} - -type multierror struct { - wrapped error - errors []string -} - -func (m *multierror) errorOrNil() error { - if len(m.errors) == 0 { - return nil - } - return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) -} - -func (m *multierror) append(err error) { - m.errors = append(m.errors, err.Error()) -} - -// instrumentID is used to identify multiple instruments being mapped to the same aggregator. -// e.g. using an exact match view with a name=* view. -// You can't use a view.Instrument here because not all Aggregators are comparable. -type instrumentID struct { - scope instrumentation.Scope - name string - description string -} - -var errCreatingAggregators = errors.New("could not create all aggregators") - -func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) { - aggs := map[instrumentID]internal.Aggregator[N]{} - errs := &multierror{ - wrapped: errCreatingAggregators, - } - for _, v := range views { + errs := &multierror{wrapped: errCreatingAggregators} + for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) - ident := instrumentID{ + id := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, } - if _, ok := aggs[ident]; ok || !match { + if _, ok := seen[id]; ok || !match { continue } if inst.Aggregation == nil { - inst.Aggregation = rdr.aggregation(inst.Kind) + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = rdr.aggregation(inst.Kind) + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) } if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { @@ -274,49 +216,59 @@ func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View continue } - agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind)) - if agg != nil { + agg, err := i.aggregator(inst) + if err != nil { + errs.append(err) + continue + } + if agg != nil { // Not a drop aggregation. // TODO (#3011): If filtering is done at the instrument level add here. // This is where the aggregator and the view are both in scope. - aggs[ident] = agg + aggs = append(aggs, agg) + } + seen[id] = struct{}{} + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + if err != nil { + errs.append(err) } } + // FIXME: handle when no views match. Default should be reader agg returned. return aggs, errs.errorOrNil() } -func isMonotonic(kind view.InstrumentKind) bool { - switch kind { - case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: - return true - } - return false -} - -// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. -// TODO (#3011): If filterting is done by the Aggregator it should be passed here. -func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] { - switch agg := agg.(type) { +// aggregator returns the Aggregator for an instrument configuration. If the +// instrument defines an unknown aggreation, an error is returned. +func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { + // TODO (#3011): If filterting is done by the Aggregator it should be + // passed here. + temporality := i.pipeline.reader.temporality(inst.Kind) + monotonic := isMonotonic(inst.Kind) + switch agg := inst.Aggregation.(type) { case aggregation.Drop: - return nil + return nil, nil case aggregation.LastValue: - return internal.NewLastValue[N]() + return internal.NewLastValue[N](), nil case aggregation.Sum: if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeSum[N](monotonic) + return internal.NewCumulativeSum[N](monotonic), nil } - return internal.NewDeltaSum[N](monotonic) + return internal.NewDeltaSum[N](monotonic), nil case aggregation.ExplicitBucketHistogram: if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeHistogram[N](agg) + return internal.NewCumulativeHistogram[N](agg), nil } - return internal.NewDeltaHistogram[N](agg) + return internal.NewDeltaHistogram[N](agg), nil } - return nil + return nil, errUnknownAggregation } -// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710 -var errIncompatibleAggregation = errors.New("incompatible aggregation") -var errUnknownAggregation = errors.New("unrecognized aggregation") +func isMonotonic(kind view.InstrumentKind) bool { + switch kind { + case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: + return true + } + return false +} // is aggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: @@ -335,18 +287,24 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio if kind == view.SyncCounter || kind == view.SyncHistogram { return nil } + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation case aggregation.Sum: switch kind { case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter: return nil default: + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation } case aggregation.LastValue: if kind == view.AsyncGauge { return nil } + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation case aggregation.Drop: return nil @@ -355,3 +313,76 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio return fmt.Errorf("%w: %v", errUnknownAggregation, agg) } } + +// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve +// new Aggregators from a pipelineRegistry. +type pipelineRegistry struct { + pipelines []*pipeline +} + +func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { + pipelines := make([]*pipeline, 0, len(readers)) + for r, v := range readers { + pipe := &pipeline{ + resource: res, + reader: r, + views: v, + } + r.register(pipe) + pipelines = append(pipelines, pipe) + } + return &pipelineRegistry{pipelines} +} + +// TODO (#3053) Only register callbacks if any instrument matches in a view. +func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { + for _, pipe := range reg.pipelines { + pipe.addCallback(fn) + } +} + +// resolver resolves Aggregators an instrument needs to aggregate measurments +// with while updating all pipelines that need to pull from those aggregations. +type resolver[N int64 | float64] struct { + inserters []*inserter[N] +} + +func newResolver[N int64 | float64](p *pipelineRegistry) *resolver[N] { + in := make([]*inserter[N], len(p.pipelines)) + for i := range in { + in[i] = newInserter[N](p.pipelines[i]) + } + return &resolver[N]{in} +} + +// Aggregators returns the Aggregators instrument inst needs to update when it +// makes a measurment. +func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var aggs []internal.Aggregator[N] + + errs := &multierror{} + for _, i := range r.inserters { + a, err := i.Instrument(inst, instUnit) + if err != nil { + errs.append(err) + } + aggs = append(aggs, a...) + } + return aggs, errs.errorOrNil() +} + +type multierror struct { + wrapped error + errors []string +} + +func (m *multierror) errorOrNil() error { + if len(m.errors) == 0 { + return nil + } + return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) +} + +func (m *multierror) append(err error) { + m.errors = append(m.errors, err.Error()) +} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index fe0351c175c..4d54a94e6e6 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -211,7 +211,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - got, err := createAggregatorsForReader[N](tt.reader, tt.views, tt.inst) + i := newInserter[N](newPipeline(nil, tt.reader, tt.views)) + got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) for _, agg := range got { @@ -222,13 +223,12 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - reader := NewManualReader() - views := []view.View{{}} + i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}})) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), } - _, _ = createAggregatorsForReader[N](reader, views, inst) + _, _ = i.Instrument(inst, unit.Dimensionless) } func TestInvalidInstrumentShouldPanic(t *testing.T) { @@ -324,9 +324,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { reg := newPipelineRegistry(resource.Empty(), tt.views) - testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount) + testPipelineRegistryResolveIntAggregators(t, reg, tt.wantCount) reg = newPipelineRegistry(resource.Empty(), tt.views) - testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount) + testPipelineRegistryResolveFloatAggregators(t, reg, tt.wantCount) }) } } @@ -341,7 +341,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, reg *pipelineRegist require.Len(t, aggs, wantCount) } -func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveFloatAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} r := newResolver[float64](reg) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 52183360fd3..ca83c9c3a9e 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -61,7 +61,7 @@ func TestEmptyPipeline(t *testing.T) { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) output, err := pipe.produce(context.Background()) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) require.NoError(t, err) @@ -177,7 +177,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res) + pipe := newPipeline(res, nil, nil) output, err := pipe.produce(context.Background()) assert.NoError(t, err) @@ -185,7 +185,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrency(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) ctx := context.Background() var wg sync.WaitGroup From 638cea8cef481e358dd3ebd90bd6a19836f1a94f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:19:49 -0700 Subject: [PATCH 06/14] Correct documentation --- sdk/metric/pipeline.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 5726cf8a8bd..066faeb3125 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -176,7 +176,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } -// inserter inserts new instruments into a pipeline. +// inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { pipeline *pipeline } @@ -237,10 +237,10 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in } // aggregator returns the Aggregator for an instrument configuration. If the -// instrument defines an unknown aggreation, an error is returned. +// instrument defines an unknown aggregation, an error is returned. func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { - // TODO (#3011): If filterting is done by the Aggregator it should be - // passed here. + // TODO (#3011): If filtering is done by the Aggregator it should be passed + // here. temporality := i.pipeline.reader.temporality(inst.Kind) monotonic := isMonotonic(inst.Kind) switch agg := inst.Aggregation.(type) { @@ -270,7 +270,7 @@ func isMonotonic(kind view.InstrumentKind) bool { return false } -// is aggregatorCompatible checks if the aggregation can be used by the instrument. +// isAggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: // // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | @@ -341,8 +341,9 @@ func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { } } -// resolver resolves Aggregators an instrument needs to aggregate measurments -// with while updating all pipelines that need to pull from those aggregations. +// resolver facilitates resolves Aggregators an instrument needs to aggregate +// measurements with while updating all pipelines that need to pull from those +// aggregations. type resolver[N int64 | float64] struct { inserters []*inserter[N] } @@ -356,7 +357,7 @@ func newResolver[N int64 | float64](p *pipelineRegistry) *resolver[N] { } // Aggregators returns the Aggregators instrument inst needs to update when it -// makes a measurment. +// makes a measurement. func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] From bcc091dd9d50c57e89cc65f4c18186bcbe23f128 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:25:24 -0700 Subject: [PATCH 07/14] Replace pipelineRegistry with []pipeline type --- sdk/metric/meter.go | 22 +++++++++++----------- sdk/metric/pipeline.go | 30 ++++++++++++++---------------- sdk/metric/provider.go | 2 +- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index d5d897c1907..cc4da98d061 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -42,7 +42,7 @@ type meterRegistry struct { meters map[instrumentation.Scope]*meter - registry *pipelineRegistry + pipes pipelines } // Get returns a registered meter matching the instrumentation scope if it @@ -56,8 +56,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { if r.meters == nil { m := &meter{ - Scope: s, - registry: r.registry, + Scope: s, + pipes: r.pipes, } r.meters = map[instrumentation.Scope]*meter{s: m} return m @@ -69,8 +69,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { } m = &meter{ - Scope: s, - registry: r.registry, + Scope: s, + pipes: r.pipes, } r.meters[s] = m return m @@ -98,7 +98,7 @@ func (r *meterRegistry) Range(f func(*meter) bool) { type meter struct { instrumentation.Scope - registry *pipelineRegistry + pipes pipelines } // Compile-time check meter implements metric.Meter. @@ -106,27 +106,27 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.registry)} + return asyncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.pipes)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.registry)} + return asyncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.pipes)} } // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { - m.registry.registerCallback(f) + m.pipes.registerCallback(f) return nil } // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.registry)} + return syncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.pipes)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.registry)} + return syncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.pipes)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 066faeb3125..64986634a95 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -314,29 +314,27 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio } } -// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve -// new Aggregators from a pipelineRegistry. -type pipelineRegistry struct { - pipelines []*pipeline -} +// pipelines is the group of pipelines connecting Readers with instrument +// measurement. +type pipelines []*pipeline -func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) *pipelineRegistry { - pipelines := make([]*pipeline, 0, len(readers)) +func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) pipelines { + pipes := make([]*pipeline, 0, len(readers)) for r, v := range readers { - pipe := &pipeline{ + p := &pipeline{ resource: res, reader: r, views: v, } - r.register(pipe) - pipelines = append(pipelines, pipe) + r.register(p) + pipes = append(pipes, p) } - return &pipelineRegistry{pipelines} + return pipes } // TODO (#3053) Only register callbacks if any instrument matches in a view. -func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { - for _, pipe := range reg.pipelines { +func (p pipelines) registerCallback(fn func(context.Context)) { + for _, pipe := range p { pipe.addCallback(fn) } } @@ -348,10 +346,10 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p *pipelineRegistry) *resolver[N] { - in := make([]*inserter[N], len(p.pipelines)) +func newResolver[N int64 | float64](p pipelines) *resolver[N] { + in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p.pipelines[i]) + in[i] = newInserter[N](p[i]) } return &resolver[N]{in} } diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 9a84d7385d1..0d8338140f6 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -54,7 +54,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { res: conf.res, meters: meterRegistry{ - registry: registry, + pipes: registry, }, forceFlush: flush, From 65dd4fec9e8635dbfc30dc7df0dbac7bffb37685 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:35:09 -0700 Subject: [PATCH 08/14] Rename newPipelineRegistry->newPipelines --- sdk/metric/pipeline.go | 2 +- sdk/metric/pipeline_registry_test.go | 12 ++++++------ sdk/metric/provider.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 64986634a95..0a05c96765c 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -318,7 +318,7 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio // measurement. type pipelines []*pipeline -func newPipelineRegistry(res *resource.Resource, readers map[Reader][]view.View) pipelines { +func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines { pipes := make([]*pipeline, 0, len(readers)) for r, v := range readers { p := &pipeline{ diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 4d54a94e6e6..c55090bd764 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -323,9 +323,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reg := newPipelineRegistry(resource.Empty(), tt.views) + reg := newPipelines(resource.Empty(), tt.views) testPipelineRegistryResolveIntAggregators(t, reg, tt.wantCount) - reg = newPipelineRegistry(resource.Empty(), tt.views) + reg = newPipelines(resource.Empty(), tt.views) testPipelineRegistryResolveFloatAggregators(t, reg, tt.wantCount) }) } @@ -358,7 +358,7 @@ func TestPipelineRegistryResource(t *testing.T) { NewManualReader(): {{}, v}, } res := resource.NewSchemaless(attribute.String("key", "val")) - reg := newPipelineRegistry(res, views) + reg := newPipelines(res, views) for _, p := range reg.pipelines { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -372,7 +372,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { {}, }, } - reg := newPipelineRegistry(resource.Empty(), views) + reg := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} ri := newResolver[int64](reg) @@ -380,7 +380,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { assert.Error(t, err) assert.Len(t, intAggs, 0) - reg = newPipelineRegistry(resource.Empty(), views) + reg = newPipelines(resource.Empty(), views) rf := newResolver[float64](reg) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) @@ -403,7 +403,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} - reg := newPipelineRegistry(resource.Empty(), views) + reg := newPipelines(resource.Empty(), views) ri := newResolver[int64](reg) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 0d8338140f6..0b13e67d92b 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -48,7 +48,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() - registry := newPipelineRegistry(conf.res, conf.readers) + registry := newPipelines(conf.res, conf.readers) return &MeterProvider{ res: conf.res, From 95b570f614a0e1a79e952e26c35cdbbee4f0f970 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:37:28 -0700 Subject: [PATCH 09/14] Fix pipeline_registry_test --- sdk/metric/pipeline_registry_test.go | 34 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index c55090bd764..f89a09360ba 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -323,28 +323,28 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reg := newPipelines(resource.Empty(), tt.views) - testPipelineRegistryResolveIntAggregators(t, reg, tt.wantCount) - reg = newPipelines(resource.Empty(), tt.views) - testPipelineRegistryResolveFloatAggregators(t, reg, tt.wantCount) + p := newPipelines(resource.Empty(), tt.views) + testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) + p = newPipelines(resource.Empty(), tt.views) + testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) }) } } -func testPipelineRegistryResolveIntAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[int64](reg) + r := newResolver[int64](p) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) } -func testPipelineRegistryResolveFloatAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[float64](reg) + r := newResolver[float64](p) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -358,8 +358,8 @@ func TestPipelineRegistryResource(t *testing.T) { NewManualReader(): {{}, v}, } res := resource.NewSchemaless(attribute.String("key", "val")) - reg := newPipelines(res, views) - for _, p := range reg.pipelines { + pipes := newPipelines(res, views) + for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } } @@ -372,17 +372,17 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { {}, }, } - reg := newPipelines(resource.Empty(), views) + p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver[int64](reg) + ri := newResolver[int64](p) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) - reg = newPipelines(resource.Empty(), views) + p = newPipelines(resource.Empty(), views) - rf := newResolver[float64](reg) + rf := newResolver[float64](p) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -403,9 +403,9 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} - reg := newPipelines(resource.Empty(), views) + p := newPipelines(resource.Empty(), views) - ri := newResolver[int64](reg) + ri := newResolver[int64](p) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver[float64](reg) + rf := newResolver[float64](p) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) From ce13072fb29a6735a120ea8d6d39e48e64713be2 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:41:11 -0700 Subject: [PATCH 10/14] Flatten isMonotonic into only use --- sdk/metric/pipeline.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0a05c96765c..e637d23dbf5 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -241,8 +241,16 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { // TODO (#3011): If filtering is done by the Aggregator it should be passed // here. - temporality := i.pipeline.reader.temporality(inst.Kind) - monotonic := isMonotonic(inst.Kind) + var ( + temporality = i.pipeline.reader.temporality(inst.Kind) + monotonic bool + ) + + switch inst.Kind { + case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: + monotonic = true + } + switch agg := inst.Aggregation.(type) { case aggregation.Drop: return nil, nil @@ -262,14 +270,6 @@ func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], return nil, errUnknownAggregation } -func isMonotonic(kind view.InstrumentKind) bool { - switch kind { - case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: - return true - } - return false -} - // isAggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: // From 0b24e9d854fcfb7460eca6c45e48561515f33211 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:42:56 -0700 Subject: [PATCH 11/14] Update FIXME into TODO --- sdk/metric/pipeline.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index e637d23dbf5..b208e2eb0c5 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -232,7 +232,8 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } } - // FIXME: handle when no views match. Default should be reader agg returned. + // TODO(#3224): handle when no views match. Default should be reader + // aggregation returned. return aggs, errs.errorOrNil() } From 6429b15bb3a72360f1da6b6b5ecd6aa3c5c6c7ec Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 10:44:01 -0700 Subject: [PATCH 12/14] Rename instrument provider resolver field to resolve --- sdk/metric/instrument_provider.go | 40 +++++++++++++++---------------- sdk/metric/meter.go | 8 +++---- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 1074baac78f..8640b58e52e 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -27,8 +27,8 @@ import ( ) type asyncInt64Provider struct { - scope instrumentation.Scope - resolver *resolver[int64] + scope instrumentation.Scope + resolve *resolver[int64] } var _ asyncint64.InstrumentProvider = asyncInt64Provider{} @@ -37,7 +37,7 @@ var _ asyncint64.InstrumentProvider = asyncInt64Provider{} func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -56,7 +56,7 @@ func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asy func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -74,7 +74,7 @@ func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -89,8 +89,8 @@ func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (async } type asyncFloat64Provider struct { - scope instrumentation.Scope - resolver *resolver[float64] + scope instrumentation.Scope + resolve *resolver[float64] } var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} @@ -99,7 +99,7 @@ var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -117,7 +117,7 @@ func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (a func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -135,7 +135,7 @@ func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Opti func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -150,8 +150,8 @@ func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asy } type syncInt64Provider struct { - scope instrumentation.Scope - resolver *resolver[int64] + scope instrumentation.Scope + resolve *resolver[int64] } var _ syncint64.InstrumentProvider = syncInt64Provider{} @@ -160,7 +160,7 @@ var _ syncint64.InstrumentProvider = syncInt64Provider{} func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -178,7 +178,7 @@ func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (sync func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -196,7 +196,7 @@ func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -211,8 +211,8 @@ func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (sy } type syncFloat64Provider struct { - scope instrumentation.Scope - resolver *resolver[float64] + scope instrumentation.Scope + resolve *resolver[float64] } var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} @@ -221,7 +221,7 @@ var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -239,7 +239,7 @@ func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (sy func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -257,7 +257,7 @@ func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Optio func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := p.resolver.Aggregators(view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index cc4da98d061..249fb41103d 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -106,12 +106,12 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.pipes)} + return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.pipes)} + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} } // RegisterCallback registers the function f to be called when any of the @@ -123,10 +123,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolver: newResolver[int64](m.pipes)} + return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolver: newResolver[float64](m.pipes)} + return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} } From 78cd4940f640fbda876224e4a0b12c067a799b7e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 11:13:35 -0700 Subject: [PATCH 13/14] Fix comment English --- sdk/metric/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index b208e2eb0c5..7b3f87529a4 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -340,7 +340,7 @@ func (p pipelines) registerCallback(fn func(context.Context)) { } } -// resolver facilitates resolves Aggregators an instrument needs to aggregate +// resolver facilitates resolving Aggregators an instrument needs to aggregate // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { From 26b7a33831b46795415c7ca3b4c06c8c96fa5fa6 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 10:58:00 -0700 Subject: [PATCH 14/14] Fix drop aggregator detection --- sdk/metric/pipeline.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 7b3f87529a4..0bd52d63023 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -221,11 +221,12 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) continue } - if agg != nil { // Not a drop aggregation. - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs = append(aggs, agg) + if agg == nil { // Drop aggregator. + continue } + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs = append(aggs, agg) seen[id] = struct{}{} err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) if err != nil {