diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b5050e59f9..3a9af7a3094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252) - Prometheus exporter will now cumulatively sum histogram buckets. (#3281) - Export the sum of each histogram datapoint uniquely with the `go.opentelemetry.io/otel/exporters/otlpmetric` exporters. (#3284, #3293) +- Recorded values for asynchronous counters (`Counter` and `UpDownCounter`) are interpreted as exact, not incremental, sum values by the metric SDK. (#3350, #3278) - UpDownCounters are now correctly output as prometheus gauges in the `go.opentelemetry.io/otel/exporters/prometheus` exporter. (#3358) ## [1.11.0/0.32.3] 2022-10-12 diff --git a/metric/instrument/asyncfloat64/asyncfloat64.go b/metric/instrument/asyncfloat64/asyncfloat64.go index 370715f694c..5c8260ceb6f 100644 --- a/metric/instrument/asyncfloat64/asyncfloat64.go +++ b/metric/instrument/asyncfloat64/asyncfloat64.go @@ -35,7 +35,8 @@ type InstrumentProvider interface { // Counter is an instrument that records increasing values. type Counter interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. The value of x is + // assumed to be the exact Counter value to record. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -47,7 +48,8 @@ type Counter interface { // UpDownCounter is an instrument that records increasing or decreasing values. type UpDownCounter interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. The value of x is + // assumed to be the exact UpDownCounter value to record. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -59,7 +61,7 @@ type UpDownCounter interface { // Gauge is an instrument that records independent readings. type Gauge interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an diff --git a/metric/instrument/asyncint64/asyncint64.go b/metric/instrument/asyncint64/asyncint64.go index 41a561bc4a2..b07409c7931 100644 --- a/metric/instrument/asyncint64/asyncint64.go +++ b/metric/instrument/asyncint64/asyncint64.go @@ -35,7 +35,8 @@ type InstrumentProvider interface { // Counter is an instrument that records increasing values. type Counter interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. The value of x is + // assumed to be the exact Counter value to record. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -47,7 +48,8 @@ type Counter interface { // UpDownCounter is an instrument that records increasing or decreasing values. type UpDownCounter interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. The value of x is + // assumed to be the exact UpDownCounter value to record. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -59,7 +61,7 @@ type UpDownCounter interface { // Gauge is an instrument that records independent readings. type Gauge interface { - // Observe records the state of the instrument. + // Observe records the state of the instrument to be x. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index da8bf34de82..61b8d20860e 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -32,6 +32,12 @@ func newValueMap[N int64 | float64]() *valueMap[N] { return &valueMap[N]{values: make(map[attribute.Set]N)} } +func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used. + s.Lock() + s.values[attr] = value + s.Unlock() +} + func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { s.Lock() s.values[attr] += value @@ -49,6 +55,10 @@ func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { // Each aggregation cycle is treated independently. When the returned // Aggregator's Aggregation method is called it will reset all sums to zero. func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { + return newDeltaSum[N](monotonic) +} + +func newDeltaSum[N int64 | float64](monotonic bool) *deltaSum[N] { return &deltaSum[N]{ valueMap: newValueMap[N](), monotonic: monotonic, @@ -106,6 +116,10 @@ func (s *deltaSum[N]) Aggregation() metricdata.Aggregation { // Each aggregation cycle is treated independently. When the returned // Aggregator's Aggregation method is called it will reset all sums to zero. func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { + return newCumulativeSum[N](monotonic) +} + +func newCumulativeSum[N int64 | float64](monotonic bool) *cumulativeSum[N] { return &cumulativeSum[N]{ valueMap: newValueMap[N](), monotonic: monotonic, @@ -151,3 +165,47 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { } return out } + +// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of +// measurements as their pre-computed arithmetic sum. Each sum is scoped by +// attributes and the aggregation cycle the measurements were made in. +// +// The monotonic value is used to communicate the produced Aggregation is +// monotonic or not. The returned Aggregator does not make any guarantees this +// value is accurate. It is up to the caller to ensure it. +// +// The output Aggregation will report recorded values as delta temporality. It +// is up to the caller to ensure this is accurate. +func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { + return &precomputedSum[N]{settableSum: newDeltaSum[N](monotonic)} +} + +// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of +// measurements as their pre-computed arithmetic sum. Each sum is scoped by +// attributes and the aggregation cycle the measurements were made in. +// +// The monotonic value is used to communicate the produced Aggregation is +// monotonic or not. The returned Aggregator does not make any guarantees this +// value is accurate. It is up to the caller to ensure it. +// +// The output Aggregation will report recorded values as cumulative +// temporality. It is up to the caller to ensure this is accurate. +func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { + return &precomputedSum[N]{settableSum: newCumulativeSum[N](monotonic)} +} + +type settableSum[N int64 | float64] interface { + set(value N, attr attribute.Set) + Aggregation() metricdata.Aggregation +} + +// precomputedSum summarizes a set of measurements recorded over all +// aggregation cycles directly as an arithmetic sum. +type precomputedSum[N int64 | float64] struct { + settableSum[N] +} + +// Aggregate records value directly as a sum for attr. +func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) { + s.set(value, attr) +} diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index 8a838d3f23c..eda320070ff 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -54,6 +54,26 @@ func testSum[N int64 | float64](t *testing.T) { eFunc = cumuExpecter[N](incr, mono) t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc)) }) + + t.Run("PreComputedDelta", func(t *testing.T) { + incr, mono, temp := monoIncr, true, metricdata.DeltaTemporality + eFunc := preExpecter[N](incr, mono, temp) + t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc)) + + incr, mono = nonMonoIncr, false + eFunc = preExpecter[N](incr, mono, temp) + t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc)) + }) + + t.Run("PreComputedCumulative", func(t *testing.T) { + incr, mono, temp := monoIncr, true, metricdata.CumulativeTemporality + eFunc := preExpecter[N](incr, mono, temp) + t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc)) + + incr, mono = nonMonoIncr, false + eFunc = preExpecter[N](incr, mono, temp) + t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc)) + }) } func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { @@ -61,7 +81,7 @@ func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { return func(m int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*m))) + sum.DataPoints = append(sum.DataPoints, point(a, N(v*m))) } return sum } @@ -74,7 +94,18 @@ func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { cycle++ sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*cycle*m))) + sum.DataPoints = append(sum.DataPoints, point(a, N(v*cycle*m))) + } + return sum + } +} + +func preExpecter[N int64 | float64](incr setMap, mono bool, temp metricdata.Temporality) expectFunc { + sum := metricdata.Sum[N]{Temporality: temp, IsMonotonic: mono} + return func(int) metricdata.Aggregation { + sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) + for a, v := range incr { + sum.DataPoints = append(sum.DataPoints, point(a, N(v))) } return sum } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 83de81ccc61..db70003b1d5 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -266,7 +266,7 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter // still be applied and a warning should be logged. i.logConflict(id) return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { - agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic) + agg, err := i.aggregator(inst.Aggregation, inst.Kind, id.Temporality, id.Monotonic) if err != nil { return nil, err } @@ -322,16 +322,31 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID return id } -// aggregator returns a new Aggregator matching agg, temporality, and +// aggregator returns a new Aggregator matching agg, kind, temporality, and // monotonic. If the agg is unknown or temporality is invalid, an error is // returned. -func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { +func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { switch a := agg.(type) { case aggregation.Drop: return nil, nil case aggregation.LastValue: return internal.NewLastValue[N](), nil case aggregation.Sum: + switch kind { + case view.AsyncCounter, view.AsyncUpDownCounter: + // Asynchronous counters and up-down-counters are defined to record + // the absolute value of the count: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation + switch temporality { + case metricdata.CumulativeTemporality: + return internal.NewPrecomputedCumulativeSum[N](monotonic), nil + case metricdata.DeltaTemporality: + return internal.NewPrecomputedDeltaSum[N](monotonic), nil + default: + return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) + } + } + switch temporality { case metricdata.CumulativeTemporality: return internal.NewCumulativeSum[N](monotonic), nil diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 45bc23ccb18..eb27da9824e 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -107,7 +107,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), views: []view.View{defaultAggView}, inst: instruments[view.AsyncCounter], - wantKind: internal.NewDeltaSum[N](true), + wantKind: internal.NewPrecomputedDeltaSum[N](true), wantLen: 1, }, { @@ -115,7 +115,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), views: []view.View{defaultAggView}, inst: instruments[view.AsyncUpDownCounter], - wantKind: internal.NewDeltaSum[N](false), + wantKind: internal.NewPrecomputedDeltaSum[N](false), wantLen: 1, }, { @@ -155,7 +155,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { reader: NewManualReader(), views: []view.View{{}}, inst: instruments[view.AsyncCounter], - wantKind: internal.NewCumulativeSum[N](true), + wantKind: internal.NewPrecomputedCumulativeSum[N](true), wantLen: 1, }, { @@ -163,7 +163,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { reader: NewManualReader(), views: []view.View{{}}, inst: instruments[view.AsyncUpDownCounter], - wantKind: internal.NewCumulativeSum[N](false), + wantKind: internal.NewPrecomputedCumulativeSum[N](false), wantLen: 1, }, {