diff --git a/example/view/main.go b/example/view/main.go index bafbe3c8920..cdced744a30 100644 --- a/example/view/main.go +++ b/example/view/main.go @@ -30,7 +30,6 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" ) const meterName = "github.com/open-telemetry/opentelemetry-go/example/view" @@ -44,31 +43,21 @@ func main() { log.Fatal(err) } - // View to customize histogram buckets and rename a single histogram instrument. - customBucketsView, err := view.New( - // Match* to match instruments - view.MatchInstrumentName("custom_histogram"), - view.MatchInstrumentationScope(instrumentation.Scope{Name: meterName}), - - // With* to modify instruments - view.WithSetAggregation(aggregation.ExplicitBucketHistogram{ - Boundaries: []float64{64, 128, 256, 512, 1024, 2048, 4096}, - }), - view.WithRename("bar"), - ) - if err != nil { - log.Fatal(err) - } - - // Default view to keep all instruments - defaultView, err := view.New(view.MatchInstrumentName("*")) - if err != nil { - log.Fatal(err) - } - provider := metric.NewMeterProvider( metric.WithReader(exporter), - metric.WithView(customBucketsView, defaultView), + // View to customize histogram buckets and rename a single histogram instrument. + metric.WithView(metric.NewView( + metric.Instrument{ + Name: "custom_histogram", + Scope: instrumentation.Scope{Name: meterName}, + }, + metric.Stream{ + Name: "bar", + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{64, 128, 256, 512, 1024, 2048, 4096}, + }, + }, + )), ) meter := provider.Meter(meterName) diff --git a/exporters/otlp/otlpmetric/client.go b/exporters/otlp/otlpmetric/client.go index 0e522fa939a..622b2bdd2a7 100644 --- a/exporters/otlp/otlpmetric/client.go +++ b/exporters/otlp/otlpmetric/client.go @@ -17,19 +17,19 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric import ( "context" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) // Client handles the transmission of OTLP data to an OTLP receiving endpoint. type Client interface { // Temporality returns the Temporality to use for an instrument kind. - Temporality(view.InstrumentKind) metricdata.Temporality + Temporality(metric.InstrumentKind) metricdata.Temporality // Aggregation returns the Aggregation to use for an instrument kind. - Aggregation(view.InstrumentKind) aggregation.Aggregation + Aggregation(metric.InstrumentKind) aggregation.Aggregation // UploadMetrics transmits metric data to an OTLP receiver. // diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index 296f500d411..b7e06e51785 100644 --- a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -37,14 +36,14 @@ type exporter struct { } // Temporality returns the Temporality to use for an instrument kind. 
-func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (e *exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { e.clientMu.Lock() defer e.clientMu.Unlock() return e.client.Temporality(k) } // Aggregation returns the Aggregation to use for an instrument kind. -func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { e.clientMu.Lock() defer e.clientMu.Unlock() return e.client.Aggregation(k) @@ -113,11 +112,11 @@ func (c shutdownClient) err(ctx context.Context) error { return errShutdown } -func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (c shutdownClient) Temporality(k metric.InstrumentKind) metricdata.Temporality { return c.temporalitySelector(k) } -func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (c shutdownClient) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return c.aggregationSelector(k) } diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index 972d0cbf8fe..dc0ccb9903d 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -34,11 +33,11 @@ type client struct { n int } -func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { return metric.DefaultTemporalitySelector(k) } -func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return metric.DefaultAggregationSelector(k) } diff --git a/exporters/otlp/otlpmetric/internal/oconf/options.go b/exporters/otlp/otlpmetric/internal/oconf/options.go index cf5da7e40f9..b5ab4e6f315 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options.go @@ -30,7 +30,6 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -336,7 +335,7 @@ func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption func WithAggregationSelector(selector metric.AggregationSelector) GenericOption { // Deep copy and validate before using. 
- wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + wrapped := func(ik metric.InstrumentKind) aggregation.Aggregation { a := selector(ik) cpA := a.Copy() if err := cpA.Err(); err != nil { diff --git a/exporters/otlp/otlpmetric/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/internal/oconf/options_test.go index 51dddd09533..d2426af84c3 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options_test.go @@ -23,9 +23,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/envconfig" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -397,7 +397,7 @@ func TestConfigs(t *testing.T) { // Function value comparisons are disallowed, test non-default // behavior of a TemporalitySelector here to ensure our "catch // all" was set. - var undefinedKind view.InstrumentKind + var undefinedKind metric.InstrumentKind got := c.Metrics.TemporalitySelector assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind)) }, @@ -413,7 +413,7 @@ func TestConfigs(t *testing.T) { // Function value comparisons are disallowed, test non-default // behavior of a AggregationSelector here to ensure our "catch // all" was set. - var undefinedKind view.InstrumentKind + var undefinedKind metric.InstrumentKind got := c.Metrics.AggregationSelector assert.Equal(t, aggregation.Drop{}, got(undefinedKind)) }, @@ -441,11 +441,11 @@ func TestConfigs(t *testing.T) { } } -func dropSelector(view.InstrumentKind) aggregation.Aggregation { +func dropSelector(metric.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} } -func deltaSelector(view.InstrumentKind) metricdata.Temporality { +func deltaSelector(metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality } diff --git a/exporters/otlp/otlpmetric/internal/otest/client_test.go b/exporters/otlp/otlpmetric/internal/otest/client_test.go index 427b68c4e8c..1db3cc78ea6 100644 --- a/exporters/otlp/otlpmetric/internal/otest/client_test.go +++ b/exporters/otlp/otlpmetric/internal/otest/client_test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -34,11 +33,11 @@ type client struct { storage *Storage } -func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { return metric.DefaultTemporalitySelector(k) } -func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return metric.DefaultAggregationSelector(k) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index c001836c611..d2f64432d72 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" 
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -106,12 +105,12 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error } // Temporality returns the Temporality to use for an instrument kind. -func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { return c.temporalitySelector(k) } // Aggregation returns the Aggregation to use for an instrument kind. -func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return c.aggregationSelector(k) } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 5232ac236a4..ecc170f198a 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -37,7 +37,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -130,12 +129,12 @@ func newClient(opts ...Option) (otlpmetric.Client, error) { } // Temporality returns the Temporality to use for an instrument kind. -func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality { return c.temporalitySelector(k) } // Aggregation returns the Aggregation to use for an instrument kind. -func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return c.aggregationSelector(k) } diff --git a/exporters/prometheus/config_test.go b/exporters/prometheus/config_test.go index 0f3278c61df..2dd92e7abbc 100644 --- a/exporters/prometheus/config_test.go +++ b/exporters/prometheus/config_test.go @@ -20,14 +20,14 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" ) func TestNewConfig(t *testing.T) { registry := prometheus.NewRegistry() - aggregationSelector := func(view.InstrumentKind) aggregation.Aggregation { return nil } + aggregationSelector := func(metric.InstrumentKind) aggregation.Aggregation { return nil } testCases := []struct { name string @@ -112,7 +112,7 @@ func TestNewConfig(t *testing.T) { } func TestConfigManualReaderOptions(t *testing.T) { - aggregationSelector := func(view.InstrumentKind) aggregation.Aggregation { return nil } + aggregationSelector := func(metric.InstrumentKind) aggregation.Aggregation { return nil } testCases := []struct { name string diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index f4769ce4ffd..4edf11ba48a 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -30,7 +30,6 @@ import ( "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" ) @@ 
-269,18 +268,7 @@ func TestPrometheusExporter(t *testing.T) { exporter, err := New(append(tc.options, WithRegisterer(registry))...) require.NoError(t, err) - customBucketsView, err := view.New( - view.MatchInstrumentName("histogram_*"), - view.WithSetAggregation(aggregation.ExplicitBucketHistogram{ - Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, - }), - ) - require.NoError(t, err) - defaultView, err := view.New(view.MatchInstrumentName("*")) - require.NoError(t, err) - var res *resource.Resource - if tc.emptyResource { res = resource.Empty() } else { @@ -300,7 +288,12 @@ func TestPrometheusExporter(t *testing.T) { provider := metric.NewMeterProvider( metric.WithResource(res), metric.WithReader(exporter), - metric.WithView(customBucketsView, defaultView), + metric.WithView(metric.NewView( + metric.Instrument{Name: "histogram_*"}, + metric.Stream{Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, + }}, + )), ) meter := provider.Meter( "testmeter", diff --git a/exporters/stdout/stdoutmetric/config.go b/exporters/stdout/stdoutmetric/config.go index 2b40806c96d..60a76a0bbd5 100644 --- a/exporters/stdout/stdoutmetric/config.go +++ b/exporters/stdout/stdoutmetric/config.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/view" ) // config contains options for the exporter. @@ -101,7 +100,7 @@ func (t temporalitySelectorOption) apply(c config) config { // instrument. func WithAggregationSelector(selector metric.AggregationSelector) Option { // Deep copy and validate before using. - wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + wrapped := func(ik metric.InstrumentKind) aggregation.Aggregation { a := selector(ik) cpA := a.Copy() if err := cpA.Err(); err != nil { diff --git a/exporters/stdout/stdoutmetric/exporter.go b/exporters/stdout/stdoutmetric/exporter.go index 8a9d55a4979..77cad9e934e 100644 --- a/exporters/stdout/stdoutmetric/exporter.go +++ b/exporters/stdout/stdoutmetric/exporter.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) // exporter is an OpenTelemetry metric exporter. 
@@ -49,11 +48,11 @@ func New(options ...Option) (metric.Exporter, error) { return exp, nil } -func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (e *exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { return e.temporalitySelector(k) } -func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation { return e.aggregationSelector(k) } diff --git a/exporters/stdout/stdoutmetric/exporter_test.go b/exporters/stdout/stdoutmetric/exporter_test.go index a7c3abb9bd1..a88180502b1 100644 --- a/exporters/stdout/stdoutmetric/exporter_test.go +++ b/exporters/stdout/stdoutmetric/exporter_test.go @@ -25,9 +25,9 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) func testEncoderOption() stdoutmetric.Option { @@ -100,7 +100,7 @@ func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) { assert.EqualError(t, exp.Export(ctx, data), "exporter shutdown") } -func deltaSelector(view.InstrumentKind) metricdata.Temporality { +func deltaSelector(metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality } @@ -111,11 +111,11 @@ func TestTemporalitySelector(t *testing.T) { ) require.NoError(t, err) - var unknownKind view.InstrumentKind + var unknownKind metric.InstrumentKind assert.Equal(t, metricdata.DeltaTemporality, exp.Temporality(unknownKind)) } -func dropSelector(view.InstrumentKind) aggregation.Aggregation { +func dropSelector(metric.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} } @@ -126,6 +126,6 @@ func TestAggregationSelector(t *testing.T) { ) require.NoError(t, err) - var unknownKind view.InstrumentKind + var unknownKind metric.InstrumentKind assert.Equal(t, aggregation.Drop{}, exp.Aggregation(unknownKind)) } diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index d26f598f676..9743c90cf87 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -20,10 +20,9 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.opentelemetry.io/otel/sdk/metric/view" ) -func benchCounter(b *testing.B, views ...view.View) (context.Context, Reader, syncint64.Counter) { +func benchCounter(b *testing.B, views ...View) (context.Context, Reader, syncint64.Counter) { ctx := context.Background() rdr := NewManualReader() provider := NewMeterProvider(WithReader(rdr), WithView(views...)) @@ -74,9 +73,12 @@ func BenchmarkCounterAddSingleUseInvalidAttrs(b *testing.B) { } func BenchmarkCounterAddSingleUseFilteredAttrs(b *testing.B) { - vw, _ := view.New(view.WithFilterAttributes(attribute.Key("K"))) - - ctx, _, cntr := benchCounter(b, vw) + ctx, _, cntr := benchCounter(b, NewView( + Instrument{Name: "*"}, + Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("K") + }}, + )) for i := 0; i < b.N; i++ { cntr.Add(ctx, 1, attribute.Int("L", i), attribute.Int("K", i)) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index ec1e06ab43b..c78b0416415 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -19,7 +19,6 @@ import ( "fmt" "sync" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -27,7 +26,7 @@ import 
( type config struct { res *resource.Resource readers []Reader - views []view.View + views []View } // readerSignals returns a force-flush and shutdown function for a @@ -134,7 +133,7 @@ func WithReader(r Reader) Option { // // By default, if this option is not used, the MeterProvider will use the // default view. -func WithView(views ...view.View) Option { +func WithView(views ...View) Option { return optionFunc(func(cfg config) config { cfg.views = append(cfg.views, views...) return cfg diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index 3c3659a46e7..a924d879d00 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -39,12 +38,12 @@ type reader struct { var _ Reader = (*reader)(nil) -func (r *reader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. return r.aggregationFunc(kind) } func (r *reader) register(p producer) { r.producer = p } -func (r *reader) temporality(kind view.InstrumentKind) metricdata.Temporality { +func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { @@ -132,16 +131,15 @@ func TestWithReader(t *testing.T) { } func TestWithView(t *testing.T) { - var views []view.View - - v, err := view.New(view.MatchInstrumentKind(view.AsyncCounter), view.WithRename("a")) - require.NoError(t, err) - views = append(views, v) - - v, err = view.New(view.MatchInstrumentKind(view.SyncCounter), view.WithRename("b")) - require.NoError(t, err) - views = append(views, v) - - c := newConfig([]Option{WithView(views...)}) - assert.Equal(t, views, c.views) + c := newConfig([]Option{WithView( + NewView( + Instrument{Kind: InstrumentKindAsyncCounter}, + Stream{Name: "a"}, + ), + NewView( + Instrument{Kind: InstrumentKindSyncCounter}, + Stream{Name: "b"}, + ), + )}) + assert.Len(t, c.views, 2) } diff --git a/sdk/metric/doc.go b/sdk/metric/doc.go index 6cbc7fdfc0d..92878ce8bc2 100644 --- a/sdk/metric/doc.go +++ b/sdk/metric/doc.go @@ -33,9 +33,7 @@ // // Each Reader, when registered with the MeterProvider, can be augmented with a // View. Views allow users that run OpenTelemetry instrumented code to modify -// the generated data of that instrumentation. See the -// go.opentelemetry.io/otel/sdk/metric/view package for more information about -// Views. +// the generated data of that instrumentation. // // The data generated by a MeterProvider needs to include information about its // origin. A MeterProvider needs to be configured with a Resource, using the diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 687d3eae9a6..d899b925aa9 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) // ErrExporterShutdown is returned if Export or Shutdown are called after an @@ -31,10 +30,10 @@ var ErrExporterShutdown = fmt.Errorf("exporter is shutdown") // the final component in the metric push pipeline. 
type Exporter interface { // Temporality returns the Temporality to use for an instrument kind. - Temporality(view.InstrumentKind) metricdata.Temporality + Temporality(InstrumentKind) metricdata.Temporality // Aggregation returns the Aggregation to use for an instrument kind. - Aggregation(view.InstrumentKind) aggregation.Aggregation + Aggregation(InstrumentKind) aggregation.Aggregation // Export serializes and transmits metric data to a receiver. // diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 25aad6cfc20..ade1f641521 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -20,54 +20,30 @@ import ( "go.opentelemetry.io/otel/metric/instrument/asyncint64" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/view" ) -// instProviderKey uniquely describes an instrument creation request received -// by an instrument provider. -type instProviderKey struct { - // Name is the name of the instrument. - Name string - // Description is the description of the instrument. - Description string - // Unit is the unit of the instrument. - Unit unit.Unit - // Kind is the instrument Kind provided. - Kind view.InstrumentKind -} - -// viewInst returns the instProviderKey as a view Instrument using scope s. -func (k instProviderKey) viewInst(s instrumentation.Scope) view.Instrument { - return view.Instrument{ - Scope: s, - Name: k.Name, - Description: k.Description, - Kind: k.Kind, - } -} - // instProvider provides all OpenTelemetry instruments. type instProvider[N int64 | float64] struct { + scope instrumentation.Scope resolve resolver[N] } -func newInstProvider[N int64 | float64](r resolver[N]) *instProvider[N] { - return &instProvider[N]{resolve: r} +func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] { + return &instProvider[N]{scope: s, resolve: newResolver(p, c)} } // lookup returns the resolved instrumentImpl. -func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) { +func (p *instProvider[N]) lookup(kind InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) { cfg := instrument.NewConfig(opts...) - key := instProviderKey{ + i := Instrument{ Name: name, Description: cfg.Description(), Unit: cfg.Unit(), Kind: kind, + Scope: p.scope, } - - aggs, err := p.resolve.Aggregators(key) + aggs, err := p.resolve.Aggregators(i) return &instrumentImpl[N]{aggregators: aggs}, err } @@ -79,17 +55,17 @@ var _ asyncint64.InstrumentProvider = asyncInt64Provider{} // Counter creates an instrument for recording increasing values. func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { - return p.lookup(view.AsyncCounter, name, opts) + return p.lookup(InstrumentKindAsyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { - return p.lookup(view.AsyncUpDownCounter, name, opts) + return p.lookup(InstrumentKindAsyncUpDownCounter, name, opts) } // Gauge creates an instrument for recording the current value. 
func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { - return p.lookup(view.AsyncGauge, name, opts) + return p.lookup(InstrumentKindAsyncGauge, name, opts) } type asyncFloat64Provider struct { @@ -100,17 +76,17 @@ var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} // Counter creates an instrument for recording increasing values. func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { - return p.lookup(view.AsyncCounter, name, opts) + return p.lookup(InstrumentKindAsyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { - return p.lookup(view.AsyncUpDownCounter, name, opts) + return p.lookup(InstrumentKindAsyncUpDownCounter, name, opts) } // Gauge creates an instrument for recording the current value. func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { - return p.lookup(view.AsyncGauge, name, opts) + return p.lookup(InstrumentKindAsyncGauge, name, opts) } type syncInt64Provider struct { @@ -121,17 +97,17 @@ var _ syncint64.InstrumentProvider = syncInt64Provider{} // Counter creates an instrument for recording increasing values. func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { - return p.lookup(view.SyncCounter, name, opts) + return p.lookup(InstrumentKindSyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { - return p.lookup(view.SyncUpDownCounter, name, opts) + return p.lookup(InstrumentKindSyncUpDownCounter, name, opts) } // Histogram creates an instrument for recording a distribution of values. func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { - return p.lookup(view.SyncHistogram, name, opts) + return p.lookup(InstrumentKindSyncHistogram, name, opts) } type syncFloat64Provider struct { @@ -142,15 +118,15 @@ var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} // Counter creates an instrument for recording increasing values. func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { - return p.lookup(view.SyncCounter, name, opts) + return p.lookup(InstrumentKindSyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { - return p.lookup(view.SyncUpDownCounter, name, opts) + return p.lookup(InstrumentKindSyncUpDownCounter, name, opts) } // Histogram creates an instrument for recording a distribution of values. func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { - return p.lookup(view.SyncHistogram, name, opts) + return p.lookup(InstrumentKindSyncHistogram, name, opts) } diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index 8acbdc79a02..86e73c866dc 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -24,7 +24,7 @@ import ( // filter is an aggregator that applies attribute filter when Aggregating. 
filters // do not have any backing memory, and must be constructed with a backing Aggregator. type filter[N int64 | float64] struct { - filter func(attribute.Set) attribute.Set + filter attribute.Filter aggregator Aggregator[N] sync.Mutex @@ -32,7 +32,7 @@ type filter[N int64 | float64] struct { } // NewFilter wraps an Aggregator with an attribute filtering function. -func NewFilter[N int64 | float64](agg Aggregator[N], fn func(attribute.Set) attribute.Set) Aggregator[N] { +func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] { if fn == nil { return agg } @@ -51,7 +51,7 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { defer f.Unlock() fAttr, ok := f.seen[attr] if !ok { - fAttr = f.filter(attr) + fAttr, _ = attr.Filter(f.filter) f.seen[attr] = fAttr } f.aggregator.Aggregate(measurement, fAttr) diff --git a/sdk/metric/internal/filter_test.go b/sdk/metric/internal/filter_test.go index 14f572a5f8e..e5632156b30 100644 --- a/sdk/metric/internal/filter_test.go +++ b/sdk/metric/internal/filter_test.go @@ -64,11 +64,8 @@ func testNewFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) { assert.Equal(t, agg, filt.aggregator) } -func testAttributeFilter(input attribute.Set) attribute.Set { - out, _ := input.Filter(func(kv attribute.KeyValue) bool { - return kv.Key == "power-level" - }) - return out +var testAttributeFilter = func(kv attribute.KeyValue) bool { + return kv.Key == "power-level" } func TestNewFilter(t *testing.T) { diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 62ccf0f0535..7860a3be2b9 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) // manualReader is a simple Reader that allows an application to @@ -59,12 +58,12 @@ func (mr *manualReader) register(p producer) { } // temporality reports the Temporality for the instrument kind provided. -func (mr *manualReader) temporality(kind view.InstrumentKind) metricdata.Temporality { +func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) } // aggregation returns what Aggregation to use for kind. -func (mr *manualReader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (mr *manualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. return mr.aggregationSelector(kind) } @@ -138,7 +137,7 @@ func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption { } type temporalitySelectorOption struct { - selector func(instrument view.InstrumentKind) metricdata.Temporality + selector func(instrument InstrumentKind) metricdata.Temporality } // applyManual returns a manualReaderConfig with option applied. @@ -153,7 +152,7 @@ func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualRea // or the aggregation explicitly passed for a view matching an instrument. func WithAggregationSelector(selector AggregationSelector) ManualReaderOption { // Deep copy and validate before using. 
- wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + wrapped := func(ik InstrumentKind) aggregation.Aggregation { a := selector(ik) cpA := a.Copy() if err := cpA.Err(); err != nil { diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 7d54be113c1..f20b0bc683c 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/suite" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) func TestManualReader(t *testing.T) { @@ -32,8 +31,8 @@ func BenchmarkManualReader(b *testing.B) { b.Run("Collect", benchReaderCollectFunc(NewManualReader())) } -var deltaTemporalitySelector = func(view.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality } -var cumulativeTemporalitySelector = func(view.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality } +var deltaTemporalitySelector = func(InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality } +var cumulativeTemporalitySelector = func(InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality } func TestManualReaderTemporality(t *testing.T) { tests := []struct { @@ -66,7 +65,7 @@ func TestManualReaderTemporality(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var undefinedInstrument view.InstrumentKind + var undefinedInstrument InstrumentKind rdr := NewManualReader(tt.options...) assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument)) }) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 3f9f30106eb..418827e9672 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -47,13 +47,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { ic := newInstrumentCache[int64](nil, &viewCache) fc := newInstrumentCache[float64](nil, &viewCache) - ir := newResolver(s, p, ic) - fr := newResolver(s, p, fc) - return &meter{ pipes: p, - instProviderInt64: newInstProvider(ir), - instProviderFloat64: newInstProvider(fr), + instProviderInt64: newInstProvider(s, p, ic), + instProviderFloat64: newInstProvider(s, p, fc), } } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 6edc58d17ce..013a52e5924 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -33,7 +33,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -482,7 +481,7 @@ func TestMetersProvideScope(t *testing.T) { } func TestRegisterCallbackDropAggregations(t *testing.T) { - aggFn := func(view.InstrumentKind) aggregation.Aggregation { + aggFn := func(InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} } r := NewManualReader(WithAggregationSelector(aggFn)) @@ -852,19 +851,17 @@ func TestAttributeFilter(t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - v, err := view.New( - view.MatchInstrumentName("*"), - view.WithFilterAttributes(attribute.Key("foo")), - ) - require.NoError(t, err) rdr := NewManualReader() mtr := NewMeterProvider( WithReader(rdr), - WithView(v), + WithView(NewView( + Instrument{Name: "*"}, + Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("foo") + }}, + )), ).Meter("TestAttributeFilter") - - err = tt.register(t, mtr) - 
require.NoError(t, err) + require.NoError(t, tt.register(t, mtr)) m, err := rdr.Collect(context.Background()) assert.NoError(t, err) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3dc7ed2d045..00ba1305595 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) // Default periodic reader timing. @@ -176,12 +175,12 @@ func (r *periodicReader) register(p producer) { } // temporality reports the Temporality for the instrument kind provided. -func (r *periodicReader) temporality(kind view.InstrumentKind) metricdata.Temporality { +func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) } // aggregation returns what Aggregation to use for kind. -func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. return r.exporter.Aggregation(kind) } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index c0bf06a3086..d48c1a7de8e 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" ) const testDur = time.Second * 2 @@ -64,14 +63,14 @@ type fnExporter struct { var _ Exporter = (*fnExporter)(nil) -func (e *fnExporter) Temporality(k view.InstrumentKind) metricdata.Temporality { +func (e *fnExporter) Temporality(k InstrumentKind) metricdata.Temporality { if e.temporalityFunc != nil { return e.temporalityFunc(k) } return DefaultTemporalitySelector(k) } -func (e *fnExporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { +func (e *fnExporter) Aggregation(k InstrumentKind) aggregation.Aggregation { if e.aggregationFunc != nil { return e.aggregationFunc(k) } @@ -272,7 +271,7 @@ func TestPeriodiclReaderTemporality(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var undefinedInstrument view.InstrumentKind + var undefinedInstrument InstrumentKind rdr := NewPeriodicReader(tt.exporter) assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String()) }) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 46ad0806bdf..bc6901e5775 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -21,14 +21,12 @@ import ( "strings" "sync" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -52,7 +50,7 @@ type instrumentSync struct { aggregator aggregator } -func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline { if res == nil { res = resource.Empty() 
} @@ -73,7 +71,7 @@ type pipeline struct { resource *resource.Resource reader Reader - views []view.View + views []View sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync @@ -159,13 +157,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments from a single scope into a // pipeline. type inserter[N int64 | float64] struct { - scope instrumentation.Scope cache instrumentCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](s instrumentation.Scope, p *pipeline, c instrumentCache[N]) *inserter[N] { - return &inserter[N]{scope: s, cache: c, pipeline: p} +func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] { + return &inserter[N]{cache: c, pipeline: p} } // Instrument inserts the instrument inst with instUnit into a pipeline. All @@ -189,7 +186,7 @@ func newInserter[N int64 | float64](s instrumentation.Scope, p *pipeline, c inst // // If an instrument is determined to use a Drop aggregation, that instrument is // not inserted nor returned. -func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], error) { +func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], error) { var ( matched bool aggs []internal.Aggregator[N] @@ -199,15 +196,14 @@ func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], // The cache will return the same Aggregator instance. Use this fact to // compare pointer addresses to deduplicate Aggregators. seen := make(map[internal.Aggregator[N]]struct{}) - inst := key.viewInst(i.scope) for _, v := range i.pipeline.views { - inst, match := v.TransformInstrument(inst) + stream, match := v(inst) if !match { continue } matched = true - agg, err := i.cachedAggregator(inst, key.Unit, v.AttributeFilter()) + agg, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) if err != nil { errs.append(err) } @@ -227,7 +223,12 @@ func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], } // Apply implicit default view if no explicit matched. - agg, err := i.cachedAggregator(inst, key.Unit, nil) + stream := Stream{ + Name: inst.Name, + Description: inst.Description, + Unit: inst.Unit, + } + agg, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) if err != nil { errs.append(err) } @@ -251,40 +252,40 @@ func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], // // If the instrument defines an unknown or incompatible aggregation, an error // is returned. -func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit, filter func(attribute.Set) attribute.Set) (internal.Aggregator[N], error) { - switch inst.Aggregation.(type) { +func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (internal.Aggregator[N], error) { + switch stream.Aggregation.(type) { case nil, aggregation.Default: // Undefined, nil, means to use the default from the reader. 
- inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + stream.Aggregation = i.pipeline.reader.aggregation(kind) } - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { return nil, fmt.Errorf( "creating aggregator with instrumentKind: %d, aggregation %v: %w", - inst.Kind, inst.Aggregation, err, + kind, stream.Aggregation, err, ) } - id := i.instrumentID(inst, u) + id := i.instrumentID(kind, stream) // If there is a conflict, the specification says the view should // still be applied and a warning should be logged. i.logConflict(id) return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { - agg, err := i.aggregator(inst.Aggregation, inst.Kind, id.Temporality, id.Monotonic) + agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic) if err != nil { return nil, err } if agg == nil { // Drop aggregator. return nil, nil } - if filter != nil { - agg = internal.NewFilter(agg, filter) + if stream.AttributeFilter != nil { + agg = internal.NewFilter(agg, stream.AttributeFilter) } - i.pipeline.addSync(inst.Scope, instrumentSync{ - name: inst.Name, - description: inst.Description, - unit: u, + i.pipeline.addSync(scope, instrumentSync{ + name: stream.Name, + description: stream.Description, + unit: stream.Unit, aggregator: agg, }) return agg, err @@ -311,19 +312,19 @@ func (i *inserter[N]) logConflict(id instrumentID) { ) } -func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID { +func (i *inserter[N]) instrumentID(kind InstrumentKind, stream Stream) instrumentID { var zero N id := instrumentID{ - Name: vi.Name, - Description: vi.Description, - Unit: u, - Aggregation: fmt.Sprintf("%T", vi.Aggregation), - Temporality: i.pipeline.reader.temporality(vi.Kind), + Name: stream.Name, + Description: stream.Description, + Unit: stream.Unit, + Aggregation: fmt.Sprintf("%T", stream.Aggregation), + Temporality: i.pipeline.reader.temporality(kind), Number: fmt.Sprintf("%T", zero), } - switch vi.Kind { - case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: + switch kind { + case InstrumentKindAsyncCounter, InstrumentKindSyncCounter, InstrumentKindSyncHistogram: id.Monotonic = true } @@ -333,7 +334,7 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID // aggregator returns a new Aggregator matching agg, kind, temporality, and // monotonic. If the agg is unknown or temporality is invalid, an error is // returned. 
-func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { +func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) { switch a := agg.(type) { case aggregation.Drop: return nil, nil @@ -341,7 +342,7 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.Instrume return internal.NewLastValue[N](), nil case aggregation.Sum: switch kind { - case view.AsyncCounter, view.AsyncUpDownCounter: + case InstrumentKindAsyncCounter, InstrumentKindAsyncUpDownCounter: // Asynchronous counters and up-down-counters are defined to record // the absolute value of the count: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation @@ -387,10 +388,10 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.Instrume // | Async Counter | X | | X | | | // | Async UpDown Counter | X | | X | | | // | Async Gauge | X | X | | | |. -func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error { +func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.ExplicitBucketHistogram: - if kind == view.SyncCounter || kind == view.SyncHistogram { + if kind == InstrumentKindSyncCounter || kind == InstrumentKindSyncHistogram { return nil } // TODO: review need for aggregation check after @@ -398,7 +399,7 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio return errIncompatibleAggregation case aggregation.Sum: switch kind { - case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter: + case InstrumentKindAsyncCounter, InstrumentKindAsyncUpDownCounter, InstrumentKindSyncCounter, InstrumentKindSyncHistogram, InstrumentKindSyncUpDownCounter: return nil default: // TODO: review need for aggregation check after @@ -406,7 +407,7 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio return errIncompatibleAggregation } case aggregation.LastValue: - if kind == view.AsyncGauge { + if kind == InstrumentKindAsyncGauge { return nil } // TODO: review need for aggregation check after @@ -424,7 +425,7 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers []Reader, views []view.View) pipelines { +func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines { pipes := make([]*pipeline, 0, len(readers)) for _, r := range readers { p := &pipeline{ @@ -451,22 +452,22 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) resolver[N] { +func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter(s, p[i], c) + in[i] = newInserter(p[i], c) } return resolver[N]{in} } // Aggregators returns the Aggregators that must be updated by the instrument // defined by key. 
-func (r resolver[N]) Aggregators(key instProviderKey) ([]internal.Aggregator[N], error) { +func (r resolver[N]) Aggregators(id Instrument) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} for _, i := range r.inserters { - a, err := i.Instrument(key) + a, err := i.Instrument(id) if err != nil { errs.append(err) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3f73637bf5f..91580242763 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -25,13 +25,13 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) +var defaultView = NewView(Instrument{Name: "*"}, Stream{}) + type invalidAggregation struct { aggregation.Aggregation } @@ -44,180 +44,179 @@ func (invalidAggregation) Err() error { } func testCreateAggregators[N int64 | float64](t *testing.T) { - changeAggView, _ := view.New( - view.MatchInstrumentName("foo"), - view.WithSetAggregation(aggregation.ExplicitBucketHistogram{}), + changeAggView := NewView( + Instrument{Name: "foo"}, + Stream{Aggregation: aggregation.ExplicitBucketHistogram{}}, ) - renameView, _ := view.New( - view.MatchInstrumentName("foo"), - view.WithRename("bar"), + renameView := NewView( + Instrument{Name: "foo"}, + Stream{Name: "bar"}, ) - defaultAggView, _ := view.New( - view.MatchInstrumentName("foo"), - view.WithSetAggregation(aggregation.Default{}), + defaultAggView := NewView( + Instrument{Name: "foo"}, + Stream{Aggregation: aggregation.Default{}}, ) - invalidAggView, _ := view.New( - view.MatchInstrumentName("foo"), - view.WithSetAggregation(invalidAggregation{}), + invalidAggView := NewView( + Instrument{Name: "foo"}, + Stream{Aggregation: invalidAggregation{}}, ) - instruments := []instProviderKey{ - {Name: "foo", Kind: view.InstrumentKind(0)}, //Unknown kind - {Name: "foo", Kind: view.SyncCounter}, - {Name: "foo", Kind: view.SyncUpDownCounter}, - {Name: "foo", Kind: view.SyncHistogram}, - {Name: "foo", Kind: view.AsyncCounter}, - {Name: "foo", Kind: view.AsyncUpDownCounter}, - {Name: "foo", Kind: view.AsyncGauge}, + instruments := []Instrument{ + {Name: "foo", Kind: InstrumentKind(0)}, //Unknown kind + {Name: "foo", Kind: InstrumentKindSyncCounter}, + {Name: "foo", Kind: InstrumentKindSyncUpDownCounter}, + {Name: "foo", Kind: InstrumentKindSyncHistogram}, + {Name: "foo", Kind: InstrumentKindAsyncCounter}, + {Name: "foo", Kind: InstrumentKindAsyncUpDownCounter}, + {Name: "foo", Kind: InstrumentKindAsyncGauge}, } testcases := []struct { name string reader Reader - views []view.View - inst instProviderKey + views []View + inst Instrument wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int wantErr error }{ { name: "drop should return 0 aggregators", - reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), - views: []view.View{{}}, - inst: instruments[view.SyncCounter], + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + views: []View{defaultView}, + inst: instruments[InstrumentKindSyncCounter], }, { name: "default agg should use reader", reader: 
NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.SyncUpDownCounter],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindSyncUpDownCounter],
 			wantKind: internal.NewDeltaSum[N](false),
 			wantLen: 1,
 		},
 		{
 			name: "default agg should use reader",
 			reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.SyncHistogram],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindSyncHistogram],
 			wantKind: internal.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}),
 			wantLen: 1,
 		},
 		{
 			name: "default agg should use reader",
 			reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.AsyncCounter],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindAsyncCounter],
 			wantKind: internal.NewPrecomputedDeltaSum[N](true),
 			wantLen: 1,
 		},
 		{
 			name: "default agg should use reader",
 			reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.AsyncUpDownCounter],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindAsyncUpDownCounter],
 			wantKind: internal.NewPrecomputedDeltaSum[N](false),
 			wantLen: 1,
 		},
 		{
 			name: "default agg should use reader",
 			reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.AsyncGauge],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindAsyncGauge],
 			wantKind: internal.NewLastValue[N](),
 			wantLen: 1,
 		},
 		{
 			name: "default agg should use reader",
 			reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
-			views: []view.View{defaultAggView},
-			inst: instruments[view.SyncCounter],
+			views: []View{defaultAggView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantKind: internal.NewDeltaSum[N](true),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.SyncUpDownCounter],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindSyncUpDownCounter],
 			wantKind: internal.NewCumulativeSum[N](false),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.SyncHistogram],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindSyncHistogram],
 			wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.AsyncCounter],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindAsyncCounter],
 			wantKind: internal.NewPrecomputedCumulativeSum[N](true),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.AsyncUpDownCounter],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindAsyncUpDownCounter],
 			wantKind: internal.NewPrecomputedCumulativeSum[N](false),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.AsyncGauge],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindAsyncGauge],
 			wantKind: internal.NewLastValue[N](),
 			wantLen: 1,
 		},
 		{
 			name: "reader should set default agg",
 			reader: NewManualReader(),
-			views: []view.View{{}},
-			inst: instruments[view.SyncCounter],
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantKind: internal.NewCumulativeSum[N](true),
 			wantLen: 1,
 		},
 		{
 			name: "view should overwrite reader",
 			reader: NewManualReader(),
-			views: []view.View{changeAggView},
-			inst: instruments[view.SyncCounter],
+			views: []View{changeAggView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}),
 			wantLen: 1,
 		},
 		{
 			name: "multiple views should create multiple aggregators",
 			reader: NewManualReader(),
-			views: []view.View{{}, renameView},
-			inst: instruments[view.SyncCounter],
+			views: []View{defaultView, renameView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantKind: internal.NewCumulativeSum[N](true),
 			wantLen: 2,
 		},
 		{
 			name: "reader with invalid aggregation should error",
-			reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
-			views: []view.View{{}},
-			inst: instruments[view.SyncCounter],
+			reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
+			views: []View{defaultView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantErr: errCreatingAggregators,
 		},
 		{
 			name: "view with invalid aggregation should error",
 			reader: NewManualReader(),
-			views: []view.View{invalidAggView},
-			inst: instruments[view.SyncCounter],
+			views: []View{invalidAggView},
+			inst: instruments[InstrumentKindSyncCounter],
 			wantErr: errCreatingAggregators,
 		},
 	}
-	s := instrumentation.Scope{Name: "testCreateAggregators"}
 	for _, tt := range testcases {
 		t.Run(tt.name, func(t *testing.T) {
 			c := newInstrumentCache[N](nil, nil)
-			i := newInserter(s, newPipeline(nil, tt.reader, tt.views), c)
+			i := newInserter(newPipeline(nil, tt.reader, tt.views), c)
 			got, err := i.Instrument(tt.inst)
 			assert.ErrorIs(t, err, tt.wantErr)
 			require.Len(t, got, tt.wantLen)
@@ -230,11 +229,10 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
 
 func testInvalidInstrumentShouldPanic[N int64 | float64]() {
 	c := newInstrumentCache[N](nil, nil)
-	s := instrumentation.Scope{Name: "testInvalidInstrumentShouldPanic"}
-	i := newInserter(s, newPipeline(nil, NewManualReader(), []view.View{{}}), c)
-	inst := instProviderKey{
+	i := newInserter(newPipeline(nil, NewManualReader(), []View{defaultView}), c)
+	inst := Instrument{
 		Name: "foo",
-		Kind: view.InstrumentKind(255),
+		Kind: InstrumentKind(255),
 	}
 	_, _ = i.Instrument(inst)
 }
@@ -250,57 +248,52 @@ func TestCreateAggregators(t *testing.T) {
 }
 
 func TestPipelineRegistryCreateAggregators(t *testing.T) {
-	renameView, _ := view.New(
-		view.MatchInstrumentName("foo"),
-		view.WithRename("bar"),
-	)
+	renameView := NewView(Instrument{Name: "foo"}, Stream{Name: "bar"})
 	testRdr := NewManualReader()
-	testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))
+	testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))
 
 	testCases := []struct {
 		name string
 		readers []Reader
-		views []view.View
-		inst view.Instrument
+		views []View
+		inst Instrument
 		wantCount int
 	}{
 		{
 			name: "No views have no aggregators",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 		},
 		{
 			name: "1 reader 1 view gets 1 aggregator",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 			readers: []Reader{testRdr},
-			views: []view.View{{}},
 			wantCount: 1,
 		},
 		{
 			name: "1 reader 2 views gets 2 aggregator",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 			readers: []Reader{testRdr},
-			views: []view.View{{}, renameView},
+			views: []View{defaultView, renameView},
 			wantCount: 2,
 		},
 		{
 			name: "2 readers 1 view each gets 2 aggregators",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 			readers: []Reader{testRdr, testRdrHistogram},
-			views: []view.View{{}},
 			wantCount: 2,
 		},
 		{
 			name: "2 reader 2 views each gets 4 aggregators",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 			readers: []Reader{testRdr, testRdrHistogram},
-			views: []view.View{{}, renameView},
+			views: []View{defaultView, renameView},
 			wantCount: 4,
 		},
 		{
 			name: "An instrument is duplicated in two views share the same aggregator",
-			inst: view.Instrument{Name: "foo"},
+			inst: Instrument{Name: "foo"},
 			readers: []Reader{testRdr},
-			views: []view.View{{}, {}},
+			views: []View{defaultView, defaultView},
 			wantCount: 1,
 		},
 	}
@@ -315,11 +308,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
 }
 
 func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
-	inst := instProviderKey{Name: "foo", Kind: view.SyncCounter}
-
+	inst := Instrument{Name: "foo", Kind: InstrumentKindSyncCounter}
 	c := newInstrumentCache[int64](nil, nil)
-	s := instrumentation.Scope{Name: "testPipelineRegistryResolveIntAggregators"}
-	r := newResolver(s, p, c)
+	r := newResolver(p, c)
 	aggs, err := r.Aggregators(inst)
 	assert.NoError(t, err)
 
@@ -327,11 +318,9 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
 }
 
 func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
