Receive: WIP adding experimental otlp ingest endpoint #7227

Draft · wants to merge 6 commits into base: main
12 changes: 12 additions & 0 deletions pkg/receive/handler.go
@@ -262,6 +262,18 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
),
)

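// Experimental OTLP/HTTP metrics ingest endpoint, wrapped in the same
// middleware chain as the remote-write route above (request metrics via
// instrf, readiness gating via readyf, and per-request IDs).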
h.router.Post(
"/v1/metrics",
instrf(
"otlp",
readyf(
middleware.RequestID(
http.HandlerFunc(h.receiveOTLPHTTP),
),
),
),
)

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: h.options.Registry,
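For reference, a minimal client sketch (not part of this PR) that pushes a counter to the new route using the OpenTelemetry Go SDK's OTLP/HTTP metrics exporter. It assumes the route is served by the receiver's remote-write HTTP server on localhost:10908, as in the quickstart script further down:

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Point the OTLP/HTTP exporter at the new /v1/metrics route.
	// localhost:10908 is an assumption matching the quickstart script below.
	exp, err := otlpmetrichttp.New(ctx,
		otlpmetrichttp.WithEndpoint("localhost:10908"),
		otlpmetrichttp.WithURLPath("/v1/metrics"),
		otlpmetrichttp.WithInsecure(),
	)
	if err != nil {
		log.Fatal(err)
	}

	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(5*time.Second))),
	)
	// Shutdown flushes any pending metrics to the receiver.
	defer func() { _ = provider.Shutdown(ctx) }()

	counter, err := provider.Meter("example").Int64Counter("demo_requests_total")
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(ctx, 1)
}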
232 changes: 232 additions & 0 deletions pkg/receive/handler_otlp.go
@@ -0,0 +1,232 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package receive

import (
"context"
"net/http"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"

"github.com/thanos-io/thanos/pkg/store/labelpb"
tprompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

func (h *Handler) receiveOTLPHTTP(w http.ResponseWriter, r *http.Request) {
var err error
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
span.SetTag("receiver.mode", string(h.receiverMode))
defer span.Finish()

tenant, err := tenancy.GetTenantFromHTTP(r, h.options.TenantHeader, h.options.DefaultTenantID, h.options.TenantField)
if err != nil {
level.Error(h.logger).Log("msg", "error getting tenant from HTTP", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

tLogger := log.With(h.logger, "tenant", tenant)
span.SetTag("tenant", tenant)

writeGate := h.Limiter.WriteGate()
tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) {
err = writeGate.Start(r.Context())
})
defer writeGate.Done()
if err != nil {
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

under, err := h.Limiter.HeadSeriesLimiter().isUnderLimit(tenant)
if err != nil {
level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error())
}

// Fail request fully if tenant has exceeded set limit.
if !under {
http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests)
return
}

requestLimiter := h.Limiter.RequestLimiter()
// io.ReadAll dynamically adjusts the byte slice for read data, starting from 512B.
// Since this is the receive hot path, grow the buffer upfront to save allocations and CPU time.
// TODO: request size limiting is disabled for now; restore it for the OTLP path.
//compressed := bytes.Buffer{}
//if r.ContentLength >= 0 {
// if !requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
// http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
// return
// }
// compressed.Grow(int(r.ContentLength))
//} else {
// compressed.Grow(512)
//}
//_, err = io.Copy(&compressed, r.Body)
//if err != nil {
// http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
// return
//}
//reqBuf, err := s2.Decode(nil, compressed.Bytes())
//if err != nil {
// level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
// http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
// return
//}
//
//if !requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
// http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
// return
//}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
// TODO: removed for now; restore later.
//var wreq prompb.WriteRequest
//if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
// http.Error(w, err.Error(), http.StatusBadRequest)
// return
//}

req, err := remote.DecodeOTLPWriteRequest(r)
if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// TODO expose otlptranslator.Settings as config options
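// FromMetrics flattens the OTLP resource metrics into Prometheus remote-write
// time series keyed by series signature; AddMetricSuffixes appends the
// type/unit suffixes (e.g. _total, _seconds) expected by Prometheus naming
// conventions.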
prwMetricsMap, errs := otlptranslator.FromMetrics(req.Metrics(), otlptranslator.Settings{
AddMetricSuffixes: true,
})
if errs != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", errs)
}

prwMetrics := make([]tprompb.TimeSeries, 0, len(prwMetricsMap))

for _, ts := range prwMetricsMap {
t := tprompb.TimeSeries{
Labels: makeLabels(ts.Labels),
Samples: makeSamples(ts.Samples),
Exemplars: makeExemplars(ts.Exemplars),
// TODO: handle histograms
}
prwMetrics = append(prwMetrics, t)
}

wreq := tprompb.WriteRequest{
Timeseries: prwMetrics,
// TODO: handle metadata; requires the Thanos receiver to support ingesting metadata.
//Metadata: otlptranslator.OtelMetricsToMetadata(),
}

rep := uint64(0)
// If the header is empty, we assume the request is not yet replicated.
if replicaRaw := r.Header.Get(h.options.ReplicaHeader); replicaRaw != "" {
if rep, err = strconv.ParseUint(replicaRaw, 10, 64); err != nil {
http.Error(w, "could not parse replica header", http.StatusBadRequest)
return
}
}

// Exit early if the request contained no data. We don't support metadata yet. We also cannot fail here, because
// that would break forward compatibility with the remote write proto.
if len(wreq.Timeseries) == 0 {
// TODO(yeya24): Handle remote write metadata.
if len(wreq.Metadata) > 0 {
// TODO(bwplotka): Do we need this error message?
level.Debug(tLogger).Log("msg", "only metadata from client; metadata ingestion not supported; skipping")
return
}
level.Debug(tLogger).Log("msg", "empty remote write request; client bug or newer remote write protocol used?; skipping")
return
}

if !requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}

totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
if !requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}

// Apply relabeling configs.
h.relabel(&wreq)
if len(wreq.Timeseries) == 0 {
level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.")
return
}

responseStatusCode := http.StatusOK
tenantStats, err := h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error())
switch errors.Cause(err) {
case errNotReady:
responseStatusCode = http.StatusServiceUnavailable
case errUnavailable:
responseStatusCode = http.StatusServiceUnavailable
case errConflict:
responseStatusCode = http.StatusConflict
case errBadReplica:
responseStatusCode = http.StatusBadRequest
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
responseStatusCode = http.StatusInternalServerError
}
http.Error(w, err.Error(), responseStatusCode)
}

for tenant, stats := range tenantStats {
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.timeseries))
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.totalSamples))
}
}

func makeLabels(in []prompb.Label) []labelpb.ZLabel {
out := make([]labelpb.ZLabel, 0, len(in))
for _, l := range in {
out = append(out, labelpb.ZLabel{Name: l.Name, Value: l.Value})
}
return out
}

func makeSamples(in []prompb.Sample) []tprompb.Sample {
out := make([]tprompb.Sample, 0, len(in))
for _, s := range in {
out = append(out, tprompb.Sample{
Value: s.Value,
Timestamp: s.Timestamp,
})
}
return out
}

func makeExemplars(in []prompb.Exemplar) []tprompb.Exemplar {
out := make([]tprompb.Exemplar, 0, len(in))
for _, e := range in {
out = append(out, tprompb.Exemplar{
Labels: makeLabels(e.Labels),
Value: e.Value,
Timestamp: e.Timestamp,
})
}
return out
}
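A minimal test sketch (not part of this PR) for the conversion helpers above, assuming it sits next to handler_otlp.go in the receive package:

package receive

import (
	"testing"

	"github.com/prometheus/prometheus/prompb"
)

func TestMakeSamples(t *testing.T) {
	in := []prompb.Sample{{Value: 1.5, Timestamp: 1000}}
	out := makeSamples(in)
	if len(out) != 1 || out[0].Value != 1.5 || out[0].Timestamp != 1000 {
		t.Fatalf("unexpected conversion result: %+v", out)
	}
}

func TestMakeLabels(t *testing.T) {
	in := []prompb.Label{{Name: "__name__", Value: "demo_requests_total"}}
	out := makeLabels(in)
	if len(out) != 1 || out[0].Name != "__name__" || out[0].Value != "demo_requests_total" {
		t.Fatalf("unexpected conversion result: %+v", out)
	}
}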
126 changes: 126 additions & 0 deletions scripts/quickstart-otel.sh
@@ -0,0 +1,126 @@
#!/usr/bin/env bash
#
# Starts three Thanos receivers with the experimental OTLP ingest endpoint,
# two query nodes on top of them, and an OpenTelemetry Collector that scrapes
# its own metrics and forwards them to the receivers over OTLP/HTTP.

trap 'kill 0' SIGTERM

PROMETHEUS_EXECUTABLE=${PROMETHEUS_EXECUTABLE:-"prometheus"}
THANOS_EXECUTABLE=${THANOS_EXECUTABLE:-"thanos"}
OTEL_EXECUTABLE=${OTEL_EXECUTABLE:-"otelcol-contrib"}
REMOTE_WRITE_ENABLED=true

if [ ! $(command -v "$PROMETHEUS_EXECUTABLE") ]; then
echo "Cannot find or execute Prometheus binary $PROMETHEUS_EXECUTABLE, you can override it by setting the PROMETHEUS_EXECUTABLE env variable"
exit 1
fi

if [ ! $(command -v "$THANOS_EXECUTABLE") ]; then
echo "Cannot find or execute Thanos binary $THANOS_EXECUTABLE, you can override it by setting the THANOS_EXECUTABLE env variable"
exit 1
fi

sleep 0.5

if [ -n "${REMOTE_WRITE_ENABLED}" ]; then

for i in $(seq 0 1 2); do
${THANOS_EXECUTABLE} receive \
--debug.name receive${i} \
--log.level debug \
--tsdb.path "./data/remote-write-receive-${i}-data" \
--grpc-address 0.0.0.0:1${i}907 \
--grpc-grace-period 1s \
--http-address 0.0.0.0:1${i}909 \
--http-grace-period 1s \
--receive.replication-factor 1 \
--tsdb.min-block-duration 5m \
--tsdb.max-block-duration 5m \
--label "receive_replica=\"${i}\"" \
--label 'receive="true"' \
--receive.local-endpoint 127.0.0.1:1${i}907 \
--receive.hashrings '[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]' \
--remote-write.address 0.0.0.0:1${i}908 &

STORES="${STORES} --store 127.0.0.1:1${i}907"
done

fi

# Set up the OpenTelemetry Collector config file.
mkdir -p data
cat >data/otel-config.yaml <<-EOF
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
  prometheus:
    config:
      scrape_configs:
        - job_name: otel-collector
          scrape_interval: 5s
          static_configs:
            - targets: [localhost:8888]
exporters:
  otlphttp/thanos:
    endpoint: "http://localhost:10908"
    tls:
      insecure: true
  debug:
    verbosity: detailed
extensions:
  health_check:
  pprof:
service:
  telemetry:
    logs:
      level: "debug"
  extensions: [pprof, health_check]
  pipelines:
    metrics:
      receivers:
        - prometheus
        - otlp
      exporters:
        - otlphttp/thanos
        #- debug
EOF
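# Note: the otlphttp exporter appends the per-signal path (/v1/metrics for
# metrics) to the endpoint above, so metrics from the collector land on the
# receiver's new OTLP route at http://localhost:10908/v1/metrics.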

QUERIER_JAEGER_CONFIG=$(
cat <<-EOF
type: JAEGER
config:
  service_name: thanos-query
  sampler_type: ratelimiting
  sampler_param: 2
EOF
)

# Start two query nodes.
for i in $(seq 0 1); do
${THANOS_EXECUTABLE} query \
--debug.name query-"${i}" \
--log.level debug \
--grpc-address 0.0.0.0:109"${i}"3 \
--grpc-grace-period 1s \
--http-address 0.0.0.0:109"${i}"4 \
--http-grace-period 1s \
--query.replica-label prometheus \
--tracing.config="${QUERIER_JAEGER_CONFIG}" \
--query.replica-label receive_replica \
${STORES} &
done

sleep 0.5

# Requires the otelcol-contrib binary, which can be downloaded from https://github.com/open-telemetry/opentelemetry-collector-releases/releases
${OTEL_EXECUTABLE} \
--config=data/otel-config.yaml &

sleep 0.5

echo "all started; waiting for signal"

wait