Skip to content

Commit

Permalink
Add Temporality/Aggregation to http/grpc otlp clients
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Oct 4, 2022
1 parent b9d0022 commit a2af7a1
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 0 deletions.
42 changes: 42 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options.go
Expand Up @@ -27,6 +27,10 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -57,6 +61,9 @@ type (

// gRPC configurations
GRPCCredentials credentials.TransportCredentials

TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}

Config struct {
Expand All @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
Expand All @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
Expand Down Expand Up @@ -312,3 +325,32 @@ func WithTimeout(duration time.Duration) GenericOption {
return cfg
})
}

func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.TemporalitySelector = selector
return cfg
})
}

func WithAggregationSelector(selector metric.AggregationSelector) GenericOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}

return newGenericOption(func(cfg Config) Config {
cfg.Metrics.AggregationSelector = wrapped
return cfg
})
}
19 changes: 19 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Expand Up @@ -28,6 +28,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -53,6 +56,9 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
Expand All @@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}

if len(cfg.Metrics.Headers) > 0 {
Expand All @@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
return c, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/config.go
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)

// Option applies a configuration option to the Exporter.
Expand Down Expand Up @@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(settings RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(settings))}
}

// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}

// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}
19 changes: 19 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Expand Up @@ -33,6 +33,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -54,6 +57,9 @@ type client struct {
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}

// Keep it in sync with golang's DefaultTransport from net/http! We
Expand Down Expand Up @@ -113,9 +119,22 @@ func newClient(opts ...Option) (otlpmetric.Client, error) {
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/config.go
Expand Up @@ -20,6 +20,7 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)

// Compression describes the compression used for payloads sent to the
Expand Down Expand Up @@ -179,3 +180,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(rc RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(rc))}
}

// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}

// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}

0 comments on commit a2af7a1

Please sign in to comment.