-	inst := instProviderKey{Name: "foo", Kind: view.SyncCounter}
-
+	inst := Instrument{Name: "foo", Kind: InstrumentKindSyncCounter}
 	c := newInstrumentCache[float64](nil, nil)
-	s := instrumentation.Scope{Name: "testPipelineRegistryResolveFloatAggregators"}
-	r := newResolver(s, p, c)
+	r := newResolver(p, c)
 	aggs, err := r.Aggregators(inst)
 	assert.NoError(t, err)
 
@@ -339,10 +328,9 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want
 }
 
 func TestPipelineRegistryResource(t *testing.T) {
-	v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
-	require.NoError(t, err)
+	v := NewView(Instrument{Name: "bar"}, Stream{Name: "foo"})
 	readers := []Reader{NewManualReader()}
-	views := []view.View{{}, v}
+	views := []View{defaultView, v}
 	res := resource.NewSchemaless(attribute.String("key", "val"))
 	pipes := newPipelines(res, readers, views)
 	for _, p := range pipes {
@@ -351,21 +339,20 @@ func TestPipelineRegistryResource(t *testing.T) {
 }
 
 func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
-	testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))
+	testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))
 	readers := []Reader{testRdrHistogram}
-	views := []view.View{{}}
+	views := []View{defaultView}
 	p := newPipelines(resource.Empty(), readers, views)
