Skip to content

Commit

Permalink
Set MeterProvider resource for all pipelines (#3218)
Browse files Browse the repository at this point in the history
* Set MeterProvider resource for all pipelines

Resolves #3208

* Add change to changelog
  • Loading branch information
MrAlias committed Sep 22, 2022
1 parent 35019d3 commit 4eea5db
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The metric portion of the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) has been reintroduced. (#3192)
- The OpenCensus bridge example (`go.opentelemetry.io/otel/example/opencensus`) has been reintroduced. (#3206)

### Fixed

- Set the `MeterProvider` resource on all exported metric data. (#3218)

## [0.32.0] Revised Metric SDK (Alpha) - 2022-09-18

### Changed
Expand Down
2 changes: 2 additions & 0 deletions sdk/metric/meter_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
)

func TestMeterRegistry(t *testing.T) {
Expand Down Expand Up @@ -466,6 +467,7 @@ func TestMetersProvideScope(t *testing.T) {
assert.NoError(t, err)

want := metricdata.ResourceMetrics{
Resource: resource.Default(),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline.go
Expand Up @@ -162,10 +162,10 @@ type pipelineRegistry struct {
pipelines map[Reader]*pipeline
}

func newPipelineRegistries(views map[Reader][]view.View) *pipelineRegistry {
func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{}
pipe := &pipeline{resource: res}
rdr.register(pipe)
pipelines[rdr] = pipe
}
Expand Down
25 changes: 20 additions & 5 deletions sdk/metric/pipeline_registry_test.go
Expand Up @@ -20,10 +20,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

type invalidAggregation struct {
Expand Down Expand Up @@ -321,9 +323,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
reg := newPipelineRegistries(tt.views)
reg := newPipelineRegistries(resource.Empty(), tt.views)
testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount)
reg = newPipelineRegistries(tt.views)
reg = newPipelineRegistries(resource.Empty(), tt.views)
testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount)
})
}
Expand All @@ -347,6 +349,19 @@ func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegis
require.Len(t, aggs, wantCount)
}

func TestPipelineRegistryResource(t *testing.T) {
v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
require.NoError(t, err)
views := map[Reader][]view.View{
NewManualReader(): {{}, v},
}
res := resource.NewSchemaless(attribute.String("key", "val"))
reg := newPipelineRegistries(res, views)
for _, p := range reg.pipelines {
assert.True(t, res.Equal(p.resource), "resource not set")
}
}

func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))

Expand All @@ -355,14 +370,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
{},
},
}
reg := newPipelineRegistries(views)
reg := newPipelineRegistries(resource.Empty(), views)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

reg = newPipelineRegistries(views)
reg = newPipelineRegistries(resource.Empty(), views)

floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
Expand All @@ -384,7 +399,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}

reg := newPipelineRegistries(views)
reg := newPipelineRegistries(resource.Empty(), views)

intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Expand Up @@ -48,7 +48,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {

flush, sdown := conf.readerSignals()

registry := newPipelineRegistries(conf.readers)
registry := newPipelineRegistries(conf.res, conf.readers)

return &MeterProvider{
res: conf.res,
Expand Down

0 comments on commit 4eea5db

Please sign in to comment.