Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove comparability requirement for Readers #3290

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252)
- Prometheus exporter will now cumulatively sum histogram buckets. (#3281)
- Remove comparable requirement for `Reader`s. (#3290)
- Export the sum of each histogram datapoint uniquely with the `go.opentelemetry.io/otel/exporters/otlpmetric` exporters. (#3284, #3293)

## [1.11.0/0.32.3] 2022-10-12
Expand Down
13 changes: 5 additions & 8 deletions sdk/metric/config.go
Expand Up @@ -26,7 +26,7 @@ import (
// config contains configuration options for a MeterProvider.
type config struct {
res *resource.Resource
readers map[Reader][]view.View
viewers []viewer
}

// readerSignals returns a force-flush and shutdown function for a
Expand All @@ -35,9 +35,9 @@ type config struct {
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
for _, v := range c.viewers {
sFuncs = append(sFuncs, v.reader.Shutdown)
fFuncs = append(fFuncs, v.reader.ForceFlush)
}

return unify(fFuncs), unifyShutdown(sFuncs)
Expand Down Expand Up @@ -123,10 +123,7 @@ func WithResource(res *resource.Resource) Option {
// operations; no data will be exported without a Reader.
func WithReader(r Reader, views ...view.View) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
}
cfg.readers[r] = views
cfg.viewers = append(cfg.viewers, viewer{reader: r, views: views})
Aneurysm9 marked this conversation as resolved.
Show resolved Hide resolved
return cfg
})
}
4 changes: 3 additions & 1 deletion sdk/metric/config_test.go
Expand Up @@ -127,5 +127,7 @@ func TestWithResource(t *testing.T) {
func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
require.Len(t, c.viewers, 1)
assert.Same(t, r, c.viewers[0].reader)
assert.Len(t, c.viewers[0].views, 0)
}
30 changes: 15 additions & 15 deletions sdk/metric/pipeline.go
Expand Up @@ -51,14 +51,19 @@ type instrumentSync struct {
aggregator aggregator
}

func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline {
// viewer is a reader with the applicable views it will use when reading.
type viewer struct {
reader Reader
views []view.View
}

func newPipeline(res *resource.Resource, v viewer) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
viewer: v,
aggregations: make(map[instrumentation.Scope][]instrumentSync),
}
}
Expand All @@ -69,10 +74,9 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe
// As instruments are created the instrument should be checked if it exists in the
// views of a the Reader, and if so each aggregator should be added to the pipeline.
type pipeline struct {
resource *resource.Resource
viewer

reader Reader
views []view.View
resource *resource.Resource

sync.Mutex
aggregations map[instrumentation.Scope][]instrumentSync
Expand Down Expand Up @@ -401,15 +405,11 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio
// measurement.
type pipelines []*pipeline

func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for r, v := range readers {
p := &pipeline{
resource: res,
reader: r,
views: v,
}
r.register(p)
func newPipelines(res *resource.Resource, viewers []viewer) pipelines {
pipes := make([]*pipeline, 0, len(viewers))
for _, v := range viewers {
p := newPipeline(res, v)
v.reader.register(p)
pipes = append(pipes, p)
}
return pipes
Expand Down