Skip to content

Commit

Permalink
PoC of caching on top of open-telemetry#3233
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 23, 2022
1 parent 78cd494 commit 9196b1d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 8 deletions.
63 changes: 59 additions & 4 deletions sdk/metric/meter.go
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"sync"

"go.opentelemetry.io/otel/metric"
Expand All @@ -25,6 +26,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
)

// meterRegistry keeps a record of initialized meters for instrumentation
Expand Down Expand Up @@ -91,6 +93,54 @@ func (r *meterRegistry) Range(f func(*meter) bool) {
}
}

type cache map[instrumentID]any

type cacheResult[N int64 | float64] struct {
aggregators []internal.Aggregator[N]
err error
}

type querier[N int64 | float64] struct {
sync.Mutex

c cache
}

func newQuerier[N int64 | float64](c cache) *querier[N] {
return &querier[N]{c: c}
}

var (
errCacheMiss = errors.New("cache miss")
errExists = errors.New("instrument already exists for different number type")
)

func (q *querier[N]) Get(key instrumentID) (r *cacheResult[N], err error) {
q.Lock()
defer q.Unlock()

vIface, ok := q.c[key]
if !ok {
err = errCacheMiss
return r, err
}

switch v := vIface.(type) {
case *cacheResult[N]:
r = v
default:
err = errExists
}
return r, err
}

func (q *querier[N]) Set(key instrumentID, val *cacheResult[N]) {
q.Lock()
defer q.Unlock()

q.c[key] = val
}

// meter handles the creation and coordination of all metric instruments. A
// meter represents a single instrumentation scope; all metric telemetry
// produced by an instrumentation scope will use metric instruments from a
Expand All @@ -99,19 +149,22 @@ type meter struct {
instrumentation.Scope

pipes pipelines
cache *cache
}

// Compile-time check meter implements metric.Meter.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
q := newQuerier[int64](*m.cache)
return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
q := newQuerier[float64](*m.cache)
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)}
}

// RegisterCallback registers the function f to be called when any of the
Expand All @@ -123,10 +176,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
q := newQuerier[int64](*m.cache)
return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
q := newQuerier[float64](*m.cache)
return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)}
}
22 changes: 18 additions & 4 deletions sdk/metric/pipeline.go
Expand Up @@ -344,22 +344,31 @@ func (p pipelines) registerCallback(fn func(context.Context)) {
// measurements with while updating all pipelines that need to pull from those
// aggregations.
type resolver[N int64 | float64] struct {
cache *querier[N]
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines) *resolver[N] {
func newResolver[N int64 | float64](p pipelines, q *querier[N]) *resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i])
}
return &resolver[N]{in}
return &resolver[N]{cache: q, inserters: in}
}

// Aggregators returns the Aggregators instrument inst needs to update when it
// makes a measurement.
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
id := instrumentID{scope: inst.Scope, name: inst.Name, description: inst.Description}
resp, err := r.cache.Get(id)
if err == nil {
return resp.aggregators, resp.err
}
if !errors.Is(err, errCacheMiss) {
return nil, err
}

var aggs []internal.Aggregator[N]
errs := &multierror{}
for _, i := range r.inserters {
a, err := i.Instrument(inst, instUnit)
Expand All @@ -368,7 +377,12 @@ func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]i
}
aggs = append(aggs, a...)
}
return aggs, errs.errorOrNil()

err = errs.errorOrNil()
resp.aggregators = aggs
resp.err = err
r.cache.Set(id, resp)
return aggs, err
}

type multierror struct {
Expand Down

0 comments on commit 9196b1d

Please sign in to comment.