Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Asynchronous Counters Recording #3350

Merged
merged 11 commits into from Oct 19, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -26,6 +26,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252)
- Prometheus exporter will now cumulatively sum histogram buckets. (#3281)
- Export the sum of each histogram datapoint uniquely with the `go.opentelemetry.io/otel/exporters/otlpmetric` exporters. (#3284, #3293)
- Recorded values for asynchronous counters (`Counter` and `UpDownCounter`) are interpreted as exact, not incremental, sum values by the metric SDK. (#3350, #3278)

## [1.11.0/0.32.3] 2022-10-12

Expand Down
8 changes: 5 additions & 3 deletions metric/instrument/asyncfloat64/asyncfloat64.go
Expand Up @@ -35,7 +35,8 @@ type InstrumentProvider interface {

// Counter is an instrument that records increasing values.
type Counter interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact Counter value to record.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -47,7 +48,8 @@ type Counter interface {

// UpDownCounter is an instrument that records increasing or decreasing values.
type UpDownCounter interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact UpDownCounter value to record.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -59,7 +61,7 @@ type UpDownCounter interface {

// Gauge is an instrument that records independent readings.
type Gauge interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand Down
8 changes: 5 additions & 3 deletions metric/instrument/asyncint64/asyncint64.go
Expand Up @@ -35,7 +35,8 @@ type InstrumentProvider interface {

// Counter is an instrument that records increasing values.
type Counter interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact Counter value to record.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -47,7 +48,8 @@ type Counter interface {

// UpDownCounter is an instrument that records increasing or decreasing values.
type UpDownCounter interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact UpDownCounter value to record.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -59,7 +61,7 @@ type UpDownCounter interface {

// Gauge is an instrument that records independent readings.
type Gauge interface {
// Observe records the state of the instrument.
// Observe records the state of the instrument to be x.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand Down
52 changes: 52 additions & 0 deletions sdk/metric/internal/sum.go
Expand Up @@ -32,6 +32,12 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
}

func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used.
s.Lock()
s.values[attr] = value
s.Unlock()
}

func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
Expand All @@ -49,6 +55,10 @@ func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregation method is called it will reset all sums to zero.
func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return newDeltaSum[N](monotonic)
}

func newDeltaSum[N int64 | float64](monotonic bool) *deltaSum[N] {
return &deltaSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
Expand Down Expand Up @@ -106,6 +116,10 @@ func (s *deltaSum[N]) Aggregation() metricdata.Aggregation {
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregation method is called it will reset all sums to zero.
func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return newCumulativeSum[N](monotonic)
}

func newCumulativeSum[N int64 | float64](monotonic bool) *cumulativeSum[N] {
return &cumulativeSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
Expand Down Expand Up @@ -151,3 +165,41 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
}
return out
}

// NewPrecomputedSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The temporality value is used to communicate the produced Aggregation
// temporality. The returned Aggregator does not make any guarantees this value
// is accurate. It is up to the caller to ensure it.
func NewPrecomputedSum[N int64 | float64](monotonic bool, temporality metricdata.Temporality) Aggregator[N] {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
var s settableSum[N]
switch temporality {
case metricdata.DeltaTemporality:
s = newDeltaSum[N](monotonic)
default:
s = newCumulativeSum[N](monotonic)
}
return &precomputedSum[N]{settableSum: s}
}

type settableSum[N int64 | float64] interface {
set(value N, attr attribute.Set)
Aggregation() metricdata.Aggregation
}

// precomputedSum summarizes a set of measurements recorded over all
// aggregation cycles directly as an arithmetic sum.
type precomputedSum[N int64 | float64] struct {
settableSum[N]
}

// Aggregate records value directly as a sum for attr.
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
}
33 changes: 31 additions & 2 deletions sdk/metric/internal/sum_test.go
Expand Up @@ -54,14 +54,32 @@ func testSum[N int64 | float64](t *testing.T) {
eFunc = cumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
})

t.Run("PreComputed", func(t *testing.T) {
incr, mono, temp := monoIncr, true, metricdata.DeltaTemporality
eFunc := preExpecter[N](incr, mono, temp)
t.Run("Monotonic/Delta", tester.Run(NewPrecomputedSum[N](mono, temp), incr, eFunc))

temp = metricdata.CumulativeTemporality
eFunc = preExpecter[N](incr, mono, temp)
t.Run("Monotonic/Cumulative", tester.Run(NewPrecomputedSum[N](mono, temp), incr, eFunc))

incr, mono, temp = nonMonoIncr, false, metricdata.DeltaTemporality
eFunc = preExpecter[N](incr, mono, temp)
t.Run("NonMonotonic/Delta", tester.Run(NewPrecomputedSum[N](mono, temp), incr, eFunc))

temp = metricdata.CumulativeTemporality
eFunc = preExpecter[N](incr, mono, temp)
t.Run("NonMonotonic/Cumulative", tester.Run(NewPrecomputedSum[N](mono, temp), incr, eFunc))
})
}

func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*m)))
sum.DataPoints = append(sum.DataPoints, point(a, N(v*m)))
}
return sum
}
Expand All @@ -74,7 +92,18 @@ func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
cycle++
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*cycle*m)))
sum.DataPoints = append(sum.DataPoints, point(a, N(v*cycle*m)))
}
return sum
}
}

func preExpecter[N int64 | float64](incr setMap, mono bool, temp metricdata.Temporality) expectFunc {
sum := metricdata.Sum[N]{Temporality: temp, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v)))
}
return sum
}
Expand Down
14 changes: 11 additions & 3 deletions sdk/metric/pipeline.go
Expand Up @@ -266,7 +266,7 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic)
agg, err := i.aggregator(inst.Aggregation, inst.Kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,16 +322,24 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID
return id
}

// aggregator returns a new Aggregator matching agg, temporality, and
// aggregator returns a new Aggregator matching agg, kind, temporality, and
// monotonic. If the agg is unknown or temporality is invalid, an error is
// returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
switch a := agg.(type) {
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter:
// Asynchronous counters and up-down-counters are defined to record
// the absolute value of the count:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation
return internal.NewPrecomputedSum[N](monotonic, temporality), nil
}

switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeSum[N](monotonic), nil
Expand Down
9 changes: 5 additions & 4 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -107,15 +108,15 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []view.View{defaultAggView},
inst: instruments[view.AsyncCounter],
wantKind: internal.NewDeltaSum[N](true),
wantKind: internal.NewPrecomputedSum[N](true, metricdata.DeltaTemporality),
wantLen: 1,
},
{
name: "default agg should use reader",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []view.View{defaultAggView},
inst: instruments[view.AsyncUpDownCounter],
wantKind: internal.NewDeltaSum[N](false),
wantKind: internal.NewPrecomputedSum[N](false, metricdata.DeltaTemporality),
wantLen: 1,
},
{
Expand Down Expand Up @@ -155,15 +156,15 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
reader: NewManualReader(),
views: []view.View{{}},
inst: instruments[view.AsyncCounter],
wantKind: internal.NewCumulativeSum[N](true),
wantKind: internal.NewPrecomputedSum[N](true, metricdata.CumulativeTemporality),
wantLen: 1,
},
{
name: "reader should set default agg",
reader: NewManualReader(),
views: []view.View{{}},
inst: instruments[view.AsyncUpDownCounter],
wantKind: internal.NewCumulativeSum[N](false),
wantKind: internal.NewPrecomputedSum[N](false, metricdata.CumulativeTemporality),
wantLen: 1,
},
{
Expand Down