From aca054b07588d85cd974b041b4e6688eff918a38 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:47:20 -0700 Subject: [PATCH] Refactor Pipeline (#3233) * Add views field to pipeline Redundant maps tracking readers to views and readers to pipelines exist in the pipelineRegistry. Unify these maps by tracing views in pipelines. * Rename newPipelineRegistries->newPipelineRegistry * Add Reader as field to pipeline * Replace createAggregators with resolver facilitator * Replace create agg funcs with inserter facilitator * Correct documentation * Replace pipelineRegistry with []pipeline type * Rename newPipelineRegistry->newPipelines * Fix pipeline_registry_test * Flatten isMonotonic into only use * Update FIXME into TODO * Rename instrument provider resolver field to resolve * Fix comment English * Fix drop aggregator detection --- sdk/metric/instrument_provider.go | 40 ++-- sdk/metric/meter.go | 22 +-- sdk/metric/pipeline.go | 263 ++++++++++++++++----------- sdk/metric/pipeline_registry_test.go | 54 +++--- sdk/metric/pipeline_test.go | 8 +- sdk/metric/provider.go | 4 +- 6 files changed, 220 insertions(+), 171 deletions(-) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index ad215a853b2..8640b58e52e 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -27,8 +27,8 @@ import ( ) type asyncInt64Provider struct { - scope instrumentation.Scope - registry *pipelineRegistry + scope instrumentation.Scope + resolve *resolver[int64] } var _ asyncint64.InstrumentProvider = asyncInt64Provider{} @@ -37,7 +37,7 @@ var _ asyncint64.InstrumentProvider = asyncInt64Provider{} func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -56,7 +56,7 @@ func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asy func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -74,7 +74,7 @@ func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -89,8 +89,8 @@ func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (async } type asyncFloat64Provider struct { - scope instrumentation.Scope - registry *pipelineRegistry + scope instrumentation.Scope + resolve *resolver[float64] } var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} @@ -99,7 +99,7 @@ var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) 
- aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -117,7 +117,7 @@ func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (a func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -135,7 +135,7 @@ func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Opti func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -150,8 +150,8 @@ func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asy } type syncInt64Provider struct { - scope instrumentation.Scope - registry *pipelineRegistry + scope instrumentation.Scope + resolve *resolver[int64] } var _ syncint64.InstrumentProvider = syncInt64Provider{} @@ -160,7 +160,7 @@ var _ syncint64.InstrumentProvider = syncInt64Provider{} func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -178,7 +178,7 @@ func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (sync func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -196,7 +196,7 @@ func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[int64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -211,8 +211,8 @@ func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (sy } type syncFloat64Provider struct { - scope instrumentation.Scope - registry *pipelineRegistry + scope instrumentation.Scope + resolve *resolver[float64] } var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} @@ -221,7 +221,7 @@ var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { cfg := instrument.NewConfig(opts...) 
- aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -239,7 +239,7 @@ func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (sy func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), @@ -257,7 +257,7 @@ func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Optio func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { cfg := instrument.NewConfig(opts...) - aggs, err := createAggregators[float64](p.registry, view.Instrument{ + aggs, err := p.resolve.Aggregators(view.Instrument{ Scope: p.scope, Name: name, Description: cfg.Description(), diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index ff8222cbc7c..82d5a5269be 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -42,7 +42,7 @@ type meterRegistry struct { meters map[instrumentation.Scope]*meter - registry *pipelineRegistry + pipes pipelines } // Get returns a registered meter matching the instrumentation scope if it @@ -56,8 +56,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { if r.meters == nil { m := &meter{ - Scope: s, - registry: r.registry, + Scope: s, + pipes: r.pipes, } r.meters = map[instrumentation.Scope]*meter{s: m} return m @@ -69,8 +69,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { } m = &meter{ - Scope: s, - registry: r.registry, + Scope: s, + pipes: r.pipes, } r.meters[s] = m return m @@ -83,7 +83,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - registry *pipelineRegistry + pipes pipelines } // Compile-time check meter implements metric.Meter. @@ -91,27 +91,27 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, registry: m.registry} + return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, registry: m.registry} + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} } // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { - m.registry.registerCallback(f) + m.pipes.registerCallback(f) return nil } // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, registry: m.registry} + return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} } // SyncFloat64 returns the synchronous floating-point instrument provider. 
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, registry: m.registry} + return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 6783aef5d0f..0bd52d63023 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -30,9 +30,25 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +var ( + errCreatingAggregators = errors.New("could not create all aggregators") + errIncompatibleAggregation = errors.New("incompatible aggregation") + errUnknownAggregation = errors.New("unrecognized aggregation") +) + type aggregator interface { Aggregation() metricdata.Aggregation } + +// instrumentID is used to identify multiple instruments being mapped to the +// same aggregator. e.g. using an exact match view with a name=* view. You +// can't use a view.Instrument here because not all Aggregators are comparable. +type instrumentID struct { + scope instrumentation.Scope + name string + description string +} + type instrumentKey struct { name string unit unit.Unit @@ -43,12 +59,14 @@ type instrumentValue struct { aggregator aggregator } -func newPipeline(res *resource.Resource) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ resource: res, + reader: reader, + views: views, aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue), } } @@ -61,6 +79,9 @@ func newPipeline(res *resource.Resource) *pipeline { type pipeline struct { resource *resource.Resource + reader Reader + views []view.View + sync.Mutex aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue callbacks []func(context.Context) @@ -155,106 +176,38 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } -// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve -// new Aggregators from a pipelineRegistry. -type pipelineRegistry struct { - views map[Reader][]view.View - pipelines map[Reader]*pipeline -} - -func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry { - pipelines := map[Reader]*pipeline{} - for rdr := range views { - pipe := &pipeline{resource: res} - rdr.register(pipe) - pipelines[rdr] = pipe - } - return &pipelineRegistry{ - views: views, - pipelines: pipelines, - } +// inserter facilitates inserting of new instruments into a pipeline. +type inserter[N int64 | float64] struct { + pipeline *pipeline } -// TODO (#3053) Only register callbacks if any instrument matches in a view. -func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { - for _, pipe := range reg.pipelines { - pipe.addCallback(fn) - } +func newInserter[N int64 | float64](p *pipeline) *inserter[N] { + return &inserter[N]{p} } -// createAggregators will create all backing aggregators for an instrument. -// It will return an error if an instrument is registered more than once. -// Note: There may be returned aggregators with an error. -func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +// Instrument inserts instrument inst with instUnit returning the Aggregators +// that need to be updated with measurments for that instrument. 
+func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] - - errs := &multierror{} - for rdr, views := range reg.views { - pipe := reg.pipelines[rdr] - rdrAggs, err := createAggregatorsForReader[N](rdr, views, inst) - if err != nil { - errs.append(err) - } - for inst, agg := range rdrAggs { - err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg) - if err != nil { - errs.append(err) - } - aggs = append(aggs, agg) - } - } - return aggs, errs.errorOrNil() -} - -type multierror struct { - wrapped error - errors []string -} - -func (m *multierror) errorOrNil() error { - if len(m.errors) == 0 { - return nil - } - return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) -} - -func (m *multierror) append(err error) { - m.errors = append(m.errors, err.Error()) -} - -// instrumentID is used to identify multiple instruments being mapped to the same aggregator. -// e.g. using an exact match view with a name=* view. -// You can't use a view.Instrument here because not all Aggregators are comparable. -type instrumentID struct { - scope instrumentation.Scope - name string - description string -} - -var errCreatingAggregators = errors.New("could not create all aggregators") - -func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) { - aggs := map[instrumentID]internal.Aggregator[N]{} - errs := &multierror{ - wrapped: errCreatingAggregators, - } - for _, v := range views { + errs := &multierror{wrapped: errCreatingAggregators} + for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) - ident := instrumentID{ + id := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, } - if _, ok := aggs[ident]; ok || !match { + if _, ok := seen[id]; ok || !match { continue } if inst.Aggregation == nil { - inst.Aggregation = rdr.aggregation(inst.Kind) + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = rdr.aggregation(inst.Kind) + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) } if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { @@ -263,51 +216,63 @@ func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View continue } - agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind)) - if agg != nil { - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs[ident] = agg + agg, err := i.aggregator(inst) + if err != nil { + errs.append(err) + continue + } + if agg == nil { // Drop aggregator. + continue + } + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs = append(aggs, agg) + seen[id] = struct{}{} + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + if err != nil { + errs.append(err) } } + // TODO(#3224): handle when no views match. Default should be reader + // aggregation returned. return aggs, errs.errorOrNil() } -func isMonotonic(kind view.InstrumentKind) bool { - switch kind { +// aggregator returns the Aggregator for an instrument configuration. 
If the +// instrument defines an unknown aggregation, an error is returned. +func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { + // TODO (#3011): If filtering is done by the Aggregator it should be passed + // here. + var ( + temporality = i.pipeline.reader.temporality(inst.Kind) + monotonic bool + ) + + switch inst.Kind { case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: - return true + monotonic = true } - return false -} -// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. -// TODO (#3011): If filterting is done by the Aggregator it should be passed here. -func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] { - switch agg := agg.(type) { + switch agg := inst.Aggregation.(type) { case aggregation.Drop: - return nil + return nil, nil case aggregation.LastValue: - return internal.NewLastValue[N]() + return internal.NewLastValue[N](), nil case aggregation.Sum: if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeSum[N](monotonic) + return internal.NewCumulativeSum[N](monotonic), nil } - return internal.NewDeltaSum[N](monotonic) + return internal.NewDeltaSum[N](monotonic), nil case aggregation.ExplicitBucketHistogram: if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeHistogram[N](agg) + return internal.NewCumulativeHistogram[N](agg), nil } - return internal.NewDeltaHistogram[N](agg) + return internal.NewDeltaHistogram[N](agg), nil } - return nil + return nil, errUnknownAggregation } -// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710 -var errIncompatibleAggregation = errors.New("incompatible aggregation") -var errUnknownAggregation = errors.New("unrecognized aggregation") - -// is aggregatorCompatible checks if the aggregation can be used by the instrument. +// isAggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: // // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | @@ -324,18 +289,24 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio if kind == view.SyncCounter || kind == view.SyncHistogram { return nil } + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation case aggregation.Sum: switch kind { case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter: return nil default: + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation } case aggregation.LastValue: if kind == view.AsyncGauge { return nil } + // TODO: review need for aggregation check after + // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation case aggregation.Drop: return nil @@ -344,3 +315,75 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio return fmt.Errorf("%w: %v", errUnknownAggregation, agg) } } + +// pipelines is the group of pipelines connecting Readers with instrument +// measurement. 
+type pipelines []*pipeline + +func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines { + pipes := make([]*pipeline, 0, len(readers)) + for r, v := range readers { + p := &pipeline{ + resource: res, + reader: r, + views: v, + } + r.register(p) + pipes = append(pipes, p) + } + return pipes +} + +// TODO (#3053) Only register callbacks if any instrument matches in a view. +func (p pipelines) registerCallback(fn func(context.Context)) { + for _, pipe := range p { + pipe.addCallback(fn) + } +} + +// resolver facilitates resolving Aggregators an instrument needs to aggregate +// measurements with while updating all pipelines that need to pull from those +// aggregations. +type resolver[N int64 | float64] struct { + inserters []*inserter[N] +} + +func newResolver[N int64 | float64](p pipelines) *resolver[N] { + in := make([]*inserter[N], len(p)) + for i := range in { + in[i] = newInserter[N](p[i]) + } + return &resolver[N]{in} +} + +// Aggregators returns the Aggregators instrument inst needs to update when it +// makes a measurement. +func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var aggs []internal.Aggregator[N] + + errs := &multierror{} + for _, i := range r.inserters { + a, err := i.Instrument(inst, instUnit) + if err != nil { + errs.append(err) + } + aggs = append(aggs, a...) + } + return aggs, errs.errorOrNil() +} + +type multierror struct { + wrapped error + errors []string +} + +func (m *multierror) errorOrNil() error { + if len(m.errors) == 0 { + return nil + } + return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) +} + +func (m *multierror) append(err error) { + m.errors = append(m.errors, err.Error()) +} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 1182eeca06a..f89a09360ba 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -211,7 +211,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - got, err := createAggregatorsForReader[N](tt.reader, tt.views, tt.inst) + i := newInserter[N](newPipeline(nil, tt.reader, tt.views)) + got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) for _, agg := range got { @@ -222,13 +223,12 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - reader := NewManualReader() - views := []view.View{{}} + i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}})) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), } - _, _ = createAggregatorsForReader[N](reader, views, inst) + _, _ = i.Instrument(inst, unit.Dimensionless) } func TestInvalidInstrumentShouldPanic(t *testing.T) { @@ -323,27 +323,29 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reg := newPipelineRegistries(resource.Empty(), tt.views) - testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount) - reg = newPipelineRegistries(resource.Empty(), tt.views) - testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount) + p := newPipelines(resource.Empty(), tt.views) + testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) + p = newPipelines(resource.Empty(), tt.views) + testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) }) } } -func 
testPipelineRegistryCreateIntAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - aggs, err := createAggregators[int64](reg, inst, unit.Dimensionless) + r := newResolver[int64](p) + aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) } -func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegistry, wantCount int) { +func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - aggs, err := createAggregators[float64](reg, inst, unit.Dimensionless) + r := newResolver[float64](p) + aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -356,8 +358,8 @@ func TestPipelineRegistryResource(t *testing.T) { NewManualReader(): {{}, v}, } res := resource.NewSchemaless(attribute.String("key", "val")) - reg := newPipelineRegistries(res, views) - for _, p := range reg.pipelines { + pipes := newPipelines(res, views) + for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } } @@ -370,16 +372,18 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { {}, }, } - reg := newPipelineRegistries(resource.Empty(), views) + p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless) + ri := newResolver[int64](p) + intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) - reg = newPipelineRegistries(resource.Empty(), views) + p = newPipelines(resource.Empty(), views) - floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless) + rf := newResolver[float64](p) + floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) } @@ -399,28 +403,30 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} - reg := newPipelineRegistries(resource.Empty(), views) + p := newPipelines(resource.Empty(), views) - intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless) + ri := newResolver[int64](p) + intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) // The Rename view should error, because it creates a foo instrument. - intAggs, err = createAggregators[int64](reg, barInst, unit.Dimensionless) + intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. 
- floatAggs, err := createAggregators[float64](reg, fooInst, unit.Dimensionless) + rf := newResolver[float64](p) + floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} - _, err = createAggregators[float64](reg, fooInst, unit.Dimensionless) + _, err = rf.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) - floatAggs, err = createAggregators[float64](reg, barInst, unit.Dimensionless) + floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 2) } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 52183360fd3..ca83c9c3a9e 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -61,7 +61,7 @@ func TestEmptyPipeline(t *testing.T) { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) output, err := pipe.produce(context.Background()) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) require.NoError(t, err) @@ -177,7 +177,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res) + pipe := newPipeline(res, nil, nil) output, err := pipe.produce(context.Background()) assert.NoError(t, err) @@ -185,7 +185,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrency(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil, nil) ctx := context.Background() var wg sync.WaitGroup diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index d3a940bce58..0b13e67d92b 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -48,13 +48,13 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() - registry := newPipelineRegistries(conf.res, conf.readers) + registry := newPipelines(conf.res, conf.readers) return &MeterProvider{ res: conf.res, meters: meterRegistry{ - registry: registry, + pipes: registry, }, forceFlush: flush,
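Editor's note: the following is a minimal sketch, not part of the patch, showing how the refactored pieces in this diff compose inside the sdk/metric package. It only uses identifiers that appear in the diff above (pipelines, newPipelines, newResolver, Aggregators, NewManualReader) plus public OpenTelemetry packages; the function name and instrument values are hypothetical.

// A minimal sketch (assumed to live in package metric, since pipelines and
// resolver are unexported) of the new wiring: Readers -> pipelines ->
// resolver -> Aggregators, replacing the removed pipelineRegistry maps.
package metric

import (
	"go.opentelemetry.io/otel/metric/unit"
	"go.opentelemetry.io/otel/sdk/instrumentation"
	"go.opentelemetry.io/otel/sdk/metric/view"
	"go.opentelemetry.io/otel/sdk/resource"
)

func exampleResolverWiring() error {
	// One pipeline per Reader; each pipeline now carries its own Reader and
	// Views instead of the pipelineRegistry tracking them in separate maps.
	rdr := NewManualReader()
	pipes := newPipelines(resource.Default(), map[Reader][]view.View{rdr: {{}}})

	// A resolver holds one inserter per pipeline and fans an instrument out
	// to all of them; the instrument providers build these per number type.
	resolve := newResolver[int64](pipes)

	inst := view.Instrument{
		Scope: instrumentation.Scope{Name: "example-scope"}, // hypothetical values
		Name:  "example.counter",
		Kind:  view.SyncCounter,
	}

	// Aggregators returns every aggregator this instrument must update when
	// it records a measurement, one set per matching view in each pipeline.
	aggs, err := resolve.Aggregators(inst, unit.Dimensionless)
	_ = aggs
	return err
}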