From bc44d82fa156641e51d8f80fd9540f4949349fac Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Thu, 29 Sep 2022 14:39:02 -0700
Subject: [PATCH 01/22] Add the cache type

---
 sdk/metric/cache.go | 54 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)
 create mode 100644 sdk/metric/cache.go

diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go
new file mode 100644
index 00000000000..110e4900577
--- /dev/null
+++ b/sdk/metric/cache.go
@@ -0,0 +1,54 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+	"sync"
+)
+
+// cache is a locking storage used to quickly return already computed values.
+//
+// The zero value of a cache is empty and ready to use.
+//
+// A cache must not be copied after first use.
+//
+// All methods of a cache are safe to call concurrently.
+type cache[K comparable, V any] struct {
+	sync.Mutex
+	data map[K]V
+}
+
+// Lookup returns the value stored in the cache with the associated key if it
+// exists. Otherwise, f is called and its returned value is set in the cache
+// for key and returned.
+//
+// Lookup is safe to call concurrently. It will hold the cache lock, so f
+// should not block excessively.
+func (c *cache[K, V]) Lookup(key K, f func() V) V {
+	c.Lock()
+	defer c.Unlock()
+
+	if c.data == nil {
+		val := f()
+		c.data = map[K]V{key: val}
+		return val
+	}
+	if v, ok := c.data[key]; ok {
+		return v
+	}
+	val := f()
+	c.data[key] = val
+	return val
+}

From 29ea02dfa0395beb5ae41be8215fd39bbd12bbbb Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Thu, 29 Sep 2022 14:39:49 -0700
Subject: [PATCH 02/22] Add cache unit tests

---
 sdk/metric/cache_test.go | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)
 create mode 100644 sdk/metric/cache_test.go

diff --git a/sdk/metric/cache_test.go b/sdk/metric/cache_test.go
new file mode 100644
index 00000000000..4a27b4256d1
--- /dev/null
+++ b/sdk/metric/cache_test.go
@@ -0,0 +1,39 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCache(t *testing.T) {
+	k0, k1 := "one", "two"
+	v0, v1 := 1, 2
+
+	c := cache[string, int]{}
+
+	var got int
+	require.NotPanics(t, func() {
+		got = c.Lookup(k0, func() int { return v0 })
+	}, "zero-value cache panics on Lookup")
+	assert.Equal(t, v0, got, "zero-value cache did not return fallback")
+
+	assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key")
+
+	assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
+}

From 87324d8a4b27b71f100515ce384ac002bc2fc851 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Thu, 29 Sep 2022 14:48:21 -0700
Subject: [PATCH 03/22] Test cache concurrency

---
 sdk/metric/cache_test.go | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/sdk/metric/cache_test.go b/sdk/metric/cache_test.go
index 4a27b4256d1..47332a58cbd 100644
--- a/sdk/metric/cache_test.go
+++ b/sdk/metric/cache_test.go
@@ -15,7 +15,9 @@
 package metric // import "go.opentelemetry.io/otel/sdk/metric"
 
 import (
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -37,3 +39,38 @@ func TestCache(t *testing.T) {
 
 	assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
 }
+
+func TestCacheConcurrency(t *testing.T) {
+	const (
+		key        = "k"
+		goroutines = 10
+		timeoutSec = 5
+	)
+
+	c := cache[string, int]{}
+	var wg sync.WaitGroup
+	for n := 0; n < goroutines; n++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			assert.NotPanics(t, func() {
+				c.Lookup(key, func() int { return i })
+			})
+		}(n)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	assert.Eventually(t, func() bool {
+		select {
+		case <-done:
+			return true
+		default:
+			return false
+		}
+	}, timeoutSec*time.Second, 10*time.Millisecond)
+}

From 010b999655f6dff81de2983aab59009057781d92 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Thu, 29 Sep 2022 14:50:00 -0700
Subject: [PATCH 04/22] Add the instrumentCache

---
 sdk/metric/cache.go | 56 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go
index 110e4900577..b75e7ea5402 100644
--- a/sdk/metric/cache.go
+++ b/sdk/metric/cache.go
@@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
 
 import (
 	"sync"
+
+	"go.opentelemetry.io/otel/sdk/metric/internal"
 )
 
 // cache is a locking storage used to quickly return already computed values.
@@ -52,3 +54,57 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
 	c.data[key] = val
 	return val
 }
+
+// instrumentCache is a cache of instruments. It is scoped to the Meter level
+// and a number type, meaning all instruments it contains must belong to the
+// same instrumentation.Scope (implicitly) and number type (explicitly).
+type instrumentCache[N int64 | float64] struct {
+	// aggregators is used to ensure duplicate creations of the same instrument
+	// return the same instance of that instrument's aggregator.
+	aggregators *cache[instrumentID, aggVal[N]]
+	// views is used to ensure that a warning is logged if instruments with the
+	// same name but different identifying properties are created.
+	views *cache[string, instrumentID]
+}
+
+// newInstrumentCache returns a new instrumentCache that uses ac as the
+// underlying cache for aggregators and vc as the cache for views. If ac or vc
+// are nil, a new empty cache will be used.
+func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
+	if ac == nil {
+		ac = &cache[instrumentID, aggVal[N]]{}
+	}
+	if vc == nil {
+		vc = &cache[string, instrumentID]{}
+	}
+	return instrumentCache[N]{aggregators: ac, views: vc}
+}
+
+// LookupAggregator returns the Aggregator and error for a cached instrument if
+// it exists in the cache. Otherwise, f is called and its returned value is set
+// in the cache and returned.
+//
+// LookupAggregator is safe to call concurrently.
+func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
+	v := c.aggregators.Lookup(id, func() aggVal[N] {
+		a, err := f()
+		return aggVal[N]{Aggregator: a, Err: err}
+	})
+	return v.Aggregator, v.Err
+}
+
+// aggVal is the cached value of an instrumentCache's aggregators cache.
+type aggVal[N int64 | float64] struct {
+	Aggregator internal.Aggregator[N]
+	Err        error
+}
+
+// Unique returns whether id is unique or a duplicate instrument. If an
+// instrument with the same name has already been created, that instrumentID
+// will be returned along with false. Otherwise, id is returned with true.
+//
+// Unique is safe to call concurrently.
+func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
+	got := c.views.Lookup(id.Name, func() instrumentID { return id })
+	return got, id == got
+}

From 8889f3217ab5b2a2be7e55e5638107ed10552fa3 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Fri, 30 Sep 2022 13:10:46 -0700
Subject: [PATCH 05/22] Use the instrumentCache to deduplicate creation

---
 sdk/metric/instrument.go             |  23 ++++
 sdk/metric/meter.go                  |  25 +++-
 sdk/metric/pipeline.go               | 179 ++++++++++++++++++---------
 sdk/metric/pipeline_registry_test.go |  71 ++++++++---
 4 files changed, 220 insertions(+), 78 deletions(-)

diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go
index 5e7b457ab7f..7ea7ef87c68 100644
--- a/sdk/metric/instrument.go
+++ b/sdk/metric/instrument.go
@@ -23,9 +23,32 @@ import (
 	"go.opentelemetry.io/otel/metric/instrument/asyncint64"
 	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"go.opentelemetry.io/otel/metric/instrument/syncint64"
+	"go.opentelemetry.io/otel/metric/unit"
 	"go.opentelemetry.io/otel/sdk/metric/internal"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
+// instrumentID is the set of identifying properties of an instrument.
+type instrumentID struct {
+	// Name is the name of the instrument.
+	Name string
+	// Description is the description of the instrument.
+	Description string
+	// Unit is the unit of the instrument.
+	Unit unit.Unit
+	// Aggregation is the aggregation data type of the instrument.
+	Aggregation string
+	// Monotonic is the monotonicity of an instruments data type. This field is
+	// not used for all data types, so a zero value needs to understood in the
+	// context of Aggregation.
+	Monotonic bool
+	// Temporality is the temporality of an instruments data type. This field
+	// is not used for all data types.
+	Temporality metricdata.Temporality
+	// Number is the number type of the instrument.
+ Number string +} + type instrumentImpl[N int64 | float64] struct { instrument.Asynchronous instrument.Synchronous diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 82d5a5269be..d39ab253d5d 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -83,6 +83,19 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope + // aggregatorCache* ensures no duplicate Aggregators are created from the + // same instrument within the scope of all instruments this meter owns. + // + // Duplicate creations that for the same number are identified in the + // viewCache. Since the conflict is "resolvable", an aggregator still needs + // to be returned when this occurs. Therefore, instruments of different + // numbers are not tracked with the same cache. + aggregatorCacheInt64 cache[instrumentID, aggVal[int64]] + aggregatorCacheFloat64 cache[instrumentID, aggVal[float64]] + // viewCache ensures instrument conflicts this meter is asked to create are + // logged to the user. + viewCache cache[string, instrumentID] + pipes pipelines } @@ -91,12 +104,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // RegisterCallback registers the function f to be called when any of the @@ -108,10 +123,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // SyncFloat64 returns the synchronous floating-point instrument provider. 
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d63023..88b399b9f56 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -21,6 +21,7 @@ import ( "strings" "sync" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -34,21 +35,13 @@ var ( errCreatingAggregators = errors.New("could not create all aggregators") errIncompatibleAggregation = errors.New("incompatible aggregation") errUnknownAggregation = errors.New("unrecognized aggregation") + errUnknownTemporality = errors.New("unrecognized temporality") ) type aggregator interface { Aggregation() metricdata.Aggregation } -// instrumentID is used to identify multiple instruments being mapped to the -// same aggregator. e.g. using an exact match view with a name=* view. You -// can't use a view.Instrument here because not all Aggregators are comparable. -type instrumentID struct { - scope instrumentation.Scope - name string - description string -} - type instrumentKey struct { name string unit unit.Unit @@ -178,60 +171,60 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { + cache instrumentCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline) *inserter[N] { - return &inserter[N]{p} +func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] { + return &inserter[N]{cache: c, pipeline: p} } -// Instrument inserts instrument inst with instUnit returning the Aggregators -// that need to be updated with measurments for that instrument. +// Instrument inserts the instrument inst with instUnit into a pipeline. All +// views the pipeline contains are matched against, and any matching view that +// creates a unique Aggregator will be inserted into the pipeline and included +// in the returned slice. +// +// The returned Aggregators are ensured to be deduplicated and unique. If +// another view in another pipeline that is cached by this inserter's cache has +// already inserted the same Aggregator for the same instrument, that +// Aggregator instance is returned. +// +// If another instrument has already been inserted by this inserter, or any +// other using the same cache, and it conflicts with the instrument being +// inserted in this call, an Aggregator matching the arguments will still be +// returned but an Info level log message will also be logged to the OTel +// global logger. +// +// If the passed instrument would result in an incompatible Aggregator, an +// error is returned and that Aggregator is not inserted or returned. +// +// If an instrument is determined to use a Drop aggregation, that instrument is +// not inserted nor returned. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} + // The cache will return the same Aggregator instance. 
Use this fact to + // compare pointer addresses to deduplicate Aggregators. + seen := make(map[internal.Aggregator[N]]struct{}) for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) - - id := instrumentID{ - scope: inst.Scope, - name: inst.Name, - description: inst.Description, - } - - if _, ok := seen[id]; ok || !match { + if !match { continue } - if inst.Aggregation == nil { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } - - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) - errs.append(err) - continue - } - - agg, err := i.aggregator(inst) + agg, err := i.cachedAggregator(inst, instUnit) if err != nil { errs.append(err) - continue } if agg == nil { // Drop aggregator. continue } - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs = append(aggs, agg) - seen[id] = struct{}{} - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) - if err != nil { - errs.append(err) + if _, ok := seen[agg]; ok { + // This aggregator has already been added. + continue } + seen[agg] = struct{}{} + aggs = append(aggs, agg) } // TODO(#3224): handle when no views match. Default should be reader // aggregation returned. @@ -240,34 +233,102 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in // aggregator returns the Aggregator for an instrument configuration. If the // instrument defines an unknown aggregation, an error is returned. -func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { - // TODO (#3011): If filtering is done by the Aggregator it should be passed - // here. - var ( - temporality = i.pipeline.reader.temporality(inst.Kind) - monotonic bool +func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) { + switch inst.Aggregation.(type) { + case nil, aggregation.Default: + // Undefined, nil, means to use the default from the reader. + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + return nil, fmt.Errorf( + "creating aggregator with instrumentKind: %d, aggregation %v: %w", + inst.Kind, inst.Aggregation, err, + ) + } + + id := i.instrumentID(inst, u) + // If there is a conflict, the specification says the view should + // still be applied and a warning should be logged. + i.logConflict(id) + return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { + agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic) + if err != nil { + return nil, err + } + if agg == nil { // Drop aggregator. + return nil, nil + } + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, u, agg) + return agg, err + }) +} + +// logConflict validates if an instrument with the same name as id has already +// been created. If that instrument conflicts with id, a warning is logged. 
+func (i *inserter[N]) logConflict(id instrumentID) { + existing, unique := i.cache.Unique(id) + if unique { + return + } + + global.Info( + "duplicate metric stream definitions", + "names", fmt.Sprintf("%q, %q", existing.Name, id.Name), + "descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description), + "units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit), + "numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number), + "aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation), + "monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic), + "temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()), ) +} + +func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID { + var zero N + id := instrumentID{ + Name: vi.Name, + Description: vi.Description, + Unit: u, + Aggregation: fmt.Sprintf("%T", vi.Aggregation), + Temporality: i.pipeline.reader.temporality(vi.Kind), + Number: fmt.Sprintf("%T", zero), + } - switch inst.Kind { + switch vi.Kind { case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: - monotonic = true + id.Monotonic = true } - switch agg := inst.Aggregation.(type) { + return id +} + +// aggregator returns the Aggregator for an aggregation type. If the instrument +// defines an unknown aggregation, an error is returned. +func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { + switch a := agg.(type) { case aggregation.Drop: return nil, nil case aggregation.LastValue: return internal.NewLastValue[N](), nil case aggregation.Sum: - if temporality == metricdata.CumulativeTemporality { + switch temporality { + case metricdata.CumulativeTemporality: return internal.NewCumulativeSum[N](monotonic), nil + case metricdata.DeltaTemporality: + return internal.NewDeltaSum[N](monotonic), nil + default: + return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) } - return internal.NewDeltaSum[N](monotonic), nil case aggregation.ExplicitBucketHistogram: - if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeHistogram[N](agg), nil + switch temporality { + case metricdata.CumulativeTemporality: + return internal.NewCumulativeHistogram[N](a), nil + case metricdata.DeltaTemporality: + return internal.NewDeltaHistogram[N](a), nil + default: + return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) } - return internal.NewDeltaHistogram[N](agg), nil } return nil, errUnknownAggregation } @@ -348,10 +409,10 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i]) + in[i] = newInserter[N](p[i], c) } return &resolver[N]{in} } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index f89a09360ba..cbfb1c0a8ab 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -15,11 +15,16 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "fmt" + "sync/atomic" "testing" + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" 
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -211,7 +216,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - i := newInserter[N](newPipeline(nil, tt.reader, tt.views)) + c := newInstrumentCache[N](nil, nil) + i := newInserter(newPipeline(nil, tt.reader, tt.views), c) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -223,7 +229,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}})) + c := newInstrumentCache[N](nil, nil) + i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -334,7 +341,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[int64](p) + c := newInstrumentCache[int64](nil, nil) + r := newResolver(p, c) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +352,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[float64](p) + c := newInstrumentCache[float64](nil, nil) + r := newResolver(p, c) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,20 +384,41 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver[int64](p) + vc := cache[string, instrumentID]{} + ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver[float64](p) + rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) } -func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { +type logCounter struct { + logr.LogSink + + infoN uint32 +} + +func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) { + atomic.AddUint32(&l.infoN, 1) + fmt.Println("here") + l.LogSink.Info(level, msg, keysAndValues...) 
+} + +func (l *logCounter) InfoN() int { + return int(atomic.SwapUint32(&l.infoN, 0)) +} + +func TestResolveAggregatorsDuplicateErrors(t *testing.T) { + tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 6}) + l := &logCounter{LogSink: tLog.GetSink()} + otel.SetLogger(logr.New(l)) + renameView, _ := view.New( view.MatchInstrumentName("bar"), view.WithRename("foo"), @@ -405,29 +435,40 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver[int64](p) + vc := cache[string, instrumentID]{} + ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 1) - // The Rename view should error, because it creates a foo instrument. + // The Rename view should produce the same instrument without an error, the + // default view should also cause a new aggregator to be returned. intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 2) - // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver[float64](p) + // Creating a float foo instrument should log a warning because there is an + // int foo instrument. + rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged") assert.Len(t, floatAggs, 1) fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} - _, err = rf.Aggregators(fooInst, unit.Dimensionless) + floatAggs, err = rf.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) + assert.Equal(t, 0, l.InfoN(), "no info logging should happen") + assert.Len(t, floatAggs, 1) floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) + // Both the rename and default view aggregators created above should now + // conflict. Therefore, 2 warning messages should be logged. + assert.Equal(t, 2, l.InfoN(), "instrument conflicts not logged") assert.Len(t, floatAggs, 2) } From 40ae7fa36f0b15a0d2c69edfc5b840405046bfa3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 13:12:55 -0700 Subject: [PATCH 06/22] Drop unique check from addAggregator --- sdk/metric/pipeline.go | 57 ++++++-------- sdk/metric/pipeline_registry_test.go | 2 - sdk/metric/pipeline_test.go | 114 ++++----------------------- 3 files changed, 39 insertions(+), 134 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 88b399b9f56..aaf0a8a44c0 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -42,13 +42,12 @@ type aggregator interface { Aggregation() metricdata.Aggregation } -type instrumentKey struct { - name string - unit unit.Unit -} - -type instrumentValue struct { +// instrumentSync is a synchronization point between a pipeline and an +// instrument's Aggregators. 
+type instrumentSync struct { + name string description string + unit unit.Unit aggregator aggregator } @@ -60,7 +59,7 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe resource: res, reader: reader, views: views, - aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue), + aggregations: make(map[instrumentation.Scope][]instrumentSync), } } @@ -76,36 +75,25 @@ type pipeline struct { views []view.View sync.Mutex - aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue + aggregations map[instrumentation.Scope][]instrumentSync callbacks []func(context.Context) } var errAlreadyRegistered = errors.New("instrument already registered") -// addAggregator will stores an aggregator with an instrument description. The aggregator -// is used when `produce()` is called. -func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error { +// addSync adds the instrumentSync to pipeline p with scope. This method is not +// idempotent. Duplicate calls will result in duplicate additions, it is the +// callers responsibility to ensure this is called with unique values. +func (p *pipeline) addSync(scope instrumentation.Scope, sync instrumentSync) { p.Lock() defer p.Unlock() if p.aggregations == nil { - p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{} - } - if p.aggregations[scope] == nil { - p.aggregations[scope] = map[instrumentKey]instrumentValue{} - } - inst := instrumentKey{ - name: name, - unit: instUnit, - } - if _, ok := p.aggregations[scope][inst]; ok { - return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope) - } - - p.aggregations[scope][inst] = instrumentValue{ - description: description, - aggregator: agg, + p.aggregations = map[instrumentation.Scope][]instrumentSync{ + scope: {sync}, + } + return } - return nil + p.aggregations[scope] = append(p.aggregations[scope], sync) } // addCallback registers a callback to be run when `produce()` is called. @@ -144,12 +132,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations)) for scope, instruments := range p.aggregations { metrics := make([]metricdata.Metrics, 0, len(instruments)) - for inst, instValue := range instruments { - data := instValue.aggregator.Aggregation() + for _, inst := range instruments { + data := inst.aggregator.Aggregation() if data != nil { metrics = append(metrics, metricdata.Metrics{ Name: inst.name, - Description: instValue.description, + Description: inst.description, Unit: inst.unit, Data: data, }) @@ -259,7 +247,12 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter if agg == nil { // Drop aggregator. 
return nil, nil } - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, u, agg) + i.pipeline.addSync(inst.Scope, instrumentSync{ + name: inst.Name, + description: inst.Description, + unit: u, + aggregator: agg, + }) return agg, err }) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index cbfb1c0a8ab..45bc23ccb18 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -15,7 +15,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "fmt" "sync/atomic" "testing" @@ -406,7 +405,6 @@ type logCounter struct { func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) { atomic.AddUint32(&l.infoN, 1) - fmt.Println("here") l.LogSink.Info(level, msg, keysAndValues...) } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index ca83c9c3a9e..2408778b3fb 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" "sync" "testing" @@ -46,8 +47,10 @@ func TestEmptyPipeline(t *testing.T) { assert.Nil(t, output.Resource) assert.Len(t, output.ScopeMetrics, 0) - err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - assert.NoError(t, err) + sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + assert.NotPanics(t, func() { + pipe.addSync(instrumentation.Scope{}, sync) + }) require.NotPanics(t, func() { pipe.addCallback(func(ctx context.Context) {}) @@ -68,8 +71,10 @@ func TestNewPipeline(t *testing.T) { assert.Equal(t, resource.Empty(), output.Resource) assert.Len(t, output.ScopeMetrics, 0) - err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - assert.NoError(t, err) + sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + assert.NotPanics(t, func() { + pipe.addSync(instrumentation.Scope{}, sync) + }) require.NotPanics(t, func() { pipe.addCallback(func(ctx context.Context) {}) @@ -82,99 +87,6 @@ func TestNewPipeline(t *testing.T) { require.Len(t, output.ScopeMetrics[0].Metrics, 1) } -func TestPipelineDuplicateRegistration(t *testing.T) { - type instrumentID struct { - scope instrumentation.Scope - name string - description string - unit unit.Unit - } - testCases := []struct { - name string - secondInst instrumentID - want error - wantScopeLen int - wantMetricsLen int - }{ - { - name: "exact should error", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "desc", - unit: unit.Dimensionless, - }, - want: errAlreadyRegistered, - wantScopeLen: 1, - wantMetricsLen: 1, - }, - { - name: "description should not be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "other desc", - unit: unit.Dimensionless, - }, - want: errAlreadyRegistered, - wantScopeLen: 1, - wantMetricsLen: 1, - }, - { - name: "scope should be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{ - Name: "newScope", - }, - name: "name", - description: "desc", - unit: unit.Dimensionless, - }, - wantScopeLen: 2, - wantMetricsLen: 1, - }, - { - name: "name should be identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "newName", - description: "desc", - unit: unit.Dimensionless, - }, - wantScopeLen: 1, - wantMetricsLen: 2, - }, - { - name: "unit should be 
identifying", - secondInst: instrumentID{ - scope: instrumentation.Scope{}, - name: "name", - description: "desc", - unit: unit.Bytes, - }, - wantScopeLen: 1, - wantMetricsLen: 2, - }, - } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - pipe := newPipeline(nil, nil, nil) - err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - require.NoError(t, err) - - err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{}) - assert.ErrorIs(t, err, tt.want) - - if tt.wantScopeLen > 0 { - output, err := pipe.produce(context.Background()) - assert.NoError(t, err) - require.Len(t, output.ScopeMetrics, tt.wantScopeLen) - require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen) - } - }) - } -} - func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) pipe := newPipeline(res, nil, nil) @@ -198,10 +110,12 @@ func TestPipelineConcurrency(t *testing.T) { }() wg.Add(1) - go func() { + go func(n int) { defer wg.Done() - _ = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) - }() + name := fmt.Sprintf("name %d", n) + sync := instrumentSync{name, "desc", unit.Dimensionless, testSumAggregator{}} + pipe.addSync(instrumentation.Scope{}, sync) + }(i) wg.Add(1) go func() { From c7d8d2fa575e789db90f9d7a1bcd65ab0593b60e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 13:15:54 -0700 Subject: [PATCH 07/22] Fix aggregatorCache* docs --- sdk/metric/meter.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index d39ab253d5d..59ea019462e 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -83,13 +83,13 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - // aggregatorCache* ensures no duplicate Aggregators are created from the + // aggregatorCache* ensures no duplicate Aggregators are created for the // same instrument within the scope of all instruments this meter owns. // - // Duplicate creations that for the same number are identified in the - // viewCache. Since the conflict is "resolvable", an aggregator still needs - // to be returned when this occurs. Therefore, instruments of different - // numbers are not tracked with the same cache. + // Duplicate instrument creations for different number types are identified + // in the viewCache. Since the conflict is "resolvable", a valid aggregator + // still needs to be returned when this occurs. Therefore, instruments of + // different numbers are not tracked with the same cache. 
aggregatorCacheInt64 cache[instrumentID, aggVal[int64]] aggregatorCacheFloat64 cache[instrumentID, aggVal[float64]] // viewCache ensures instrument conflicts this meter is asked to create are From 45ac55959931e3bdd86c3650ed5bff65f0685b24 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 13:52:40 -0700 Subject: [PATCH 08/22] Update cachedAggregator and aggregator method docs --- sdk/metric/pipeline.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index aaf0a8a44c0..dbd841e3721 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -219,8 +219,19 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in return aggs, errs.errorOrNil() } -// aggregator returns the Aggregator for an instrument configuration. If the -// instrument defines an unknown aggregation, an error is returned. +// cachedAggregator returns the appropriate Aggregator for an instrument +// configuration. If the exact instrument has been created within the +// inst.Scope, that Aggregator instance will be returned. Otherwise, a new +// computed Aggregator will be cached and returned. +// +// If the instrument configuration conflicts with an instrument that has +// already been created (e.g. description, unit, data type) a warning will be +// logged at the "Info" level with the global OTel logger. A valid new +// Aggregator for the instrument configuration will still be returned without +// an error. +// +// If the instrument defines an unknown or incompatible aggregation, an error +// is returned. func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) { switch inst.Aggregation.(type) { case nil, aggregation.Default: @@ -296,8 +307,9 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID return id } -// aggregator returns the Aggregator for an aggregation type. If the instrument -// defines an unknown aggregation, an error is returned. +// aggregator returns a new Aggregator matching agg, temporality, and +// monotonic. If the agg is unknown or temporality is invalid, an error is +// returned. 
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { switch a := agg.(type) { case aggregation.Drop: From d0acf9b1088d28a10581911102395cc6178e4db2 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 13:53:03 -0700 Subject: [PATCH 09/22] Remove unnecessary type constraint --- sdk/metric/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index dbd841e3721..75c04800d60 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -417,7 +417,7 @@ type resolver[N int64 | float64] struct { func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i], c) + in[i] = newInserter(p[i], c) } return &resolver[N]{in} } From 0cec45a1c7946487c75d39831c46308d3d9f0d50 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 13:59:39 -0700 Subject: [PATCH 10/22] Remove unused errAlreadyRegistered --- sdk/metric/pipeline.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 75c04800d60..e5b09910e42 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -79,8 +79,6 @@ type pipeline struct { callbacks []func(context.Context) } -var errAlreadyRegistered = errors.New("instrument already registered") - // addSync adds the instrumentSync to pipeline p with scope. This method is not // idempotent. Duplicate calls will result in duplicate additions, it is the // callers responsibility to ensure this is called with unique values. From cae41cc48a665433ca23850bc100f7f5dfcfb7b7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 14:01:05 -0700 Subject: [PATCH 11/22] Rename to not shadow imports --- sdk/metric/pipeline.go | 6 +++--- sdk/metric/pipeline_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index e5b09910e42..c11834f1019 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -82,16 +82,16 @@ type pipeline struct { // addSync adds the instrumentSync to pipeline p with scope. This method is not // idempotent. Duplicate calls will result in duplicate additions, it is the // callers responsibility to ensure this is called with unique values. -func (p *pipeline) addSync(scope instrumentation.Scope, sync instrumentSync) { +func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { p.Lock() defer p.Unlock() if p.aggregations == nil { p.aggregations = map[instrumentation.Scope][]instrumentSync{ - scope: {sync}, + scope: {iSync}, } return } - p.aggregations[scope] = append(p.aggregations[scope], sync) + p.aggregations[scope] = append(p.aggregations[scope], iSync) } // addCallback registers a callback to be run when `produce()` is called. 
diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 2408778b3fb..ca587fae54c 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -47,9 +47,9 @@ func TestEmptyPipeline(t *testing.T) { assert.Nil(t, output.Resource) assert.Len(t, output.ScopeMetrics, 0) - sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} assert.NotPanics(t, func() { - pipe.addSync(instrumentation.Scope{}, sync) + pipe.addSync(instrumentation.Scope{}, iSync) }) require.NotPanics(t, func() { @@ -71,9 +71,9 @@ func TestNewPipeline(t *testing.T) { assert.Equal(t, resource.Empty(), output.Resource) assert.Len(t, output.ScopeMetrics, 0) - sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} + iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}} assert.NotPanics(t, func() { - pipe.addSync(instrumentation.Scope{}, sync) + pipe.addSync(instrumentation.Scope{}, iSync) }) require.NotPanics(t, func() { From 1514d9d56c8fd73065189fffa92fd515954690de Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 14:08:30 -0700 Subject: [PATCH 12/22] Add changes to changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf26a4ff104..8cfe0436f76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`. This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235) +### Fixed + +- Return the same instrument every time a user makes the exact same instrument creation call multiple times. (#3229, #3251) +- Return the same instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251) +- Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251) + ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 ### Changed From 52e6508319a8efda7c867540691079568b6a5f8b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 14:26:47 -0700 Subject: [PATCH 13/22] Fix changelog English --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cfe0436f76..1784e627587 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,8 +21,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed -- Return the same instrument every time a user makes the exact same instrument creation call multiple times. (#3229, #3251) -- Return the same instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251) +- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251) +- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251) - Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. 
(#3251) ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 From c41328c37d3a4301aecff1723a94bf01278d7acd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 3 Oct 2022 16:14:43 -0700 Subject: [PATCH 14/22] Store resolvers in the meter instead of caches --- sdk/metric/meter.go | 58 ++++++++++++++++++++++-------------------- sdk/metric/pipeline.go | 6 ++--- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 59ea019462e..c6871a06716 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -55,10 +55,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { defer r.Unlock() if r.meters == nil { - m := &meter{ - Scope: s, - pipes: r.pipes, - } + m := newMeter(s, r.pipes) r.meters = map[instrumentation.Scope]*meter{s: m} return m } @@ -68,10 +65,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { return m } - m = &meter{ - Scope: s, - pipes: r.pipes, - } + m = newMeter(s, r.pipes) r.meters[s] = m return m } @@ -83,35 +77,45 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - // aggregatorCache* ensures no duplicate Aggregators are created for the - // same instrument within the scope of all instruments this meter owns. - // - // Duplicate instrument creations for different number types are identified - // in the viewCache. Since the conflict is "resolvable", a valid aggregator - // still needs to be returned when this occurs. Therefore, instruments of - // different numbers are not tracked with the same cache. - aggregatorCacheInt64 cache[instrumentID, aggVal[int64]] - aggregatorCacheFloat64 cache[instrumentID, aggVal[float64]] - // viewCache ensures instrument conflicts this meter is asked to create are - // logged to the user. - viewCache cache[string, instrumentID] + // *Resolvers are used by the provided instrument providers to resolve new + // instruments aggregators and maintain a cache across instruments this + // meter owns. + int64Resolver resolver[int64] + float64Resolver resolver[float64] pipes pipelines } +func newMeter(s instrumentation.Scope, p pipelines) *meter { + // viewCache ensures instrument conflicts, including number conflicts, this + // meter is asked to create are logged to the user. + var viewCache cache[string, instrumentID] + + // Passing nil as the ac parameter to newInstrumentCache will have each + // create its own aggregator cache. + ic := newInstrumentCache[int64](nil, &viewCache) + fc := newInstrumentCache[float64](nil, &viewCache) + + return &meter{ + Scope: s, + pipes: p, + + int64Resolver: newResolver(p, ic), + float64Resolver: newResolver(p, fc), + } +} + // Compile-time check meter implements metric.Meter. var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache) - return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. 
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache) - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} } // RegisterCallback registers the function f to be called when any of the @@ -123,12 +127,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache) - return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache) - return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index c11834f1019..23dc5daa04a 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -412,17 +412,17 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter(p[i], c) } - return &resolver[N]{in} + return resolver[N]{in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. 
-func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +func (r resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} From 1f836588b9d8ba37638e255f270b86085b485aed Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 3 Oct 2022 16:32:59 -0700 Subject: [PATCH 15/22] Test all Aggregator[N] impls are comparable --- sdk/metric/internal/aggregator_test.go | 39 ++++++++++++++++---------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go index 200d3da6268..a7c1f96d425 100644 --- a/sdk/metric/internal/aggregator_test.go +++ b/sdk/metric/internal/aggregator_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -80,23 +81,31 @@ type aggregatorTester[N int64 | float64] struct { func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) { m := at.MeasurementN * at.GoroutineN return func(t *testing.T) { - for i := 0; i < at.CycleN; i++ { - var wg sync.WaitGroup - wg.Add(at.GoroutineN) - for i := 0; i < at.GoroutineN; i++ { - go func() { - defer wg.Done() - for j := 0; j < at.MeasurementN; j++ { - for attrs, n := range incr { - a.Aggregate(N(n), attrs) + t.Run("Comparable", func(t *testing.T) { + assert.NotPanics(t, func() { + _ = map[Aggregator[N]]struct{}{a: {}} + }) + }) + + t.Run("Correctness", func(t *testing.T) { + for i := 0; i < at.CycleN; i++ { + var wg sync.WaitGroup + wg.Add(at.GoroutineN) + for i := 0; i < at.GoroutineN; i++ { + go func() { + defer wg.Done() + for j := 0; j < at.MeasurementN; j++ { + for attrs, n := range incr { + a.Aggregate(N(n), attrs) + } } - } - }() - } - wg.Wait() + }() + } + wg.Wait() - metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation()) - } + metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation()) + } + }) } } From 71c19e6e011027b5d4850751aa4472be6fb998a8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 3 Oct 2022 16:38:39 -0700 Subject: [PATCH 16/22] Fix lint --- sdk/metric/internal/aggregator_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go index a7c1f96d425..92a0261c431 100644 --- a/sdk/metric/internal/aggregator_test.go +++ b/sdk/metric/internal/aggregator_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" From 2bb7517c5aebb3726c2ef4ef0282eea4094ca93e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 14:02:31 -0700 Subject: [PATCH 17/22] Add documentation that Aggregators need to be comparable --- sdk/metric/internal/aggregator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 1f70ae7c1ee..04de9560680 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -26,6 +26,9 @@ import ( var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. 
+// +// Aggregators need to comparable so they can be de-duplicated by the SDK when +// it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. From cc91e8aed093aca05fa56fae10d14a0b083c61ac Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 10 Oct 2022 11:23:12 -0700 Subject: [PATCH 18/22] Update sdk/metric/internal/aggregator.go Co-authored-by: Anthony Mirabella --- sdk/metric/internal/aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 04de9560680..952e9a4a8bd 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -27,7 +27,7 @@ var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. // -// Aggregators need to comparable so they can be de-duplicated by the SDK when +// Aggregators need to be comparable so they can be de-duplicated by the SDK when // it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it From 5832c569a4eca9522502039ba13e111d9d5594fa Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 10 Oct 2022 11:24:01 -0700 Subject: [PATCH 19/22] Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella --- sdk/metric/instrument.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 7ea7ef87c68..407a41aaa4f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -42,8 +42,8 @@ type instrumentID struct { // not used for all data types, so a zero value needs to understood in the // context of Aggregation. Monotonic bool - // Temporality is the temporality of an instruments data type. This field - // is not used for all data types. + // Temporality is the temporality of an instrument's data type. This field + // is not used by some data types. Temporality metricdata.Temporality // Number is the number type of the instrument. Number string From 7096a6882c6bd6055d17917a2068f61f514111c9 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 10 Oct 2022 11:24:17 -0700 Subject: [PATCH 20/22] Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella --- sdk/metric/instrument.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 407a41aaa4f..48440280e42 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -39,7 +39,7 @@ type instrumentID struct { // Aggregation is the aggregation data type of the instrument. Aggregation string // Monotonic is the monotonicity of an instruments data type. This field is - // not used for all data types, so a zero value needs to understood in the + // not used for all data types, so a zero value needs to be understood in the // context of Aggregation. Monotonic bool // Temporality is the temporality of an instrument's data type. 
This field From 32f47bf3bb288c24de723f5ec13d7e3f8cbbba07 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 10 Oct 2022 11:24:25 -0700 Subject: [PATCH 21/22] Update sdk/metric/internal/aggregator_test.go Co-authored-by: Anthony Mirabella --- sdk/metric/internal/aggregator_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go index 92a0261c431..a544a18ca21 100644 --- a/sdk/metric/internal/aggregator_test.go +++ b/sdk/metric/internal/aggregator_test.go @@ -92,10 +92,10 @@ func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFun for i := 0; i < at.CycleN; i++ { var wg sync.WaitGroup wg.Add(at.GoroutineN) - for i := 0; i < at.GoroutineN; i++ { + for j := 0; j < at.GoroutineN; j++ { go func() { defer wg.Done() - for j := 0; j < at.MeasurementN; j++ { + for k := 0; k < at.MeasurementN; k++ { for attrs, n := range incr { a.Aggregate(N(n), attrs) } From b8763734ebd5841b970d51504c9f7381ba6c6e46 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 11 Oct 2022 08:09:12 -0700 Subject: [PATCH 22/22] Fix pipeline_test.go use of newInstrumentCache --- sdk/metric/pipeline_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index d042113b459..e47bb6b5f64 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -163,7 +163,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - i := newInserter[N](test.pipe) + c := newInstrumentCache[N](nil, nil) + i := newInserter(test.pipe, c) got, err := i.Instrument(inst, unit.Dimensionless) require.NoError(t, err) assert.Len(t, got, 1, "default view not applied")
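
As a brief illustration of the behavior this series relies on, the sketch below (not part of the patch series; the function name exampleCacheDeduplication is hypothetical) shows how the cache type from PATCH 01 deduplicates computed values: the first Lookup for a key stores the callback's result, and every later Lookup for that key returns the stored value without running the new callback. This is the same property instrumentCache.LookupAggregator from PATCH 04 uses to hand back a single Aggregator instance for duplicate instrument creations.

package metric

import "fmt"

// exampleCacheDeduplication is a hypothetical helper, assumed to live in the
// sdk/metric package so the unexported cache type is visible.
func exampleCacheDeduplication() {
	c := cache[string, int]{}

	first := c.Lookup("counter", func() int { return 1 })  // computes and stores 1
	second := c.Lookup("counter", func() int { return 2 }) // returns the cached 1; this callback never runs

	fmt.Println(first, second) // prints: 1 1
}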