Skip to content

Commit

Permalink
Drop unique check from addAggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 30, 2022
1 parent 8889f32 commit 40ae7fa
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 134 deletions.
57 changes: 25 additions & 32 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ type aggregator interface {
Aggregation() metricdata.Aggregation
}

type instrumentKey struct {
name string
unit unit.Unit
}

type instrumentValue struct {
// instrumentSync is a synchronization point between a pipeline and an
// instrument's Aggregators.
type instrumentSync struct {
name string
description string
unit unit.Unit
aggregator aggregator
}

Expand All @@ -60,7 +59,7 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe
resource: res,
reader: reader,
views: views,
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
aggregations: make(map[instrumentation.Scope][]instrumentSync),
}
}

Expand All @@ -76,36 +75,25 @@ type pipeline struct {
views []view.View

sync.Mutex
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context)
}

var errAlreadyRegistered = errors.New("instrument already registered")

// addAggregator will stores an aggregator with an instrument description. The aggregator
// is used when `produce()` is called.
func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error {
// addSync adds the instrumentSync to pipeline p with scope. This method is not
// idempotent. Duplicate calls will result in duplicate additions, it is the
// callers responsibility to ensure this is called with unique values.
func (p *pipeline) addSync(scope instrumentation.Scope, sync instrumentSync) {
p.Lock()
defer p.Unlock()
if p.aggregations == nil {
p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{}
}
if p.aggregations[scope] == nil {
p.aggregations[scope] = map[instrumentKey]instrumentValue{}
}
inst := instrumentKey{
name: name,
unit: instUnit,
}
if _, ok := p.aggregations[scope][inst]; ok {
return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope)
}

p.aggregations[scope][inst] = instrumentValue{
description: description,
aggregator: agg,
p.aggregations = map[instrumentation.Scope][]instrumentSync{
scope: {sync},
}
return
}
return nil
p.aggregations[scope] = append(p.aggregations[scope], sync)
}

// addCallback registers a callback to be run when `produce()` is called.
Expand Down Expand Up @@ -144,12 +132,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
for inst, instValue := range instruments {
data := instValue.aggregator.Aggregation()
for _, inst := range instruments {
data := inst.aggregator.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Description: instValue.description,
Description: inst.description,
Unit: inst.unit,
Data: data,
})
Expand Down Expand Up @@ -259,7 +247,12 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter
if agg == nil { // Drop aggregator.
return nil, nil
}
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, u, agg)
i.pipeline.addSync(inst.Scope, instrumentSync{
name: inst.Name,
description: inst.Description,
unit: u,
aggregator: agg,
})
return agg, err
})
}
Expand Down
2 changes: 0 additions & 2 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"fmt"
"sync/atomic"
"testing"

Expand Down Expand Up @@ -406,7 +405,6 @@ type logCounter struct {

func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) {
atomic.AddUint32(&l.infoN, 1)
fmt.Println("here")
l.LogSink.Info(level, msg, keysAndValues...)
}

Expand Down
114 changes: 14 additions & 100 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -46,8 +47,10 @@ func TestEmptyPipeline(t *testing.T) {
assert.Nil(t, output.Resource)
assert.Len(t, output.ScopeMetrics, 0)

err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
assert.NoError(t, err)
sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}}
assert.NotPanics(t, func() {
pipe.addSync(instrumentation.Scope{}, sync)
})

require.NotPanics(t, func() {
pipe.addCallback(func(ctx context.Context) {})
Expand All @@ -68,8 +71,10 @@ func TestNewPipeline(t *testing.T) {
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)

err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
assert.NoError(t, err)
sync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}}
assert.NotPanics(t, func() {
pipe.addSync(instrumentation.Scope{}, sync)
})

require.NotPanics(t, func() {
pipe.addCallback(func(ctx context.Context) {})
Expand All @@ -82,99 +87,6 @@ func TestNewPipeline(t *testing.T) {
require.Len(t, output.ScopeMetrics[0].Metrics, 1)
}

func TestPipelineDuplicateRegistration(t *testing.T) {
type instrumentID struct {
scope instrumentation.Scope
name string
description string
unit unit.Unit
}
testCases := []struct {
name string
secondInst instrumentID
want error
wantScopeLen int
wantMetricsLen int
}{
{
name: "exact should error",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "description should not be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "other desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "scope should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{
Name: "newScope",
},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 2,
wantMetricsLen: 1,
},
{
name: "name should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "newName",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
{
name: "unit should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Bytes,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
require.NoError(t, err)

err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{})
assert.ErrorIs(t, err, tt.want)

if tt.wantScopeLen > 0 {
output, err := pipe.produce(context.Background())
assert.NoError(t, err)
require.Len(t, output.ScopeMetrics, tt.wantScopeLen)
require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen)
}
})
}
}

func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)
Expand All @@ -198,10 +110,12 @@ func TestPipelineConcurrency(t *testing.T) {
}()

wg.Add(1)
go func() {
go func(n int) {
defer wg.Done()
_ = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
}()
name := fmt.Sprintf("name %d", n)
sync := instrumentSync{name, "desc", unit.Dimensionless, testSumAggregator{}}
pipe.addSync(instrumentation.Scope{}, sync)
}(i)

wg.Add(1)
go func() {
Expand Down

0 comments on commit 40ae7fa

Please sign in to comment.