Skip to content

Commit

Permalink
Use the PrecomputedSum for async counters
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Oct 17, 2022
1 parent 5c484ed commit a320d5a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
14 changes: 11 additions & 3 deletions sdk/metric/pipeline.go
Expand Up @@ -266,7 +266,7 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic)
agg, err := i.aggregator(inst.Aggregation, inst.Kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,16 +322,24 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID
return id
}

// aggregator returns a new Aggregator matching agg, temporality, and
// aggregator returns a new Aggregator matching agg, kind, temporality, and
// monotonic. If the agg is unknown or temporality is invalid, an error is
// returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
switch a := agg.(type) {
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter:
// Asynchronous counters and up-down-counters are defined to record
// the absolute value of the count:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation
return internal.NewPrecomputedSum[N](monotonic, temporality), nil
}

switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeSum[N](monotonic), nil
Expand Down
9 changes: 5 additions & 4 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -107,15 +108,15 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []view.View{defaultAggView},
inst: instruments[view.AsyncCounter],
wantKind: internal.NewDeltaSum[N](true),
wantKind: internal.NewPrecomputedSum[N](true, metricdata.DeltaTemporality),
wantLen: 1,
},
{
name: "default agg should use reader",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []view.View{defaultAggView},
inst: instruments[view.AsyncUpDownCounter],
wantKind: internal.NewDeltaSum[N](false),
wantKind: internal.NewPrecomputedSum[N](false, metricdata.DeltaTemporality),
wantLen: 1,
},
{
Expand Down Expand Up @@ -155,15 +156,15 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
reader: NewManualReader(),
views: []view.View{{}},
inst: instruments[view.AsyncCounter],
wantKind: internal.NewCumulativeSum[N](true),
wantKind: internal.NewPrecomputedSum[N](true, metricdata.CumulativeTemporality),
wantLen: 1,
},
{
name: "reader should set default agg",
reader: NewManualReader(),
views: []view.View{{}},
inst: instruments[view.AsyncUpDownCounter],
wantKind: internal.NewCumulativeSum[N](false),
wantKind: internal.NewPrecomputedSum[N](false, metricdata.CumulativeTemporality),
wantLen: 1,
},
{
Expand Down

0 comments on commit a320d5a

Please sign in to comment.