Skip to content

Commit

Permalink
Handle duplicate Aggregators and log instrument conflicts (#3251)
Browse files Browse the repository at this point in the history
* Add the cache type

* Add cache unit tests

* Test cache concurrency

* Add the instrumentCache

* Use the instrumentCache to deduplicate creation

* Drop unique check from addAggregator

* Fix aggregatorCache* docs

* Update cachedAggregator and aggregator method docs

* Remove unnecessary type constraint

* Remove unused errAlreadyRegistered

* Rename to not shadow imports

* Add changes to changelog

* Fix changelog English

* Store resolvers in the meter instead of caches

* Test all Aggregator[N] impls are comparable

* Fix lint

* Add documentation that Aggregators need to be comparable

* Update sdk/metric/internal/aggregator.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Update sdk/metric/instrument.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Update sdk/metric/instrument.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Update sdk/metric/internal/aggregator_test.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Fix pipeline_test.go use of newInstrumentCache

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
  • Loading branch information
MrAlias and Aneurysm9 committed Oct 11, 2022
1 parent ffa94ca commit b5292b8
Show file tree
Hide file tree
Showing 10 changed files with 502 additions and 239 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -24,6 +24,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)
- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251)
- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251)
- Log a warning, instead of returning an error, when a conflicting instrument (e.g. description, unit, data-type) is created. (#3251)
- The OpenCensus bridge no longer sends empty batches of metrics. (#3263)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22
Expand Down
110 changes: 110 additions & 0 deletions sdk/metric/cache.go
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache memoizes values by key behind a mutex so that the value for a given
// key is computed at most once.
//
// A zero-value cache is empty and usable without further initialization.
//
// Do not copy a cache once it has been used.
//
// Every method of a cache may be called from multiple goroutines.
type cache[K comparable, V any] struct {
	sync.Mutex
	data map[K]V
}

// Lookup returns the value stored in the cache for key. If no value is stored
// yet, f is called and its result is stored for key and returned.
//
// Lookup is safe to call concurrently. The cache lock is held for the whole
// call, including while f runs, so f should not block excessively.
func (c *cache[K, V]) Lookup(key K, f func() V) V {
	c.Lock()
	defer c.Unlock()

	// The backing map is allocated lazily so the zero value stays usable.
	if c.data == nil {
		c.data = make(map[K]V)
	}

	if cached, found := c.data[key]; found {
		return cached
	}

	fresh := f()
	c.data[key] = fresh
	return fresh
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
//
// Both fields are pointers, so copies of an instrumentCache value share the
// same underlying caches.
type instrumentCache[N int64 | float64] struct {
	// aggregators is used to ensure duplicate creations of the same instrument
	// return the same instance of that instrument's aggregator. It is keyed by
	// the full instrumentID and also stores the error returned when the
	// aggregator was created.
	aggregators *cache[instrumentID, aggVal[N]]
	// views is used to ensure if instruments with the same name are created,
	// but do not have the same identifying properties, a warning is logged.
	// It is keyed by instrument name and holds the first instrumentID seen
	// for that name.
	views *cache[string, instrumentID]
}

// newInstrumentCache builds an instrumentCache backed by ac for aggregators
// and vc for views. A nil ac or vc is replaced with a new, empty cache, which
// lets callers share one of the caches between instrumentCaches while leaving
// the other private.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
	ic := instrumentCache[N]{aggregators: ac, views: vc}
	if ic.aggregators == nil {
		ic.aggregators = new(cache[instrumentID, aggVal[N]])
	}
	if ic.views == nil {
		ic.views = new(cache[string, instrumentID])
	}
	return ic
}

// LookupAggregator returns the Aggregator and error cached for id if an entry
// exists. Otherwise, f is called and both of its return values are stored in
// the cache for id and returned.
//
// The error from f is cached together with the Aggregator, so later calls
// with the same id receive that same error without f being called again.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
	// Adapt the two-value f to the single-value shape the cache stores.
	build := func() aggVal[N] {
		var entry aggVal[N]
		entry.Aggregator, entry.Err = f()
		return entry
	}

	cached := c.aggregators.Lookup(id, build)
	return cached.Aggregator, cached.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache. It
// pairs the two results of an aggregator-creating function so they can be
// stored as a single cache entry.
type aggVal[N int64 | float64] struct {
	// Aggregator is the aggregator returned by the creating function.
	Aggregator internal.Aggregator[N]
	// Err is the error returned by the creating function alongside
	// Aggregator.
	Err error
}

// Unique reports whether id conflicts with an instrument already recorded
// under the same name. The first instrumentID seen for a name is remembered.
// It is returned on every call for that name, together with true when it
// equals id (including the call that first records it) and false when it
// differs from id in any property.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
	first := c.views.Lookup(id.Name, func() instrumentID { return id })
	return first, first == id
}
76 changes: 76 additions & 0 deletions sdk/metric/cache_test.go
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestCache checks the basic Lookup contract: a zero-value cache is usable, a
// stored value wins over a new fallback, and a missing key takes the fallback.
func TestCache(t *testing.T) {
	keys := [2]string{"one", "two"}
	vals := [2]int{1, 2}

	var c cache[string, int]

	// First use of the zero value must not panic on the nil map.
	var got int
	require.NotPanics(t, func() {
		got = c.Lookup(keys[0], func() int { return vals[0] })
	}, "zero-value cache panics on Lookup")
	assert.Equal(t, vals[0], got, "zero-value cache did not return fallback")

	// The fallback is ignored because keys[0] is already stored.
	existing := c.Lookup(keys[0], func() int { return vals[1] })
	assert.Equal(t, vals[0], existing, "existing key")

	// A key that was never stored yields the fallback value.
	missing := c.Lookup(keys[1], func() int { return vals[1] })
	assert.Equal(t, vals[1], missing, "non-existing key")
}

// TestCacheConcurrency calls Lookup for a single key from several goroutines
// at once and requires that none of them panics and that all of them return
// before the timeout.
func TestCacheConcurrency(t *testing.T) {
	const (
		key        = "k"
		goroutines = 10
		timeoutSec = 5
	)

	var (
		c  cache[string, int]
		wg sync.WaitGroup
	)

	wg.Add(goroutines)
	for n := 0; n < goroutines; n++ {
		// n is passed as an argument so each goroutine has its own copy.
		go func(val int) {
			defer wg.Done()
			assert.NotPanics(t, func() {
				c.Lookup(key, func() int { return val })
			})
		}(n)
	}

	// finished is closed once every worker goroutine has returned.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		wg.Wait()
	}()

	allReturned := func() bool {
		select {
		case <-finished:
			return true
		default:
			return false
		}
	}
	assert.Eventually(t, allReturned, timeoutSec*time.Second, 10*time.Millisecond)
}
23 changes: 23 additions & 0 deletions sdk/metric/instrument.go
Expand Up @@ -23,9 +23,32 @@ import (
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// instrumentID holds the identifying properties of an instrument.
//
// instrumentID values are used as cache keys and are compared with ==, so
// every field needs to be of a comparable type.
type instrumentID struct {
	// Name is the name of the instrument.
	Name string
	// Description is the description of the instrument.
	Description string
	// Unit is the unit of the instrument.
	Unit unit.Unit
	// Aggregation is the aggregation data type of the instrument.
	Aggregation string
	// Monotonic is the monotonicity of an instrument's data type. This field
	// is not used for all data types, so a zero value needs to be understood
	// in the context of Aggregation.
	Monotonic bool
	// Temporality is the temporality of an instrument's data type. This field
	// is not used by some data types.
	Temporality metricdata.Temporality
	// Number is the number type of the instrument.
	Number string
}

type instrumentImpl[N int64 | float64] struct {
instrument.Asynchronous
instrument.Synchronous
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/internal/aggregator.go
Expand Up @@ -26,6 +26,9 @@ import (
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
// it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Expand Down
40 changes: 25 additions & 15 deletions sdk/metric/internal/aggregator_test.go
Expand Up @@ -20,6 +20,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
Expand Down Expand Up @@ -80,23 +82,31 @@ type aggregatorTester[N int64 | float64] struct {
func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for i := 0; i < at.GoroutineN; i++ {
go func() {
defer wg.Done()
for j := 0; j < at.MeasurementN; j++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
t.Run("Comparable", func(t *testing.T) {
assert.NotPanics(t, func() {
_ = map[Aggregator[N]]struct{}{a: {}}
})
})

t.Run("Correctness", func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for j := 0; j < at.GoroutineN; j++ {
go func() {
defer wg.Done()
for k := 0; k < at.MeasurementN; k++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
}
}
}
}()
}
wg.Wait()
}()
}
wg.Wait()

metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
})
}
}

