Skip to content

Commit

Permalink
receiver/prometheus: add "up" metric for instances
Browse files Browse the repository at this point in the history
Make a receiver-specific view that will be registered
and used to record the "up" status as either "0.0" or "1.0",
for when an instance cannot or can be scraped, respectively.

This ensures that the collector can act as a passthrough
for statuses and it currently outputs:

    # HELP up Whether the endpoint is alive or not
    # TYPE up gauge
    up{instance="0.0.0.0:8888"} 1
    up{instance="localhost:9999"} 0

I did not take the approach of plainly sending "_up"-suffixed metric names
and then recommending relabelling inside the exporter itself, like:

    - source_labels: [__name__]
        regex: "(.+)_up"
        target_label: "__name__"
        replacement: "up"

because:
* it'd apply ConstLabels on every *_up metric, only want "instance=$INSTANCE"
* other exporters wouldn't be able to use the "up" metric as is if we
inject rewrites

Regardless of whether we used a label rewrite, the end result would be the
following:

    up{instance="localhost:8888",job="otlc"}
    up{exported_instance="0.0.0.0:9999",instance="localhost:8888",job="otlc"}
    up{exported_instance="0.0.0.0:1234",instance="localhost:8888",job="otlc"}

which this change accomplishes without having to inject any label
rewrites, but just by the new imports and upgrade of the prometheus
exporter.

Fixes open-telemetry/wg-prometheus#8
Requires census-ecosystem/opencensus-go-exporter-prometheus#24
  • Loading branch information
odeke-em committed Apr 26, 2021
1 parent 1e1f24b commit 5d67dcc
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 11 deletions.
51 changes: 51 additions & 0 deletions receiver/prometheusreceiver/internal/metrics.go
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

// tagInstance is the tag key ("instance") attached to every "up" measurement
// recorded by this file. MustNewKey is used so that an invalid key name
// panics at package initialization instead of the error being discarded.
var tagInstance = tag.MustNewKey("instance")

// statUpStatus is the dimensionless int64 measure named "up". The helpers in
// this file record 1 on it for an instance that is up and 0 for one that is
// down.
var statUpStatus = stats.Int64("up", "Whether the endpoint is alive or not", stats.UnitDimensionless)

// MetricViews returns the views defined by this package: a single view over
// the "up" measure, keyed by the "instance" tag and aggregated as the last
// recorded value. The view takes its name and description from the measure.
func MetricViews() []*view.View {
	upView := &view.View{
		Name:        statUpStatus.Name(),
		Measure:     statUpStatus,
		Description: statUpStatus.Description(),
		TagKeys:     []tag.Key{tagInstance},
		Aggregation: view.LastValue(),
	}
	return []*view.View{upView}
}

// recordInstanceAsUp records the value 1 on the "up" measure, tagged with
// instance=instanceValue, and returns the context produced by the tagging
// step.
func recordInstanceAsUp(ctx context.Context, instanceValue string) context.Context {
	// Best effort: any error from tag.New is discarded and whatever context
	// it returned is used for the recording regardless.
	taggedCtx, _ := tag.New(ctx, tag.Upsert(tagInstance, instanceValue))
	stats.Record(taggedCtx, statUpStatus.M(1))
	return taggedCtx
}

// recordInstanceAsDown records the value 0 on the "up" measure, tagged with
// instance=instanceValue, and returns the context produced by the tagging
// step.
func recordInstanceAsDown(ctx context.Context, instanceValue string) context.Context {
	// Best effort: any error from tag.New is discarded and whatever context
	// it returned is used for the recording regardless.
	taggedCtx, _ := tag.New(ctx, tag.Upsert(tagInstance, instanceValue))
	stats.Record(taggedCtx, statUpStatus.M(0))
	return taggedCtx
}
35 changes: 24 additions & 11 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Expand Up @@ -15,6 +15,7 @@
package internal

import (
"context"
"errors"
"fmt"
"regexp"
Expand Down Expand Up @@ -93,25 +94,37 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error
b.numTimeseries++
b.droppedTimeseries++
return errMetricNameNotFound

case isInternalMetric(metricName):
b.hasInternalMetric = true
lm := ls.Map()
delete(lm, model.MetricNameLabel)
if metricName != scrapeUpMetricName {
return nil
}

// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
if metricName == scrapeUpMetricName && v != 1.0 {
if v == 0.0 {
b.logger.Warn("Failed to scrape Prometheus endpoint",
zap.Int64("scrape_timestamp", t),
zap.String("target_labels", fmt.Sprintf("%v", lm)))
} else {
b.logger.Warn("The 'up' metric contains invalid value",
zap.Float64("value", v),
zap.Int64("scrape_timestamp", t),
zap.String("target_labels", fmt.Sprintf("%v", lm)))
}
instanceValue := lm["instance"]
switch v {
case 1.0: // The instance is up!
recordInstanceAsUp(context.Background(), instanceValue)

case 0.0: // The instance is definitely down.
recordInstanceAsDown(context.Background(), instanceValue)
b.logger.Warn("Failed to scrape Prometheus endpoint",
zap.Int64("scrape_timestamp", t),
zap.String("target_labels", fmt.Sprintf("%v", lm)))

default: // We got an invalid value for "up"
recordInstanceAsDown(context.Background(), instanceValue)
b.logger.Warn("The 'up' metric contains invalid value",
zap.Float64("value", v),
zap.Int64("scrape_timestamp", t),
zap.String("target_labels", fmt.Sprintf("%v", lm)))
}
return nil

case b.useStartTimeMetric && b.matchStartTimeMetric(metricName):
b.startTime = v
}
Expand Down
2 changes: 2 additions & 0 deletions receiver/prometheusreceiver/metrics_receiver.go
Expand Up @@ -49,6 +49,8 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric
return pr
}

// MetricViews exposes internal.MetricViews under this package's name, so
// that callers can obtain the receiver's views via
// prometheusreceiver.MetricViews().
var MetricViews = internal.MetricViews

// Start is the method that starts Prometheus scraping and it
// is controlled by having previously defined a Configuration using perhaps New.
func (r *pReceiver) Start(_ context.Context, host component.Host) error {
Expand Down
2 changes: 2 additions & 0 deletions service/telemetry.go
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/kafkareceiver"
"go.opentelemetry.io/collector/receiver/prometheusreceiver"
telemetry2 "go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/translator/conventions"
)
Expand Down Expand Up @@ -66,6 +67,7 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
views = append(views, kafkareceiver.MetricViews()...)
views = append(views, obsreport.Configure(level)...)
views = append(views, processMetricsViews.Views()...)
views = append(views, prometheusreceiver.MetricViews()...)

tel.views = views
if err = view.Register(views...); err != nil {
Expand Down

0 comments on commit 5d67dcc

Please sign in to comment.