Skip to content

Commit

Permalink
Add Temporality/Aggregation to otlpmetric exp
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Oct 4, 2022
1 parent fc30414 commit b9d0022
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
9 changes: 9 additions & 0 deletions exporters/otlp/otlpmetric/client.go
Expand Up @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric
import (
"context"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality

// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation

// UploadMetrics transmits metric data to an OTLP receiver.
//
// All retry logic must be handled by UploadMetrics alone, the Exporter
Expand Down
34 changes: 32 additions & 2 deletions exporters/otlp/otlpmetric/exporter.go
Expand Up @@ -21,7 +21,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -34,6 +36,20 @@ type exporter struct {
shutdownOnce sync.Once
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Aggregation(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
Expand Down Expand Up @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error {
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.client = shutdownClient{
temporalitySelector: client.Temporality,
aggregationSelector: client.Aggregation,
}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
Expand All @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter {
return &exporter{client: client}
}

type shutdownClient struct{}
type shutdownClient struct {
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
Expand All @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error {
return errShutdown
}

func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error {
return c.err(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/otlpmetric/exporter_test.go
Expand Up @@ -21,7 +21,10 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -31,6 +34,14 @@ type client struct {
n int
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error {
c.n++
return nil
Expand Down

0 comments on commit b9d0022

Please sign in to comment.