Skip to content

Commit

Permalink
Set MeterProvider resource for all pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 21, 2022
1 parent 038248b commit 2469880
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
2 changes: 2 additions & 0 deletions sdk/metric/meter_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
)

func TestMeterRegistry(t *testing.T) {
Expand Down Expand Up @@ -469,6 +470,7 @@ func TestMetersProvideScope(t *testing.T) {
assert.NoError(t, err)

want := metricdata.ResourceMetrics{
Resource: resource.Default(),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline.go
Expand Up @@ -165,10 +165,10 @@ type pipelineRegistry struct {
pipelines map[Reader]*pipeline
}

func newPipelineRegistries(views map[Reader][]view.View) *pipelineRegistry {
func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{}
pipe := &pipeline{resource: res}
rdr.register(pipe)
pipelines[rdr] = pipe
}
Expand Down
25 changes: 20 additions & 5 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -23,10 +23,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

type invalidAggregation struct {
Expand Down Expand Up @@ -324,9 +326,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
reg := newPipelineRegistries(tt.views)
reg := newPipelineRegistries(resource.Empty(), tt.views)
testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount)
reg = newPipelineRegistries(tt.views)
reg = newPipelineRegistries(resource.Empty(), tt.views)
testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount)
})
}
Expand All @@ -350,6 +352,19 @@ func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegis
require.Len(t, aggs, wantCount)
}

func TestPipelineRegistryResource(t *testing.T) {
v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
require.NoError(t, err)
views := map[Reader][]view.View{
NewManualReader(): {{}, v},
}
res := resource.NewSchemaless(attribute.String("key", "val"))
reg := newPipelineRegistries(res, views)
for _, p := range reg.pipelines {
assert.True(t, res.Equal(p.resource), "resource not set")
}
}

func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))

Expand All @@ -358,14 +373,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
{},
},
}
reg := newPipelineRegistries(views)
reg := newPipelineRegistries(resource.Empty(), views)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

reg = newPipelineRegistries(views)
reg = newPipelineRegistries(resource.Empty(), views)

floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
Expand All @@ -387,7 +402,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}

reg := newPipelineRegistries(views)
reg := newPipelineRegistries(resource.Empty(), views)

intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Expand Up @@ -51,7 +51,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {

flush, sdown := conf.readerSignals()

registry := newPipelineRegistries(conf.readers)
registry := newPipelineRegistries(conf.res, conf.readers)

return &MeterProvider{
res: conf.res,
Expand Down

0 comments on commit 2469880

Please sign in to comment.