-	inst := instProviderKey{Name: "foo", Kind: view.AsyncGauge}
+	inst := Instrument{Name: "foo", Kind: InstrumentKindAsyncGauge}
 
 	vc := cache[string, instrumentID]{}
-	s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsIncompatibleInstrument"}
-	ri := newResolver(s, p, newInstrumentCache[int64](nil, &vc))
+	ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
 	intAggs, err := ri.Aggregators(inst)
 	assert.Error(t, err)
 	assert.Len(t, intAggs, 0)
 
-	rf := newResolver(s, p, newInstrumentCache[float64](nil, &vc))
+	rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
 	floatAggs, err := rf.Aggregators(inst)
 	assert.Error(t, err)
 	assert.Len(t, floatAggs, 0)
@@ -401,21 +388,17 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
 	l := &logCounter{LogSink: tLog.GetSink()}
 	otel.SetLogger(logr.New(l))
 
-	renameView, _ := view.New(
-		view.MatchInstrumentName("bar"),
-		view.WithRename("foo"),
-	)
+	renameView := NewView(Instrument{Name: "bar"}, Stream{Name: "foo"})
 	readers := []Reader{NewManualReader()}
-	views := []view.View{{}, renameView}
+	views := []View{defaultView, renameView}
 
-	fooInst := instProviderKey{Name: "foo", Kind: view.SyncCounter}
-	barInst := instProviderKey{Name: "bar", Kind: view.SyncCounter}
+	fooInst := Instrument{Name: "foo", Kind: InstrumentKindSyncCounter}
+	barInst := Instrument{Name: "bar", Kind: InstrumentKindSyncCounter}
 
 	p := newPipelines(resource.Empty(), readers, views)
 
 	vc := cache[string, instrumentID]{}
