From b5292b845955d4f99dbd4470728cce809a9384c8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 11 Oct 2022 12:41:47 -0700 Subject: [PATCH] Handle duplicate Aggregators and log instrument conflicts (#3251) * Add the cache type * Add cache unit tests * Test cache concurrency * Add the instrumentCache * Use the instrumentCache to deduplicate creation * Drop unique check from addAggregator * Fix aggregatorCache* docs * Update cachedAggregator and aggregator method docs * Remove unnecessary type constraint * Remove unused errAlreadyRegistered * Rename to not shadow imports * Add changes to changelog * Fix changelog English * Store resolvers in the meter instead of caches * Test all Aggregator[N] impls are comparable * Fix lint * Add documentation that Aggregators need to be comparable * Update sdk/metric/internal/aggregator.go Co-authored-by: Anthony Mirabella * Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella * Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella * Update sdk/metric/internal/aggregator_test.go Co-authored-by: Anthony Mirabella * Fix pipeline_test.go use of newInstrumentCache Co-authored-by: Anthony Mirabella --- CHANGELOG.md | 3 + sdk/metric/cache.go | 110 +++++++++++ sdk/metric/cache_test.go | 76 ++++++++ sdk/metric/instrument.go | 23 +++ sdk/metric/internal/aggregator.go | 3 + sdk/metric/internal/aggregator_test.go | 40 ++-- sdk/metric/meter.go | 43 +++-- sdk/metric/pipeline.go | 257 ++++++++++++++++--------- sdk/metric/pipeline_registry_test.go | 69 +++++-- sdk/metric/pipeline_test.go | 117 ++--------- 10 files changed, 502 insertions(+), 239 deletions(-) create mode 100644 sdk/metric/cache.go create mode 100644 sdk/metric/cache_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fd689d606f..80dc9041d85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - Use default view if instrument does not match any 
registered view of a reader. (#3224, #3237) +- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251) +- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251) +- Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251) - The OpenCensus bridge no longer sends empty batches of metrics. (#3263) ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go new file mode 100644 index 00000000000..b75e7ea5402 --- /dev/null +++ b/sdk/metric/cache.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "sync" + + "go.opentelemetry.io/otel/sdk/metric/internal" +) + +// cache is a locking storage used to quickly return already computed values. +// +// The zero value of a cache is empty and ready to use. +// +// A cache must not be copied after first use. +// +// All methods of a cache are safe to call concurrently. +type cache[K comparable, V any] struct { + sync.Mutex + data map[K]V +} + +// Lookup returns the value stored in the cache with the associated key if it +// exists. Otherwise, f is called and its returned value is set in the cache +// for key and returned. +// +// Lookup is safe to call concurrently.
It will hold the cache lock, so f +// should not block excessively. +func (c *cache[K, V]) Lookup(key K, f func() V) V { + c.Lock() + defer c.Unlock() + + if c.data == nil { + val := f() + c.data = map[K]V{key: val} + return val + } + if v, ok := c.data[key]; ok { + return v + } + val := f() + c.data[key] = val + return val +} + +// instrumentCache is a cache of instruments. It is scoped at the Meter level +// along with a number type. Meaning all instruments it contains need to belong +// to the same instrumentation.Scope (implicitly) and number type (explicitly). +type instrumentCache[N int64 | float64] struct { + // aggregators is used to ensure duplicate creations of the same instrument + // return the same instance of that instrument's aggregator. + aggregators *cache[instrumentID, aggVal[N]] + // views is used to ensure if instruments with the same name are created, + // but do not have the same identifying properties, a warning is logged. + views *cache[string, instrumentID] +} + +// newInstrumentCache returns a new instrumentCache that uses ac as the +// underlying cache for aggregators and vc as the cache for views. If ac or vc +// are nil, a new empty cache will be used. +func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] { + if ac == nil { + ac = &cache[instrumentID, aggVal[N]]{} + } + if vc == nil { + vc = &cache[string, instrumentID]{} + } + return instrumentCache[N]{aggregators: ac, views: vc} +} + +// LookupAggregator returns the Aggregator and error for a cached instrument if +// it exists in the cache. Otherwise, f is called and its returned value is set +// in the cache and returned. +// +// LookupAggregator is safe to call concurrently.
+func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) { + v := c.aggregators.Lookup(id, func() aggVal[N] { + a, err := f() + return aggVal[N]{Aggregator: a, Err: err} + }) + return v.Aggregator, v.Err +} + +// aggVal is the cached value of an instrumentCache's aggregators cache. +type aggVal[N int64 | float64] struct { + Aggregator internal.Aggregator[N] + Err error +} + +// Unique returns if id is unique or a duplicate instrument. If an instrument +// with the same name has already been created, that instrumentID will be +// returned along with false. Otherwise, id is returned with true. +// +// Unique is safe to call concurrently. +func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) { + got := c.views.Lookup(id.Name, func() instrumentID { return id }) + return got, id == got +} diff --git a/sdk/metric/cache_test.go b/sdk/metric/cache_test.go new file mode 100644 index 00000000000..47332a58cbd --- /dev/null +++ b/sdk/metric/cache_test.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCache(t *testing.T) { + k0, k1 := "one", "two" + v0, v1 := 1, 2 + + c := cache[string, int]{} + + var got int + require.NotPanics(t, func() { + got = c.Lookup(k0, func() int { return v0 }) + }, "zero-value cache panics on Lookup") + assert.Equal(t, v0, got, "zero-value cache did not return fallback") + + assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key") + + assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key") +} + +func TestCacheConcurrency(t *testing.T) { + const ( + key = "k" + goroutines = 10 + timeoutSec = 5 + ) + + c := cache[string, int]{} + var wg sync.WaitGroup + for n := 0; n < goroutines; n++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + assert.NotPanics(t, func() { + c.Lookup(key, func() int { return i }) + }) + }(n) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + assert.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, timeoutSec*time.Second, 10*time.Millisecond) +} diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 5e7b457ab7f..48440280e42 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -23,9 +23,32 @@ import ( "go.opentelemetry.io/otel/metric/instrument/asyncint64" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric/internal" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +// instrumentID are the identifying properties of an instrument. +type instrumentID struct { + // Name is the name of the instrument. + Name string + // Description is the description of the instrument. 
+ Description string + // Unit is the unit of the instrument. + Unit unit.Unit + // Aggregation is the aggregation data type of the instrument. + Aggregation string + // Monotonic is the monotonicity of an instrument's data type. This field is + // not used for all data types, so a zero value needs to be understood in the + // context of Aggregation. + Monotonic bool + // Temporality is the temporality of an instrument's data type. This field + // is not used by some data types. + Temporality metricdata.Temporality + // Number is the number type of the instrument. + Number string +} + type instrumentImpl[N int64 | float64] struct { instrument.Asynchronous instrument.Synchronous diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 1f70ae7c1ee..952e9a4a8bd 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -26,6 +26,9 @@ import ( var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. +// +// Aggregators need to be comparable so they can be de-duplicated by the SDK when +// it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation.
diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go index 200d3da6268..a544a18ca21 100644 --- a/sdk/metric/internal/aggregator_test.go +++ b/sdk/metric/internal/aggregator_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -80,23 +82,31 @@ type aggregatorTester[N int64 | float64] struct { func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) { m := at.MeasurementN * at.GoroutineN return func(t *testing.T) { - for i := 0; i < at.CycleN; i++ { - var wg sync.WaitGroup - wg.Add(at.GoroutineN) - for i := 0; i < at.GoroutineN; i++ { - go func() { - defer wg.Done() - for j := 0; j < at.MeasurementN; j++ { - for attrs, n := range incr { - a.Aggregate(N(n), attrs) + t.Run("Comparable", func(t *testing.T) { + assert.NotPanics(t, func() { + _ = map[Aggregator[N]]struct{}{a: {}} + }) + }) + + t.Run("Correctness", func(t *testing.T) { + for i := 0; i < at.CycleN; i++ { + var wg sync.WaitGroup + wg.Add(at.GoroutineN) + for j := 0; j < at.GoroutineN; j++ { + go func() { + defer wg.Done() + for k := 0; k < at.MeasurementN; k++ { + for attrs, n := range incr { + a.Aggregate(N(n), attrs) + } } - } - }() - } - wg.Wait() + }() + } + wg.Wait() - metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation()) - } + metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation()) + } + }) } } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 82d5a5269be..c6871a06716 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -55,10 +55,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { defer r.Unlock() if r.meters == nil { - m := &meter{ - Scope: s, - pipes: r.pipes, - } + m := newMeter(s, r.pipes) r.meters = map[instrumentation.Scope]*meter{s: m} return m } @@ -68,10 +65,7 
@@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { return m } - m = &meter{ - Scope: s, - pipes: r.pipes, - } + m = newMeter(s, r.pipes) r.meters[s] = m return m } @@ -83,20 +77,45 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope + // *Resolvers are used by the provided instrument providers to resolve new + // instruments aggregators and maintain a cache across instruments this + // meter owns. + int64Resolver resolver[int64] + float64Resolver resolver[float64] + pipes pipelines } +func newMeter(s instrumentation.Scope, p pipelines) *meter { + // viewCache ensures instrument conflicts, including number conflicts, this + // meter is asked to create are logged to the user. + var viewCache cache[string, instrumentID] + + // Passing nil as the ac parameter to newInstrumentCache will have each + // create its own aggregator cache. + ic := newInstrumentCache[int64](nil, &viewCache) + fc := newInstrumentCache[float64](nil, &viewCache) + + return &meter{ + Scope: s, + pipes: p, + + int64Resolver: newResolver(p, ic), + float64Resolver: newResolver(p, fc), + } +} + // Compile-time check meter implements metric.Meter. var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. 
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} } // RegisterCallback registers the function f to be called when any of the @@ -108,10 +127,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 9eb86f593ac..83de81ccc61 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -21,6 +21,7 @@ import ( "strings" "sync" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -34,28 +35,19 @@ var ( errCreatingAggregators = errors.New("could not create all aggregators") errIncompatibleAggregation = errors.New("incompatible aggregation") errUnknownAggregation = errors.New("unrecognized aggregation") + errUnknownTemporality = errors.New("unrecognized temporality") ) type aggregator interface { Aggregation() metricdata.Aggregation } -// instrumentID is used to identify multiple instruments being mapped to the -// same aggregator. e.g. using an exact match view with a name=* view. You -// can't use a view.Instrument here because not all Aggregators are comparable. 
-type instrumentID struct { - scope instrumentation.Scope +// instrumentSync is a synchronization point between a pipeline and an +// instrument's Aggregators. +type instrumentSync struct { name string description string -} - -type instrumentKey struct { - name string - unit unit.Unit -} - -type instrumentValue struct { - description string + unit unit.Unit aggregator aggregator } @@ -67,7 +59,7 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe resource: res, reader: reader, views: views, - aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue), + aggregations: make(map[instrumentation.Scope][]instrumentSync), } } @@ -83,36 +75,23 @@ type pipeline struct { views []view.View sync.Mutex - aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue + aggregations map[instrumentation.Scope][]instrumentSync callbacks []func(context.Context) } -var errAlreadyRegistered = errors.New("instrument already registered") - -// addAggregator will stores an aggregator with an instrument description. The aggregator -// is used when `produce()` is called. -func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error { +// addSync adds the instrumentSync to pipeline p with scope. This method is not +// idempotent. Duplicate calls will result in duplicate additions; it is the +// caller's responsibility to ensure this is called with unique values.
+func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { p.Lock() defer p.Unlock() if p.aggregations == nil { - p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{} - } - if p.aggregations[scope] == nil { - p.aggregations[scope] = map[instrumentKey]instrumentValue{} - } - inst := instrumentKey{ - name: name, - unit: instUnit, - } - if _, ok := p.aggregations[scope][inst]; ok { - return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope) - } - - p.aggregations[scope][inst] = instrumentValue{ - description: description, - aggregator: agg, + p.aggregations = map[instrumentation.Scope][]instrumentSync{ + scope: {iSync}, + } + return } - return nil + p.aggregations[scope] = append(p.aggregations[scope], iSync) } // addCallback registers a callback to be run when `produce()` is called. @@ -151,12 +130,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations)) for scope, instruments := range p.aggregations { metrics := make([]metricdata.Metrics, 0, len(instruments)) - for inst, instValue := range instruments { - data := instValue.aggregator.Aggregation() + for _, inst := range instruments { + data := inst.aggregator.Aggregation() if data != nil { metrics = append(metrics, metricdata.Metrics{ Name: inst.name, - Description: instValue.description, + Description: inst.description, Unit: inst.unit, Data: data, }) @@ -178,20 +157,45 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments into a pipeline. 
type inserter[N int64 | float64] struct { + cache instrumentCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline) *inserter[N] { - return &inserter[N]{p} +func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] { + return &inserter[N]{cache: c, pipeline: p} } -// Instrument inserts instrument inst with instUnit returning the Aggregators -// that need to be updated with measurments for that instrument. +// Instrument inserts the instrument inst with instUnit into a pipeline. All +// views the pipeline contains are matched against, and any matching view that +// creates a unique Aggregator will be inserted into the pipeline and included +// in the returned slice. +// +// The returned Aggregators are ensured to be deduplicated and unique. If +// another view in another pipeline that is cached by this inserter's cache has +// already inserted the same Aggregator for the same instrument, that +// Aggregator instance is returned. +// +// If another instrument has already been inserted by this inserter, or any +// other using the same cache, and it conflicts with the instrument being +// inserted in this call, an Aggregator matching the arguments will still be +// returned but an Info level log message will also be logged to the OTel +// global logger. +// +// If the passed instrument would result in an incompatible Aggregator, an +// error is returned and that Aggregator is not inserted or returned. +// +// If an instrument is determined to use a Drop aggregation, that instrument is +// not inserted nor returned. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - var matched bool - seen := map[instrumentID]struct{}{} - var aggs []internal.Aggregator[N] + var ( + matched bool + aggs []internal.Aggregator[N] + ) + errs := &multierror{wrapped: errCreatingAggregators} + // The cache will return the same Aggregator instance. 
Use this fact to + // compare pointer addresses to deduplicate Aggregators. + seen := make(map[internal.Aggregator[N]]struct{}) for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) if !match { @@ -199,53 +203,51 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in } matched = true - id := instrumentID{ - scope: inst.Scope, - name: inst.Name, - description: inst.Description, - } - if _, ok := seen[id]; ok { - continue - } - - agg, err := i.aggregator(inst) + agg, err := i.cachedAggregator(inst, instUnit) if err != nil { errs.append(err) - continue } if agg == nil { // Drop aggregator. continue } - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs = append(aggs, agg) - seen[id] = struct{}{} - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) - if err != nil { - errs.append(err) + if _, ok := seen[agg]; ok { + // This aggregator has already been added. + continue } + seen[agg] = struct{}{} + aggs = append(aggs, agg) } - if !matched { // Apply implicit default view if no explicit matched. - a, err := i.aggregator(inst) - if err != nil { - errs.append(err) - } - if a != nil { - aggs = append(aggs, a) - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) - if err != nil { - errs.append(err) - } - } + if matched { + return aggs, errs.errorOrNil() } + // Apply implicit default view if no explicit matched. + agg, err := i.cachedAggregator(inst, instUnit) + if err != nil { + errs.append(err) + } + if agg != nil { + // Ensured to have not seen given matched was false. + aggs = append(aggs, agg) + } return aggs, errs.errorOrNil() } -// aggregator returns the Aggregator for an instrument configuration. If the -// instrument defines an unknown aggregation, an error is returned. 
-func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { +// cachedAggregator returns the appropriate Aggregator for an instrument +// configuration. If the exact instrument has been created within the +// inst.Scope, that Aggregator instance will be returned. Otherwise, a new +// computed Aggregator will be cached and returned. +// +// If the instrument configuration conflicts with an instrument that has +// already been created (e.g. description, unit, data type) a warning will be +// logged at the "Info" level with the global OTel logger. A valid new +// Aggregator for the instrument configuration will still be returned without +// an error. +// +// If the instrument defines an unknown or incompatible aggregation, an error +// is returned. +func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) { switch inst.Aggregation.(type) { case nil, aggregation.Default: // Undefined, nil, means to use the default from the reader. @@ -259,31 +261,94 @@ func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], ) } - var ( - temporality = i.pipeline.reader.temporality(inst.Kind) - monotonic bool + id := i.instrumentID(inst, u) + // If there is a conflict, the specification says the view should + // still be applied and a warning should be logged. + i.logConflict(id) + return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { + agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic) + if err != nil { + return nil, err + } + if agg == nil { // Drop aggregator. + return nil, nil + } + i.pipeline.addSync(inst.Scope, instrumentSync{ + name: inst.Name, + description: inst.Description, + unit: u, + aggregator: agg, + }) + return agg, err + }) +} + +// logConflict validates if an instrument with the same name as id has already +// been created. If that instrument conflicts with id, a warning is logged. 
+func (i *inserter[N]) logConflict(id instrumentID) { + existing, unique := i.cache.Unique(id) + if unique { + return + } + + global.Info( + "duplicate metric stream definitions", + "names", fmt.Sprintf("%q, %q", existing.Name, id.Name), + "descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description), + "units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit), + "numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number), + "aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation), + "monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic), + "temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()), ) +} + +func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID { + var zero N + id := instrumentID{ + Name: vi.Name, + Description: vi.Description, + Unit: u, + Aggregation: fmt.Sprintf("%T", vi.Aggregation), + Temporality: i.pipeline.reader.temporality(vi.Kind), + Number: fmt.Sprintf("%T", zero), + } - switch inst.Kind { + switch vi.Kind { case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: - monotonic = true + id.Monotonic = true } - switch agg := inst.Aggregation.(type) { + return id +} + +// aggregator returns a new Aggregator matching agg, temporality, and +// monotonic. If the agg is unknown or temporality is invalid, an error is +// returned. 
+func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { + switch a := agg.(type) { case aggregation.Drop: return nil, nil case aggregation.LastValue: return internal.NewLastValue[N](), nil case aggregation.Sum: - if temporality == metricdata.CumulativeTemporality { + switch temporality { + case metricdata.CumulativeTemporality: return internal.NewCumulativeSum[N](monotonic), nil + case metricdata.DeltaTemporality: + return internal.NewDeltaSum[N](monotonic), nil + default: + return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) } - return internal.NewDeltaSum[N](monotonic), nil case aggregation.ExplicitBucketHistogram: - if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeHistogram[N](agg), nil + switch temporality { + case metricdata.CumulativeTemporality: + return internal.NewCumulativeHistogram[N](a), nil + case metricdata.DeltaTemporality: + return internal.NewDeltaHistogram[N](a), nil + default: + return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) } - return internal.NewDeltaHistogram[N](agg), nil } return nil, errUnknownAggregation } @@ -364,17 +429,17 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i]) + in[i] = newInserter(p[i], c) } - return &resolver[N]{in} + return resolver[N]{in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. 
-func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +func (r resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index f89a09360ba..45bc23ccb18 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -15,11 +15,15 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "sync/atomic" "testing" + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -211,7 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - i := newInserter[N](newPipeline(nil, tt.reader, tt.views)) + c := newInstrumentCache[N](nil, nil) + i := newInserter(newPipeline(nil, tt.reader, tt.views), c) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -223,7 +228,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}})) + c := newInstrumentCache[N](nil, nil) + i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -334,7 +340,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := 
newResolver[int64](p) + c := newInstrumentCache[int64](nil, nil) + r := newResolver(p, c) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +351,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[float64](p) + c := newInstrumentCache[float64](nil, nil) + r := newResolver(p, c) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,20 +383,40 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver[int64](p) + vc := cache[string, instrumentID]{} + ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver[float64](p) + rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) } -func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { +type logCounter struct { + logr.LogSink + + infoN uint32 +} + +func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) { + atomic.AddUint32(&l.infoN, 1) + l.LogSink.Info(level, msg, keysAndValues...) 
+} + +func (l *logCounter) InfoN() int { + return int(atomic.SwapUint32(&l.infoN, 0)) +} + +func TestResolveAggregatorsDuplicateErrors(t *testing.T) { + tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 6}) + l := &logCounter{LogSink: tLog.GetSink()} + otel.SetLogger(logr.New(l)) + renameView, _ := view.New( view.MatchInstrumentName("bar"), view.WithRename("foo"), @@ -405,29 +433,40 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver[int64](p) + vc := cache[string, instrumentID]{} + ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 1) - // The Rename view should error, because it creates a foo instrument. + // The Rename view should produce the same instrument without an error, the + // default view should also cause a new aggregator to be returned. intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 2) - // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver[float64](p) + // Creating a float foo instrument should log a warning because there is an + // int foo instrument. 
+ rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged") assert.Len(t, floatAggs, 1) fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} - _, err = rf.Aggregators(fooInst, unit.Dimensionless) + floatAggs, err = rf.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") + assert.Len(t, floatAggs, 1) floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + // Both the rename and default view aggregators created above should now + // conflict. Therefore, 2 warning messages should be logged. + assert.Equal(t, 2, l.InfoN(), "instrument conflicts not logged") assert.Len(t, floatAggs, 2) } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 7a3321da0c1..e47bb6b5f64 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" "sync" "testing" @@ -49,8 +50,10 @@ func TestEmptyPipeline(t *testing.T) { assert.Nil(t, output.Resource) assert.Len(t, output.ScopeMetrics, 0) - err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - assert.NoError(t, err) + iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + assert.NotPanics(t, func() { + pipe.addSync(instrumentation.Scope{}, iSync) + }) require.NotPanics(t, func() { pipe.addCallback(func(ctx context.Context) {}) @@ -71,8 +74,10 @@ func TestNewPipeline(t *testing.T) { assert.Equal(t, resource.Empty(), output.Resource) assert.Len(t, output.ScopeMetrics, 0) - err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - assert.NoError(t, err) 
+ iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + assert.NotPanics(t, func() { + pipe.addSync(instrumentation.Scope{}, iSync) + }) require.NotPanics(t, func() { pipe.addCallback(func(ctx context.Context) {}) @@ -85,99 +90,6 @@ func TestNewPipeline(t *testing.T) { require.Len(t, output.ScopeMetrics[0].Metrics, 1) } -func TestPipelineDuplicateRegistration(t *testing.T) { - type instrumentID struct { - scope instrumentation.Scope - name string - description string - unit unit.Unit - } - testCases := []struct { - name string - secondInst instrumentID - want error - wantScopeLen int - wantMetricsLen int - }{ - { - name: "exact should error", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "desc", - unit: unit.Dimensionless, - }, - want: errAlreadyRegistered, - wantScopeLen: 1, - wantMetricsLen: 1, - }, - { - name: "description should not be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "other desc", - unit: unit.Dimensionless, - }, - want: errAlreadyRegistered, - wantScopeLen: 1, - wantMetricsLen: 1, - }, - { - name: "scope should be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{ - Name: "newScope", - }, - name: "name", - description: "desc", - unit: unit.Dimensionless, - }, - wantScopeLen: 2, - wantMetricsLen: 1, - }, - { - name: "name should be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "newName", - description: "desc", - unit: unit.Dimensionless, - }, - wantScopeLen: 1, - wantMetricsLen: 2, - }, - { - name: "unit should be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "desc", - unit: unit.Bytes, - }, - wantScopeLen: 1, - wantMetricsLen: 2, - }, - } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - pipe := newPipeline(nil, nil, nil) - err := 
pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - require.NoError(t, err) - - err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{}) - assert.ErrorIs(t, err, tt.want) - - if tt.wantScopeLen > 0 { - output, err := pipe.produce(context.Background()) - assert.NoError(t, err) - require.Len(t, output.ScopeMetrics, tt.wantScopeLen) - require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen) - } - }) - } -} - func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) pipe := newPipeline(res, nil, nil) @@ -201,10 +113,12 @@ func TestPipelineConcurrency(t *testing.T) { }() wg.Add(1) - go func() { + go func(n int) { defer wg.Done() - _ = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - }() + name := fmt.Sprintf("name %d", n) + sync := instrumentSync{name, "desc", unit.Dimensionless, testSumAggregator{}} + pipe.addSync(instrumentation.Scope{}, sync) + }(i) wg.Add(1) go func() { @@ -249,7 +163,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - i := newInserter[N](test.pipe) + c := newInstrumentCache[N](nil, nil) + i := newInserter(test.pipe, c) got, err := i.Instrument(inst, unit.Dimensionless) require.NoError(t, err) assert.Len(t, got, 1, "default view not applied")