From 80fae64004b27577ca7aab7c1792afe16871cecd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:23:21 -0700 Subject: [PATCH 01/13] Add Aggregation/Temporality to Exporter iface --- sdk/metric/exporter.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 79257db0fc6..687d3eae9a6 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -18,7 +18,9 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) // ErrExporterShutdown is returned if Export or Shutdown are called after an @@ -28,6 +30,12 @@ var ErrExporterShutdown = fmt.Errorf("exporter is shutdown") // Exporter handles the delivery of metric data to external receivers. This is // the final component in the metric push pipeline. type Exporter interface { + // Temporality returns the Temporality to use for an instrument kind. + Temporality(view.InstrumentKind) metricdata.Temporality + + // Aggregation returns the Aggregation to use for an instrument kind. + Aggregation(view.InstrumentKind) aggregation.Aggregation + // Export serializes and transmits metric data to a receiver. // // This is called synchronously, there is no concurrency safety From d96beedc966348e74edca09f34fafe3a0f476752 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:25:17 -0700 Subject: [PATCH 02/13] Use Exporter selectors in periodic reader --- sdk/metric/periodic_reader.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3f705086162..b7305c22722 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -36,20 +36,16 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector + interval time.Duration + timeout time.Duration } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ - interval: defaultInterval, - timeout: defaultTimeout, - temporalitySelector: DefaultTemporalitySelector, - aggregationSelector: DefaultAggregationSelector, + interval: defaultInterval, + timeout: defaultTimeout, } for _, o := range options { c = o.applyPeriodic(c) @@ -119,8 +115,8 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), - temporalitySelector: conf.temporalitySelector, - aggregationSelector: conf.aggregationSelector, + temporalitySelector: exporter.Temporality, + aggregationSelector: exporter.Aggregation, } go func() { From 478dc74ba7495df91bd5d05d9d595b587880a4ab Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:26:13 -0700 Subject: [PATCH 03/13] Move selector opts to just manual reader --- sdk/metric/manual_reader.go | 50 ++++++++++++++++++++++++++ sdk/metric/reader.go | 70 ------------------------------------- 2 files changed, 50 insertions(+), 70 deletions(-) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index ad932be793b..62ccf0f0535 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -129,3 +129,53 @@ func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig { type ManualReaderOption interface { applyManual(manualReaderConfig) manualReaderConfig } + +// WithTemporalitySelector sets the TemporalitySelector a reader will use to +// determine the Temporality of an instrument based on its kind. If this +// option is not used, the reader will use the DefaultTemporalitySelector. +func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector func(instrument view.InstrumentKind) metricdata.Temporality +} + +// applyManual returns a manualReaderConfig with option applied. +func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { + mrc.temporalitySelector = t.selector + return mrc +} + +// WithAggregationSelector sets the AggregationSelector a reader will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// or the aggregation explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector AggregationSelector) ManualReaderOption { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return aggregationSelectorOption{selector: wrapped} +} + +type aggregationSelectorOption struct { + selector AggregationSelector +} + +// applyManual returns a manualReaderConfig with option applied. +func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.aggregationSelector = t.selector + return c +} diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index bf596c27887..e716394078d 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/view" @@ -108,13 +107,6 @@ func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, return metricdata.ResourceMetrics{}, ErrReaderShutdown } -// ReaderOption applies a configuration option value to either a ManualReader or -// a PeriodicReader. -type ReaderOption interface { - ManualReaderOption - PeriodicReaderOption -} - // TemporalitySelector selects the temporality to use based on the InstrumentKind. type TemporalitySelector func(view.InstrumentKind) metricdata.Temporality @@ -125,29 +117,6 @@ func DefaultTemporalitySelector(view.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality } -// WithTemporalitySelector sets the TemporalitySelector a reader will use to -// determine the Temporality of an instrument based on its kind. If this -// option is not used, the reader will use the DefaultTemporalitySelector. -func WithTemporalitySelector(selector TemporalitySelector) ReaderOption { - return temporalitySelectorOption{selector: selector} -} - -type temporalitySelectorOption struct { - selector func(instrument view.InstrumentKind) metricdata.Temporality -} - -// applyManual returns a manualReaderConfig with option applied. -func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { - mrc.temporalitySelector = t.selector - return mrc -} - -// applyPeriodic returns a periodicReaderConfig with option applied. -func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig { - prc.temporalitySelector = t.selector - return prc -} - // AggregationSelector selects the aggregation and the parameters to use for // that aggregation based on the InstrumentKind. type AggregationSelector func(view.InstrumentKind) aggregation.Aggregation @@ -172,42 +141,3 @@ func DefaultAggregationSelector(ik view.InstrumentKind) aggregation.Aggregation } panic("unknown instrument kind") } - -// WithAggregationSelector sets the AggregationSelector a reader will use to -// determine the aggregation to use for an instrument based on its kind. If -// this option is not used, the reader will use the DefaultAggregationSelector -// or the aggregation explicitly passed for a view matching an instrument. -func WithAggregationSelector(selector AggregationSelector) ReaderOption { - // Deep copy and validate before using. - wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { - a := selector(ik) - cpA := a.Copy() - if err := cpA.Err(); err != nil { - cpA = DefaultAggregationSelector(ik) - global.Error( - err, "using default aggregation instead", - "aggregation", a, - "replacement", cpA, - ) - } - return cpA - } - - return aggregationSelectorOption{selector: wrapped} -} - -type aggregationSelectorOption struct { - selector AggregationSelector -} - -// applyManual returns a manualReaderConfig with option applied. -func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig { - c.aggregationSelector = t.selector - return c -} - -// applyPeriodic returns a periodicReaderConfig with option applied. -func (t aggregationSelectorOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { - c.aggregationSelector = t.selector - return c -} From 3e799aa3a2bbe1e5fe499a136c71a60e2cdc0c5a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:38:50 -0700 Subject: [PATCH 04/13] Simplify periodic reader ref to Exporter selectors --- sdk/metric/periodic_reader.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index b7305c22722..3dc7ed2d045 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,9 +114,6 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade flushCh: make(chan chan error), cancel: cancel, done: make(chan struct{}), - - temporalitySelector: exporter.Temporality, - aggregationSelector: exporter.Aggregation, } go func() { @@ -136,9 +133,6 @@ type periodicReader struct { exporter Exporter flushCh chan chan error - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector - done chan struct{} cancel context.CancelFunc shutdownOnce sync.Once @@ -183,12 +177,12 @@ func (r *periodicReader) register(p producer) { // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind view.InstrumentKind) metricdata.Temporality { - return r.temporalitySelector(kind) + return r.exporter.Temporality(kind) } // aggregation returns what Aggregation to use for kind. func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. - return r.aggregationSelector(kind) + return r.exporter.Aggregation(kind) } // collectAndExport gather all metric data related to the periodicReader r from From 1a3c4192dcd0d427c65b137b8afec3a5f71c52b1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:39:11 -0700 Subject: [PATCH 05/13] Fix the periodic reader tests --- sdk/metric/periodic_reader_test.go | 45 +++++++++++++++++++----------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 8c3c0599158..c0bf06a3086 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/suite" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/view" ) @@ -54,13 +55,29 @@ func TestWithInterval(t *testing.T) { } type fnExporter struct { - exportFunc func(context.Context, metricdata.ResourceMetrics) error - flushFunc func(context.Context) error - shutdownFunc func(context.Context) error + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + exportFunc func(context.Context, metricdata.ResourceMetrics) error + flushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Exporter = (*fnExporter)(nil) +func (e *fnExporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + if e.temporalityFunc != nil { + return e.temporalityFunc(k) + } + return DefaultTemporalitySelector(k) +} + +func (e *fnExporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + if e.aggregationFunc != nil { + return e.aggregationFunc(k) + } + return DefaultAggregationSelector(k) +} + func (e *fnExporter) Export(ctx context.Context, m metricdata.ResourceMetrics) error { if e.exportFunc != nil { return e.exportFunc(ctx, m) @@ -230,29 +247,25 @@ func BenchmarkPeriodicReader(b *testing.B) { func TestPeriodiclReaderTemporality(t *testing.T) { tests := []struct { - name string - options []PeriodicReaderOption + name string + exporter *fnExporter // Currently only testing constant temporality. This should be expanded // if we put more advanced selection in the SDK wantTemporality metricdata.Temporality }{ { name: "default", + exporter: new(fnExporter), wantTemporality: metricdata.CumulativeTemporality, }, { - name: "delta", - options: []PeriodicReaderOption{ - WithTemporalitySelector(deltaTemporalitySelector), - }, + name: "delta", + exporter: &fnExporter{temporalityFunc: deltaTemporalitySelector}, wantTemporality: metricdata.DeltaTemporality, }, { - name: "repeats overwrite", - options: []PeriodicReaderOption{ - WithTemporalitySelector(deltaTemporalitySelector), - WithTemporalitySelector(cumulativeTemporalitySelector), - }, + name: "cumulative", + exporter: &fnExporter{temporalityFunc: cumulativeTemporalitySelector}, wantTemporality: metricdata.CumulativeTemporality, }, } @@ -260,8 +273,8 @@ func TestPeriodiclReaderTemporality(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var undefinedInstrument view.InstrumentKind - rdr := NewPeriodicReader(new(fnExporter), tt.options...) - assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument)) + rdr := NewPeriodicReader(tt.exporter) + assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String()) }) } } From fc304148abd9310eb784db0e107de0c43d39464b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 12:56:11 -0700 Subject: [PATCH 06/13] Add Aggregation/Temporality method to stdoutmetric --- exporters/stdout/stdoutmetric/config.go | 68 ++++++++++++++++++++++- exporters/stdout/stdoutmetric/exporter.go | 18 +++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/exporters/stdout/stdoutmetric/config.go b/exporters/stdout/stdoutmetric/config.go index c2f02da1ecf..2b40806c96d 100644 --- a/exporters/stdout/stdoutmetric/config.go +++ b/exporters/stdout/stdoutmetric/config.go @@ -16,11 +16,18 @@ package stdoutmetric // import "go.opentelemetry.io/otel/exporters/stdout/stdout import ( "encoding/json" "os" + + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/view" ) // config contains options for the exporter. type config struct { - encoder *encoderHolder + encoder *encoderHolder + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // newConfig creates a validated config configured with options. @@ -36,6 +43,14 @@ func newConfig(options ...Option) config { cfg.encoder = &encoderHolder{encoder: enc} } + if cfg.temporalitySelector == nil { + cfg.temporalitySelector = metric.DefaultTemporalitySelector + } + + if cfg.aggregationSelector == nil { + cfg.aggregationSelector = metric.DefaultAggregationSelector + } + return cfg } @@ -60,3 +75,54 @@ func WithEncoder(encoder Encoder) Option { return c }) } + +// WithTemporalitySelector sets the TemporalitySelector the exporter will use +// to determine the Temporality of an instrument based on its kind. If this +// option is not used, the exporter will use the DefaultTemporalitySelector +// from the go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector metric.TemporalitySelector +} + +func (t temporalitySelectorOption) apply(c config) config { + c.temporalitySelector = t.selector + return c +} + +// WithAggregationSelector sets the AggregationSelector the exporter will use +// to determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the exporter will use the +// DefaultAggregationSelector from the go.opentelemetry.io/otel/sdk/metric +// package or the aggregation explicitly passed for a view matching an +// instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = metric.DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return aggregationSelectorOption{selector: wrapped} +} + +type aggregationSelectorOption struct { + selector metric.AggregationSelector +} + +func (t aggregationSelectorOption) apply(c config) config { + c.aggregationSelector = t.selector + return c +} diff --git a/exporters/stdout/stdoutmetric/exporter.go b/exporters/stdout/stdoutmetric/exporter.go index fef517fa833..8a9d55a4979 100644 --- a/exporters/stdout/stdoutmetric/exporter.go +++ b/exporters/stdout/stdoutmetric/exporter.go @@ -20,7 +20,9 @@ import ( "sync/atomic" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) // exporter is an OpenTelemetry metric exporter. @@ -28,6 +30,9 @@ type exporter struct { encVal atomic.Value // encoderHolder shutdownOnce sync.Once + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // New returns a configured metric exporter. @@ -36,11 +41,22 @@ type exporter struct { // encoder with tab indentations that output to STDOUT. func New(options ...Option) (metric.Exporter, error) { cfg := newConfig(options...) - exp := &exporter{} + exp := &exporter{ + temporalitySelector: cfg.temporalitySelector, + aggregationSelector: cfg.aggregationSelector, + } exp.encVal.Store(*cfg.encoder) return exp, nil } +func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + return e.temporalitySelector(k) +} + +func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return e.aggregationSelector(k) +} + func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error { select { case <-ctx.Done(): From b9d00223ba1e50d6ee02ad2149e97cc0becc306f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 13:27:54 -0700 Subject: [PATCH 07/13] Add Temporality/Aggregation to otlpmetric exp --- exporters/otlp/otlpmetric/client.go | 9 ++++++ exporters/otlp/otlpmetric/exporter.go | 34 ++++++++++++++++++++-- exporters/otlp/otlpmetric/exporter_test.go | 11 +++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/exporters/otlp/otlpmetric/client.go b/exporters/otlp/otlpmetric/client.go index 9bcbab44f48..0e522fa939a 100644 --- a/exporters/otlp/otlpmetric/client.go +++ b/exporters/otlp/otlpmetric/client.go @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric import ( "context" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) // Client handles the transmission of OTLP data to an OTLP receiving endpoint. type Client interface { + // Temporality returns the Temporality to use for an instrument kind. + Temporality(view.InstrumentKind) metricdata.Temporality + + // Aggregation returns the Aggregation to use for an instrument kind. + Aggregation(view.InstrumentKind) aggregation.Aggregation + // UploadMetrics transmits metric data to an OTLP receiver. // // All retry logic must be handled by UploadMetrics alone, the Exporter diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index 2967a5f7b24..296f500d411 100644 --- a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -21,7 +21,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -34,6 +36,20 @@ type exporter struct { shutdownOnce sync.Once } +// Temporality returns the Temporality to use for an instrument kind. +func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + e.clientMu.Lock() + defer e.clientMu.Unlock() + return e.client.Temporality(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + e.clientMu.Lock() + defer e.clientMu.Unlock() + return e.client.Aggregation(k) +} + // Export transforms and transmits metric data to an OTLP receiver. func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error { otlpRm, err := transform.ResourceMetrics(rm) @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error { e.shutdownOnce.Do(func() { e.clientMu.Lock() client := e.client - e.client = shutdownClient{} + e.client = shutdownClient{ + temporalitySelector: client.Temporality, + aggregationSelector: client.Aggregation, + } e.clientMu.Unlock() err = client.Shutdown(ctx) }) @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter { return &exporter{client: client} } -type shutdownClient struct{} +type shutdownClient struct { + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector +} func (c shutdownClient) err(ctx context.Context) error { if err := ctx.Err(); err != nil { @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error { return errShutdown } +func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error { return c.err(ctx) } diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index fd644140d37..972d0cbf8fe 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -21,7 +21,10 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -31,6 +34,14 @@ type client struct { n int } +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return metric.DefaultTemporalitySelector(k) +} + +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return metric.DefaultAggregationSelector(k) +} + func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error { c.n++ return nil From a2af7a128e910cd81a45e9343d14cbd1832fe6fc Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 4 Oct 2022 13:56:14 -0700 Subject: [PATCH 08/13] Add Temporality/Aggregation to http/grpc otlp clients --- .../otlp/otlpmetric/internal/oconf/options.go | 42 +++++++++++++++++++ .../otlp/otlpmetric/otlpmetricgrpc/client.go | 19 +++++++++ .../otlp/otlpmetric/otlpmetricgrpc/config.go | 18 ++++++++ .../otlp/otlpmetric/otlpmetrichttp/client.go | 19 +++++++++ .../otlp/otlpmetric/otlpmetrichttp/config.go | 18 ++++++++ 5 files changed, 116 insertions(+) diff --git a/exporters/otlp/otlpmetric/internal/oconf/options.go b/exporters/otlp/otlpmetric/internal/oconf/options.go index f5a82d6db17..1cf9131e203 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options.go @@ -27,6 +27,10 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -57,6 +61,9 @@ type ( // gRPC configurations GRPCCredentials credentials.TransportCredentials + + TemporalitySelector metric.TemporalitySelector + AggregationSelector metric.AggregationSelector } Config struct { @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config { URLPath: DefaultMetricsPath, Compression: NoCompression, Timeout: DefaultTimeout, + + TemporalitySelector: metric.DefaultTemporalitySelector, + AggregationSelector: metric.DefaultAggregationSelector, }, RetryConfig: retry.DefaultConfig, } @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config { URLPath: DefaultMetricsPath, Compression: NoCompression, Timeout: DefaultTimeout, + + TemporalitySelector: metric.DefaultTemporalitySelector, + AggregationSelector: metric.DefaultAggregationSelector, }, RetryConfig: retry.DefaultConfig, } @@ -312,3 +325,32 @@ func WithTimeout(duration time.Duration) GenericOption { return cfg }) } + +func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.TemporalitySelector = selector + return cfg + }) +} + +func WithAggregationSelector(selector metric.AggregationSelector) GenericOption { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = metric.DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.AggregationSelector = wrapped + return cfg + }) +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index 4c5beb8f384..0f1a0040f61 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -28,6 +28,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -53,6 +56,9 @@ type client struct { exportTimeout time.Duration requestFunc retry.RequestFunc + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector + // ourConn keeps track of where conn was created: true if created here in // NewClient, or false if passed with an option. This is important on // Shutdown as the conn should only be closed if we created it. Otherwise, @@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error exportTimeout: cfg.Metrics.Timeout, requestFunc: cfg.RetryConfig.RequestFunc(retryable), conn: cfg.GRPCConn, + + temporalitySelector: cfg.Metrics.TemporalitySelector, + aggregationSelector: cfg.Metrics.AggregationSelector, } if len(cfg.Metrics.Headers) > 0 { @@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error return c, nil } +// Temporality returns the Temporality to use for an instrument kind. +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + // ForceFlush does nothing, the client holds no state. func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go index 5f7a66e559e..3b9539c7187 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric" ) // Option applies a configuration option to the Exporter. @@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option { func WithRetry(settings RetryConfig) Option { return wrappedOption{oconf.WithRetry(retry.Config(settings))} } + +// WithTemporalitySelector sets the TemporalitySelector the client will use to +// determine the Temporality of an instrument based on its kind. If this option +// is not used, the client will use the DefaultTemporalitySelector from the +// go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return wrappedOption{oconf.WithTemporalitySelector(selector)} +} + +// WithAggregationSelector sets the AggregationSelector the client will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation +// explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + return wrappedOption{oconf.WithAggregationSelector(selector)} +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 7a8d7e14707..04302356056 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -33,6 +33,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -54,6 +57,9 @@ type client struct { compression Compression requestFunc retry.RequestFunc httpClient *http.Client + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -113,9 +119,22 @@ func newClient(opts ...Option) (otlpmetric.Client, error) { req: req, requestFunc: cfg.RetryConfig.RequestFunc(evaluate), httpClient: httpClient, + + temporalitySelector: cfg.Metrics.TemporalitySelector, + aggregationSelector: cfg.Metrics.AggregationSelector, }, nil } +// Temporality returns the Temporality to use for an instrument kind. +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + // ForceFlush does nothing, the client holds no state. func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go index c320c25d406..e9f7c774642 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric" ) // Compression describes the compression used for payloads sent to the @@ -179,3 +180,20 @@ func WithTimeout(duration time.Duration) Option { func WithRetry(rc RetryConfig) Option { return wrappedOption{oconf.WithRetry(retry.Config(rc))} } + +// WithTemporalitySelector sets the TemporalitySelector the client will use to +// determine the Temporality of an instrument based on its kind. If this option +// is not used, the client will use the DefaultTemporalitySelector from the +// go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return wrappedOption{oconf.WithTemporalitySelector(selector)} +} + +// WithAggregationSelector sets the AggregationSelector the client will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation +// explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + return wrappedOption{oconf.WithAggregationSelector(selector)} +} From d218965f3577b423a14b70462c4c83af98fc1b6b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 5 Oct 2022 08:45:56 -0700 Subject: [PATCH 09/13] Add oconf tests for selector opts --- .../otlpmetric/internal/oconf/options_test.go | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/exporters/otlp/otlpmetric/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/internal/oconf/options_test.go index e436eb5b07e..311daa48954 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options_test.go @@ -23,6 +23,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/envconfig" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) { assert.Equal(t, c.Metrics.Timeout, 5*time.Second) }, }, + + // Temporality Selector Tests + { + name: "WithTemporalitySelector", + opts: []oconf.GenericOption{ + oconf.WithTemporalitySelector(deltaSelector), + }, + asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) { + // Function value comparisons are disallowed, test non-default + // behavior of a TemporalitySelector here to ensure our "catch + // all" was set. + var undefinedKind view.InstrumentKind + got := c.Metrics.TemporalitySelector + assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind)) + }, + }, + + // Aggregation Selector Tests + { + name: "WithAggregationSelector", + opts: []oconf.GenericOption{ + oconf.WithAggregationSelector(dropSelector), + }, + asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) { + // Function value comparisons are disallowed, test non-default + // behavior of a TemporalitySelector here to ensure our "catch + // all" was set. + var undefinedKind view.InstrumentKind + got := c.Metrics.AggregationSelector + assert.Equal(t, aggregation.Drop{}, got(undefinedKind)) + }, + }, } for _, tt := range tests { @@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) { } } +func dropSelector(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} +} + +func deltaSelector(view.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality +} + func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption { converted := make([]oconf.HTTPOption, len(opts)) for i, o := range opts { From 7962e53ead74a00e45a1f05a82ec5c699e8ab20e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 5 Oct 2022 08:55:45 -0700 Subject: [PATCH 10/13] Add tests to stdoutmetric for opts --- .../stdout/stdoutmetric/exporter_test.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/exporters/stdout/stdoutmetric/exporter_test.go b/exporters/stdout/stdoutmetric/exporter_test.go index fa2a05401e8..a7c3abb9bd1 100644 --- a/exporters/stdout/stdoutmetric/exporter_test.go +++ b/exporters/stdout/stdoutmetric/exporter_test.go @@ -25,7 +25,9 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) func testEncoderOption() stdoutmetric.Option { @@ -97,3 +99,33 @@ func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) { require.NoError(t, exp.Shutdown(ctx)) assert.EqualError(t, exp.Export(ctx, data), "exporter shutdown") } + +func deltaSelector(view.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality +} + +func TestTemporalitySelector(t *testing.T) { + exp, err := stdoutmetric.New( + testEncoderOption(), + stdoutmetric.WithTemporalitySelector(deltaSelector), + ) + require.NoError(t, err) + + var unknownKind view.InstrumentKind + assert.Equal(t, metricdata.DeltaTemporality, exp.Temporality(unknownKind)) +} + +func dropSelector(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} +} + +func TestAggregationSelector(t *testing.T) { + exp, err := stdoutmetric.New( + testEncoderOption(), + stdoutmetric.WithAggregationSelector(dropSelector), + ) + require.NoError(t, err) + + var unknownKind view.InstrumentKind + assert.Equal(t, aggregation.Drop{}, exp.Aggregation(unknownKind)) +} From 3e981b158b9f10756851e9a69c5e79fe52ba340f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 5 Oct 2022 08:57:33 -0700 Subject: [PATCH 11/13] Correct comment subject --- exporters/otlp/otlpmetric/internal/oconf/options_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporters/otlp/otlpmetric/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/internal/oconf/options_test.go index 311daa48954..51dddd09533 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options_test.go @@ -411,7 +411,7 @@ func TestConfigs(t *testing.T) { }, asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) { // Function value comparisons are disallowed, test non-default - // behavior of a TemporalitySelector here to ensure our "catch + // behavior of a AggregationSelector here to ensure our "catch // all" was set. var undefinedKind view.InstrumentKind got := c.Metrics.AggregationSelector From 4972b9d60bb55ccc8223089d3f06ce2fb9591ad1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 18 Oct 2022 13:23:29 -0700 Subject: [PATCH 12/13] Add changes to changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e81ccaf4f46..835f2468a7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Prometheus exporter will register with a prometheus registerer on creation, there are options to control this. (#3239) +- The `WithTemporalitySelector` and `WithAggregationSelector` have been added to the `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, and `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` packages to configure the temporality and aggregation selectors for their exporters. (#3260) ### Changed @@ -18,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `"go.opentelemetry.io/otel/exporters/prometheus".New` now also returns an error indicating the failure to register the exporter with Prometheus. (#3239) - The prometheus exporter will no longer try to enumerate the metrics it will send to prometheus on startup. This fixes the `reader is not registered` warning currently emitted on startup. (#3291 #3342) +- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260) +- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260) +- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260) +- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260) ### Fixed From 780d18fcc272affba1e5bf6ab92c686430741373 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 18 Oct 2022 13:58:36 -0700 Subject: [PATCH 13/13] Fix otest test client --- .../otlp/otlpmetric/internal/otest/client_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/exporters/otlp/otlpmetric/internal/otest/client_test.go b/exporters/otlp/otlpmetric/internal/otest/client_test.go index 09f98ee809b..e701d10b8db 100644 --- a/exporters/otlp/otlpmetric/internal/otest/client_test.go +++ b/exporters/otlp/otlpmetric/internal/otest/client_test.go @@ -19,6 +19,10 @@ import ( "testing" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -27,6 +31,14 @@ type client struct { storage *Storage } +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return metric.DefaultTemporalitySelector(k) +} + +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return metric.DefaultAggregationSelector(k) +} + func (c *client) Collect() *Storage { return c.storage }