-	s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsDuplicateErrors"}
-	ri := newResolver(s, p, newInstrumentCache[int64](nil, &vc))
+	ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
 	intAggs, err := ri.Aggregators(fooInst)
 	assert.NoError(t, err)
 	assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
@@ -430,13 +413,13 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
 
 	// Creating a float foo instrument should log a warning because there is an
 	// int foo instrument.
-	rf := newResolver(s, p, newInstrumentCache[float64](nil, &vc))
+	rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
 	floatAggs, err := rf.Aggregators(fooInst)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged")
 	assert.Len(t, floatAggs, 1)
 
-	fooInst = instProviderKey{Name: "foo-float", Kind: view.SyncCounter}
+	fooInst = Instrument{Name: "foo-float", Kind: InstrumentKindSyncCounter}
 	floatAggs, err = rf.Aggregators(fooInst)
 	assert.NoError(t, err)
 
@@ -452,147 +435,147 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
 }
 
 func TestIsAggregatorCompatible(t *testing.T) {
-	var undefinedInstrument view.InstrumentKind
+	var undefinedInstrument InstrumentKind
 
 	testCases := []struct {
 		name string
-		kind view.InstrumentKind
+		kind InstrumentKind
 		agg aggregation.Aggregation
 		want error
 	}{
 		{
 			name: "SyncCounter and Drop",
-			kind: view.SyncCounter,
+			kind: InstrumentKindSyncCounter,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "SyncCounter and LastValue",
-			kind: view.SyncCounter,
+			kind: InstrumentKindSyncCounter,
 			agg: aggregation.LastValue{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "SyncCounter and Sum",
-			kind: view.SyncCounter,
+			kind: InstrumentKindSyncCounter,
 			agg: aggregation.Sum{},
 		},
 		{
 			name: "SyncCounter and ExplicitBucketHistogram",
-			kind: view.SyncCounter,
+			kind: InstrumentKindSyncCounter,
 			agg: aggregation.ExplicitBucketHistogram{},
 		},
 		{
 			name: "SyncUpDownCounter and Drop",
-			kind: view.SyncUpDownCounter,
+			kind: InstrumentKindSyncUpDownCounter,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "SyncUpDownCounter and LastValue",
-			kind: view.SyncUpDownCounter,
+			kind: InstrumentKindSyncUpDownCounter,
 			agg: aggregation.LastValue{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "SyncUpDownCounter and Sum",
-			kind: view.SyncUpDownCounter,
+			kind: InstrumentKindSyncUpDownCounter,
 			agg: aggregation.Sum{},
 		},
 		{
 			name: "SyncUpDownCounter and ExplicitBucketHistogram",
-			kind: view.SyncUpDownCounter,
+			kind: InstrumentKindSyncUpDownCounter,
 			agg: aggregation.ExplicitBucketHistogram{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "SyncHistogram and Drop",
-			kind: view.SyncHistogram,
+			kind: InstrumentKindSyncHistogram,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "SyncHistogram and LastValue",
-			kind: view.SyncHistogram,
+			kind: InstrumentKindSyncHistogram,
 			agg: aggregation.LastValue{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "SyncHistogram and Sum",
-			kind: view.SyncHistogram,
+			kind: InstrumentKindSyncHistogram,
 			agg: aggregation.Sum{},
 		},
 		{
 			name: "SyncHistogram and ExplicitBucketHistogram",
-			kind: view.SyncHistogram,
+			kind: InstrumentKindSyncHistogram,
 			agg: aggregation.ExplicitBucketHistogram{},
 		},
 		{
 			name: "AsyncCounter and Drop",
-			kind: view.AsyncCounter,
+			kind: InstrumentKindAsyncCounter,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "AsyncCounter and LastValue",
-			kind: view.AsyncCounter,
+			kind: InstrumentKindAsyncCounter,
 			agg: aggregation.LastValue{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "AsyncCounter and Sum",
-			kind: view.AsyncCounter,
+			kind: InstrumentKindAsyncCounter,
 			agg: aggregation.Sum{},
 		},
 		{
 			name: "AsyncCounter and ExplicitBucketHistogram",
-			kind: view.AsyncCounter,
+			kind: InstrumentKindAsyncCounter,
 			agg: aggregation.ExplicitBucketHistogram{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "AsyncUpDownCounter and Drop",
-			kind: view.AsyncUpDownCounter,
+			kind: InstrumentKindAsyncUpDownCounter,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "AsyncUpDownCounter and LastValue",
-			kind: view.AsyncUpDownCounter,
+			kind: InstrumentKindAsyncUpDownCounter,
 			agg: aggregation.LastValue{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "AsyncUpDownCounter and Sum",
-			kind: view.AsyncUpDownCounter,
+			kind: InstrumentKindAsyncUpDownCounter,
 			agg: aggregation.Sum{},
 		},
 		{
 			name: "AsyncUpDownCounter and ExplicitBucketHistogram",
-			kind: view.AsyncUpDownCounter,
+			kind: InstrumentKindAsyncUpDownCounter,
 			agg: aggregation.ExplicitBucketHistogram{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "AsyncGauge and Drop",
-			kind: view.AsyncGauge,
+			kind: InstrumentKindAsyncGauge,
 			agg: aggregation.Drop{},
 		},
 		{
 			name: "AsyncGauge and aggregation.LastValue{}",
-			kind: view.AsyncGauge,
+			kind: InstrumentKindAsyncGauge,
 			agg: aggregation.LastValue{},
 		},
 		{
 			name: "AsyncGauge and Sum",
-			kind: view.AsyncGauge,
+			kind: InstrumentKindAsyncGauge,
 			agg: aggregation.Sum{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "AsyncGauge and ExplicitBucketHistogram",
-			kind: view.AsyncGauge,
+			kind: InstrumentKindAsyncGauge,
 			agg: aggregation.ExplicitBucketHistogram{},
 			want: errIncompatibleAggregation,
 		},
 		{
 			name: "Default aggregation should error",
-			kind: view.SyncCounter,
+			kind: InstrumentKindSyncCounter,
 			agg: aggregation.Default{},
 			want: errUnknownAggregation,
 		},
diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go
index 678d173c4e6..fe702a2d1ab 100644
--- a/sdk/metric/pipeline_test.go
+++ b/sdk/metric/pipeline_test.go
@@ -28,7 +28,6 @@ import (
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
-	"go.opentelemetry.io/otel/sdk/metric/view"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
 
@@ -134,18 +133,14 @@ func TestDefaultViewImplicit(t *testing.T) {
 }
 
 func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
-	scope := instrumentation.Scope{Name: "testing/lib"}
-	inst := instProviderKey{
+	inst := Instrument{
 		Name: "requests",
 		Description: "count of requests received",
-		Kind: view.SyncCounter,
+		Kind: InstrumentKindSyncCounter,
 		Unit: unit.Dimensionless,
 	}
 	return func(t *testing.T) {
 		reader := NewManualReader()
-		v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar"))
-		require.NoError(t, err)
-
 		tests := []struct {
 			name string
 			pipe *pipeline
@@ -156,14 +151,16 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
 			},
 			{
 				name: "NoMatchingView",
-				pipe: newPipeline(nil, reader, []view.View{v}),
+				pipe: newPipeline(nil, reader, []View{
+					NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
+				}),
 			},
 		}
 
 		for _, test := range tests {
 			t.Run(test.name, func(t *testing.T) {
 				c := newInstrumentCache[N](nil, nil)
-				i := newInserter(scope, test.pipe, c)
+				i := newInserter(test.pipe, c)
 				got, err := i.Instrument(inst)
 				require.NoError(t, err)
 				assert.Len(t, got, 1, "default view not applied")
diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go
index 53c13c4bffe..aa9d50ef666 100644
--- a/sdk/metric/reader.go
+++ b/sdk/metric/reader.go
@@ -20,7 +20,6 @@ import (
 
 	"go.opentelemetry.io/otel/sdk/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
-	"go.opentelemetry.io/otel/sdk/metric/view"
 )
 
 // errDuplicateRegister is logged by a Reader when an attempt to registered it
@@ -55,10 +54,10 @@ type Reader interface {
 	register(producer)
 
 	// temporality reports the Temporality for the instrument kind provided.
-	temporality(view.InstrumentKind) metricdata.Temporality
+	temporality(InstrumentKind) metricdata.Temporality
 
 	// aggregation returns what Aggregation to use for an instrument kind.
-	aggregation(view.InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type.
+	aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type.
 
 	// Collect gathers and returns all metric data related to the Reader from
 	// the SDK. An error is returned if this is called after Shutdown.
@@ -108,18 +107,18 @@ func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics,
 }
 
 // TemporalitySelector selects the temporality to use based on the InstrumentKind.
-type TemporalitySelector func(view.InstrumentKind) metricdata.Temporality
+type TemporalitySelector func(InstrumentKind) metricdata.Temporality
 
 // DefaultTemporalitySelector is the default TemporalitySelector used if
 // WithTemporalitySelector is not provided. CumulativeTemporality will be used
 // for all instrument kinds if this TemporalitySelector is used.
-func DefaultTemporalitySelector(view.InstrumentKind) metricdata.Temporality {
+func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality {
 	return metricdata.CumulativeTemporality
 }
 
 // AggregationSelector selects the aggregation and the parameters to use for
 // that aggregation based on the InstrumentKind.
-type AggregationSelector func(view.InstrumentKind) aggregation.Aggregation
+type AggregationSelector func(InstrumentKind) aggregation.Aggregation
 
 // DefaultAggregationSelector returns the default aggregation and parameters
 // that will be used to summarize measurement made from an instrument of
@@ -127,13 +126,13 @@ type AggregationSelector func(view.InstrumentKind) aggregation.Aggregation
 // mapping: Counter ⇨ Sum, Asynchronous Counter ⇨ Sum, UpDownCounter ⇨ Sum,
 // Asynchronous UpDownCounter ⇨ Sum, Asynchronous Gauge ⇨ LastValue,
 // Histogram ⇨ ExplicitBucketHistogram.
-func DefaultAggregationSelector(ik view.InstrumentKind) aggregation.Aggregation {
+func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation {
 	switch ik {
-	case view.SyncCounter, view.SyncUpDownCounter, view.AsyncCounter, view.AsyncUpDownCounter:
+	case InstrumentKindSyncCounter, InstrumentKindSyncUpDownCounter, InstrumentKindAsyncCounter, InstrumentKindAsyncUpDownCounter:
 		return aggregation.Sum{}
-	case view.AsyncGauge:
+	case InstrumentKindAsyncGauge:
 		return aggregation.LastValue{}
-	case view.SyncHistogram:
+	case InstrumentKindSyncHistogram:
 		return aggregation.ExplicitBucketHistogram{
 			Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
 			NoMinMax: false,
diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go
index 43f923a1564..28b249bd3e2 100644
--- a/sdk/metric/reader_test.go
+++ b/sdk/metric/reader_test.go
@@ -29,7 +29,6 @@ import (
 	"go.opentelemetry.io/otel/metric/unit"
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
-	"go.opentelemetry.io/otel/sdk/metric/view"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
 
@@ -205,16 +204,16 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
 }
 
 func TestDefaultAggregationSelector(t *testing.T) {
-	var undefinedInstrument view.InstrumentKind
+	var undefinedInstrument InstrumentKind
 	assert.Panics(t, func() { DefaultAggregationSelector(undefinedInstrument) })
 
-	iKinds := []view.InstrumentKind{
-		view.SyncCounter,
-		view.SyncUpDownCounter,
-		view.SyncHistogram,
-		view.AsyncCounter,
-		view.AsyncUpDownCounter,
-		view.AsyncGauge,
+	iKinds := []InstrumentKind{
+		InstrumentKindSyncCounter,
+		InstrumentKindSyncUpDownCounter,
+		InstrumentKindSyncHistogram,
+		InstrumentKindAsyncCounter,
+		InstrumentKindAsyncUpDownCounter,
+		InstrumentKindAsyncGauge,
 	}
 
 	for _, ik := range iKinds {
@@ -223,15 +222,15 @@ func TestDefaultAggregationSelector(t *testing.T) {
 }
 
 func TestDefaultTemporalitySelector(t *testing.T) {
-	var undefinedInstrument view.InstrumentKind
-	for _, ik := range []view.InstrumentKind{
+	var undefinedInstrument InstrumentKind
+	for _, ik := range []InstrumentKind{
 		undefinedInstrument,
-		view.SyncCounter,
-		view.SyncUpDownCounter,
-		view.SyncHistogram,
-		view.AsyncCounter,
-		view.AsyncUpDownCounter,
-		view.AsyncGauge,
+		InstrumentKindSyncCounter,
+		InstrumentKindSyncUpDownCounter,
+		InstrumentKindSyncHistogram,
+		InstrumentKindAsyncCounter,
+		InstrumentKindAsyncUpDownCounter,
+		InstrumentKindAsyncGauge,
 	} {
 		assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik))
 	}