diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 82d5a5269be..371e1829946 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "sync" "go.opentelemetry.io/otel/metric" @@ -25,6 +26,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/internal" ) // meterRegistry keeps a record of initialized meters for instrumentation @@ -76,6 +78,54 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { return m } +type cache map[instrumentID]any + +type cacheResult[N int64 | float64] struct { + aggregators []internal.Aggregator[N] + err error +} + +type querier[N int64 | float64] struct { + sync.Mutex + + c cache +} + +func newQuerier[N int64 | float64](c cache) *querier[N] { + return &querier[N]{c: c} +} + +var ( + errCacheMiss = errors.New("cache miss") + errExists = errors.New("instrument already exists for different number type") +) + +func (q *querier[N]) Get(key instrumentID) (r *cacheResult[N], err error) { + q.Lock() + defer q.Unlock() + + vIface, ok := q.c[key] + if !ok { + err = errCacheMiss + return r, err + } + + switch v := vIface.(type) { + case *cacheResult[N]: + r = v + default: + err = errExists + } + return r, err +} + +func (q *querier[N]) Set(key instrumentID, val *cacheResult[N]) { + q.Lock() + defer q.Unlock() + + q.c[key] = val +} + // meter handles the creation and coordination of all metric instruments. A // meter represents a single instrumentation scope; all metric telemetry // produced by an instrumentation scope will use metric instruments from a @@ -84,6 +134,7 @@ type meter struct { instrumentation.Scope pipes pipelines + cache *cache } // Compile-time check meter implements metric.Meter. @@ -91,12 +142,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + q := newQuerier[int64](*m.cache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + q := newQuerier[float64](*m.cache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // RegisterCallback registers the function f to be called when any of the @@ -108,10 +161,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + q := newQuerier[int64](*m.cache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + q := newQuerier[float64](*m.cache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d63023..16ad65ba9c7 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -345,22 +345,31 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { + cache *querier[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, q *querier[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) } - return &resolver[N]{in} + return &resolver[N]{cache: q, inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - var aggs []internal.Aggregator[N] + id := instrumentID{scope: inst.Scope, name: inst.Name, description: inst.Description} + resp, err := r.cache.Get(id) + if err == nil { + return resp.aggregators, resp.err + } + if !errors.Is(err, errCacheMiss) { + return nil, err + } + var aggs []internal.Aggregator[N] errs := &multierror{} for _, i := range r.inserters { a, err := i.Instrument(inst, instUnit) @@ -369,7 +378,12 @@ func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]i } aggs = append(aggs, a...) } - return aggs, errs.errorOrNil() + + err = errs.errorOrNil() + resp.aggregators = aggs + resp.err = err + r.cache.Set(id, resp) + return aggs, err } type multierror struct {