Expand Down
43 changes: 31 additions & 12 deletions sdk/metric/meter.go
Expand Up @@ -55,10 +55,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
defer r.Unlock()

if r.meters == nil {
m := &meter{
Scope: s,
pipes: r.pipes,
}
m := newMeter(s, r.pipes)
r.meters = map[instrumentation.Scope]*meter{s: m}
return m
}
Expand All @@ -68,10 +65,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
return m
}

m = &meter{
Scope: s,
pipes: r.pipes,
}
m = newMeter(s, r.pipes)
r.meters[s] = m
return m
}
Expand All @@ -83,20 +77,45 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
type meter struct {
	// Scope is the instrumentation scope this meter was created for.
	instrumentation.Scope

	// *Resolvers are used by the provided instrument providers to resolve new
	// instrument aggregators and maintain a cache across the instruments this
	// meter owns. There is one resolver per number type.
	int64Resolver   resolver[int64]
	float64Resolver resolver[float64]

	// pipes are the pipelines this meter was created with. newMeter hands the
	// same value to both resolvers.
	pipes pipelines
}

// newMeter returns a meter for scope s that resolves its instruments against
// the pipelines p. The meter gets one resolver per number type.
func newMeter(s instrumentation.Scope, p pipelines) *meter {
	// A single view cache is shared by both number types so that instrument
	// conflicts, including number conflicts, this meter is asked to create
	// are logged to the user.
	views := new(cache[string, instrumentID])

	// A nil aggregator cache makes newInstrumentCache allocate a private one,
	// so aggregators are cached separately for each number type.
	intCache := newInstrumentCache[int64](nil, views)
	floatCache := newInstrumentCache[float64](nil, views)

	m := &meter{Scope: s, pipes: p}
	m.int64Resolver = newResolver(p, intCache)
	m.float64Resolver = newResolver(p, floatCache)
	return m
}

// Compile-time check that *meter implements metric.Meter. The typed nil
// pointer is assigned to the blank identifier, so nothing is kept at runtime.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
}

// RegisterCallback registers the function f to be called when any of the
Expand All @@ -108,10 +127,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
}

0 comments on commit b5292b8

Please sign in to comment.