Skip to content

Commit

Permalink
Instrument obsreport.receiver metrics with otel go (#6222)
Browse files Browse the repository at this point in the history
* Instrument obs_receiver with otel go

* add changelog entry

* remove views configuration

* move otel-go metrics to obsreport

* add UseOtelForMetrics to TelemetrySettings; extract otel sdk initialization changes to another pr; remove public InstrumentWithOtel from obsreport;

* default UseOtelForMetrics to false

* remove duplicate oc registry initialization

* move `UseOtelForMetrics` to the internal package `obsreportconfig`

* address comments

* only register view if featuregate is disabled

* prefix meter name with `receiver/`

* change meter scope name

* upgrade otel metric sdk

* run make gotidy

* update changelog

* revert: otel go version upgrade

* revert: x/sys upgrades
  • Loading branch information
paivagustavo committed Oct 12, 2022
1 parent 2118622 commit a9f41a2
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 52 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### 💡 Enhancements 💡

- Instrument `obsreport.Receiver` metrics with otel-go (#6222)

## v0.62.0 Beta

### 🛑 Breaking changes 🛑
Expand Down Expand Up @@ -55,7 +59,7 @@
- Add config marshaler (#5566)
- Add semantic conventions for specification v1.10-v1.13 (#6213)
- `receiver/otlp`: Make logs related to gRCPC and HTTP server startup clearer (#6174)
- Add prometheus metric prefix and constant service attributes to Collector's own telemetry when using OpenTelemetry for internal telemetry (#6223)
- Add prometheus metric prefix to Collector's own telemetry when using OpenTelemetry for internal telemetry (#6223)
- `exporter/logging`: Apply consistent rendering of map values (#6244)
- Add support in the confmap.Resolver to expand embedded config URIs inside configuration. (#6276)

Expand Down
57 changes: 45 additions & 12 deletions internal/obsreportconfig/obsreportconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,30 @@ import (
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const (
// UseOtelForInternalMetricsfeatureGateID is the feature gate ID that controls whether the collector uses open
// telemetrySettings for internal metrics.
UseOtelForInternalMetricsfeatureGateID = "telemetry.useOtelForInternalMetrics"
)

func init() {
// register feature gate
RegisterInternalMetricFeatureGate(featuregate.GetRegistry())
}

// RegisterInternalMetricFeatureGate registers the Internal Metric feature gate to the passed in registry
func RegisterInternalMetricFeatureGate(registry *featuregate.Registry) {
registry.MustRegister(featuregate.Gate{
ID: UseOtelForInternalMetricsfeatureGateID,
Description: "controls whether the collector uses OpenTelemetry for internal metrics",
Enabled: false,
})
}

// ObsMetrics wraps OpenCensus View for Collector observability metrics
type ObsMetrics struct {
Views []*view.View
Expand All @@ -43,19 +64,11 @@ func Configure(level configtelemetry.Level) *ObsMetrics {
// allViews return the list of all views that needs to be configured.
func allViews() []*view.View {
var views []*view.View
var measures []*stats.Int64Measure
var tagKeys []tag.Key

// Receiver views.
measures := []*stats.Int64Measure{
obsmetrics.ReceiverAcceptedSpans,
obsmetrics.ReceiverRefusedSpans,
obsmetrics.ReceiverAcceptedMetricPoints,
obsmetrics.ReceiverRefusedMetricPoints,
obsmetrics.ReceiverAcceptedLogRecords,
obsmetrics.ReceiverRefusedLogRecords,
}
tagKeys := []tag.Key{
obsmetrics.TagKeyReceiver, obsmetrics.TagKeyTransport,
}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
views = append(views, receiverViews()...)

// Scraper views.
measures = []*stats.Int64Measure{
Expand Down Expand Up @@ -103,6 +116,26 @@ func allViews() []*view.View {
return views
}

func receiverViews() []*view.View {
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
return nil
}

measures := []*stats.Int64Measure{
obsmetrics.ReceiverAcceptedSpans,
obsmetrics.ReceiverRefusedSpans,
obsmetrics.ReceiverAcceptedMetricPoints,
obsmetrics.ReceiverRefusedMetricPoints,
obsmetrics.ReceiverAcceptedLogRecords,
obsmetrics.ReceiverRefusedLogRecords,
}
tagKeys := []tag.Key{
obsmetrics.TagKeyReceiver, obsmetrics.TagKeyTransport,
}

return genViews(measures, tagKeys, view.Sum())
}

func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
Expand Down
6 changes: 6 additions & 0 deletions obsreport/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"go.opentelemetry.io/otel/trace"
)

const (
scopeName = "go.opentelemetry.io/collector/obsreport"

nameSep = "/"
)

func recordError(span trace.Span, err error) {
if err != nil {
span.SetStatus(codes.Error, err.Error())
Expand Down
158 changes: 140 additions & 18 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,27 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const (
receiverName = "receiver"

receiverScope = scopeName + nameSep + receiverName
)

// Receiver is a helper to add observability to a component.Receiver.
type Receiver struct {
level configtelemetry.Level
Expand All @@ -36,6 +49,18 @@ type Receiver struct {
longLivedCtx bool
mutators []tag.Mutator
tracer trace.Tracer
meter metric.Meter
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue

acceptedSpansCounter syncint64.Counter
refusedSpansCounter syncint64.Counter
acceptedMetricPointsCounter syncint64.Counter
refusedMetricPointsCounter syncint64.Counter
acceptedLogRecordsCounter syncint64.Counter
refusedLogRecordsCounter syncint64.Counter
}

// ReceiverSettings are settings for creating an Receiver.
Expand All @@ -53,7 +78,7 @@ type ReceiverSettings struct {

// NewReceiver creates a new Receiver.
func NewReceiver(cfg ReceiverSettings) *Receiver {
return &Receiver{
rec := &Receiver{
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ReceiverPrefix + cfg.ReceiverID.String(),
transport: cfg.Transport,
Expand All @@ -63,7 +88,74 @@ func NewReceiver(cfg ReceiverSettings) *Receiver {
tag.Upsert(obsmetrics.TagKeyTransport, cfg.Transport, tag.WithTTL(tag.TTLNoPropagation)),
},
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.ReceiverID.String()),
meter: cfg.ReceiverCreateSettings.MeterProvider.Meter(receiverScope),
logger: cfg.ReceiverCreateSettings.Logger,

useOtelForMetrics: featuregate.GetRegistry().IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
attribute.String(obsmetrics.TransportKey, cfg.Transport),
},
}

rec.createOtelMetrics()

return rec
}

func (rec *Receiver) createOtelMetrics() {
if !rec.useOtelForMetrics {
return
}

var err error
handleError := func(metricName string, err error) {
if err != nil {
rec.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
}
}

rec.acceptedSpansCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.AcceptedSpansKey,
instrument.WithDescription("Number of spans successfully pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.AcceptedSpansKey, err)

rec.refusedSpansCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.RefusedSpansKey,
instrument.WithDescription("Number of spans that could not be pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.RefusedSpansKey, err)

rec.acceptedMetricPointsCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, err)

rec.refusedMetricPointsCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.RefusedMetricPointsKey,
instrument.WithDescription("Number of metric points that could not be pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.RefusedMetricPointsKey, err)

rec.acceptedLogRecordsCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.AcceptedLogRecordsKey,
instrument.WithDescription("Number of log records successfully pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.AcceptedLogRecordsKey, err)

rec.refusedLogRecordsCounter, err = rec.meter.SyncInt64().Counter(
obsmetrics.ReceiverPrefix+obsmetrics.RefusedLogRecordsKey,
instrument.WithDescription("Number of log records that could not be pushed into the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
handleError(obsmetrics.ReceiverPrefix+obsmetrics.RefusedLogRecordsKey, err)
}

// StartTracesOp is called when a request is received from a client.
Expand Down Expand Up @@ -163,23 +255,7 @@ func (rec *Receiver) endOp(
span := trace.SpanFromContext(receiverCtx)

if rec.level != configtelemetry.LevelNone {
var acceptedMeasure, refusedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedSpans
refusedMeasure = obsmetrics.ReceiverRefusedSpans
case config.MetricsDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints
refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints
case config.LogsDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords
refusedMeasure = obsmetrics.ReceiverRefusedLogRecords
}

stats.Record(
receiverCtx,
acceptedMeasure.M(int64(numAccepted)),
refusedMeasure.M(int64(numRefused)))
rec.recordMetrics(receiverCtx, dataType, numAccepted, numRefused)
}

// end span according to errors
Expand All @@ -206,3 +282,49 @@ func (rec *Receiver) endOp(
}
span.End()
}

func (rec *Receiver) recordMetrics(receiverCtx context.Context, dataType config.DataType, numAccepted, numRefused int) {
if rec.useOtelForMetrics {
rec.recordWithOtel(receiverCtx, dataType, numAccepted, numRefused)
} else {
rec.recordWithOC(receiverCtx, dataType, numAccepted, numRefused)
}
}

func (rec *Receiver) recordWithOtel(receiverCtx context.Context, dataType config.DataType, numAccepted, numRefused int) {
var acceptedMeasure, refusedMeasure syncint64.Counter
switch dataType {
case config.TracesDataType:
acceptedMeasure = rec.acceptedSpansCounter
refusedMeasure = rec.refusedSpansCounter
case config.MetricsDataType:
acceptedMeasure = rec.acceptedMetricPointsCounter
refusedMeasure = rec.refusedMetricPointsCounter
case config.LogsDataType:
acceptedMeasure = rec.acceptedLogRecordsCounter
refusedMeasure = rec.refusedLogRecordsCounter
}

acceptedMeasure.Add(receiverCtx, int64(numAccepted), rec.otelAttrs...)
refusedMeasure.Add(receiverCtx, int64(numRefused), rec.otelAttrs...)
}

func (rec *Receiver) recordWithOC(receiverCtx context.Context, dataType config.DataType, numAccepted, numRefused int) {
var acceptedMeasure, refusedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedSpans
refusedMeasure = obsmetrics.ReceiverRefusedSpans
case config.MetricsDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints
refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints
case config.LogsDataType:
acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords
refusedMeasure = obsmetrics.ReceiverRefusedLogRecords
}

stats.Record(
receiverCtx,
acceptedMeasure.M(int64(numAccepted)),
refusedMeasure.M(int64(numRefused)))
}
5 changes: 3 additions & 2 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension/zpagesextension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/testutil"
)

Expand Down Expand Up @@ -410,9 +411,9 @@ func TestCollectorStartWithOpenTelemetryMetrics(t *testing.T) {
for _, tc := range ownMetricsTestCases("test version") {
t.Run(tc.name, func(t *testing.T) {
registry := featuregate.NewRegistry()
registerInternalMetricFeatureGate(registry)
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
colTel := newColTelemetry(registry)
require.NoError(t, colTel.registry.Apply(map[string]bool{useOtelForInternalMetricsfeatureGateID: true}))
require.NoError(t, colTel.registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true}))
testCollectorStartHelper(t, colTel, tc)
})
}
Expand Down
20 changes: 1 addition & 19 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ const (
zapKeyTelemetryAddress = "address"
zapKeyTelemetryLevel = "level"

// useOtelForInternalMetricsfeatureGateID is the feature gate ID that controls whether the collector uses open
// telemetrySettings for internal metrics.
useOtelForInternalMetricsfeatureGateID = "telemetry.useOtelForInternalMetrics"

// supported trace propagators
traceContextPropagator = "tracecontext"
b3Propagator = "b3"
Expand All @@ -77,20 +73,6 @@ type telemetryInitializer struct {
doInitOnce sync.Once
}

func init() {
// register feature gate
registerInternalMetricFeatureGate(featuregate.GetRegistry())
}

// registerInternalMetricFeatureGate registers the Internal Metric feature gate to the passed in registry
func registerInternalMetricFeatureGate(registry *featuregate.Registry) {
registry.MustRegister(featuregate.Gate{
ID: useOtelForInternalMetricsfeatureGateID,
Description: "controls whether the collector to uses OpenTelemetry for internal metrics",
Enabled: false,
})
}

func newColTelemetry(registry *featuregate.Registry) *telemetryInitializer {
return &telemetryInitializer{
registry: registry,
Expand Down Expand Up @@ -131,7 +113,7 @@ func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger

var pe http.Handler
var err error
if tel.registry.IsEnabled(useOtelForInternalMetricsfeatureGateID) {
if tel.registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID) {
pe, err = tel.initOpenTelemetry(telAttrs)
} else {
pe, err = tel.initOpenCensus(cfg, telAttrs)
Expand Down

0 comments on commit a9f41a2

Please sign in to comment.