diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a66874971..70e335f1f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache. +- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5d60a19ba1..6ed11a2835 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -22,7 +22,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/tsdb" + "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -173,6 +175,15 @@ func runReceive( return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID) } + relabelContentYaml, err := conf.relabelConfigPath.Content() + if err != nil { + return errors.Wrap(err, "get content of relabel configuration") + } + var relabelConfig []*relabel.Config + if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { + return errors.Wrap(err, "parse relabel configuration") + } + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -194,6 +205,7 @@ func runReceive( DefaultTenantID: conf.defaultTenantID, ReplicaHeader: conf.replicaHeader, ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, ReceiverMode: receiveMode, Tracer: tracer, TLSConfig: rwTLSConfig, @@ -729,7 +741,8 @@ type receiveConfig struct { ignoreBlockSize bool allowOutOfOrderUpload bool - reqLogConfig *extflag.PathOrContent + reqLogConfig *extflag.PathOrContent + relabelConfigPath *extflag.PathOrContent } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -783,6 +796,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) + rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution()) + rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) diff --git a/docs/components/receive.md b/docs/components/receive.md index 435e1e15df..16dfd8a42f 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -140,6 +140,13 @@ Flags: configuration. If it's empty AND hashring configuration was provided, it means that receive will run in RoutingOnly mode. + --receive.relabel-config= + Alternative to 'receive.relabel-config-file' + flag (mutually exclusive). Content of YAML file + that contains relabeling configuration. + --receive.relabel-config-file= + Path to YAML file that contains relabeling + configuration. --receive.replica-header="THANOS-REPLICA" HTTP header specifying the replica number of a write request. diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c35c3bea41..0cf8509ea5 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "google.golang.org/grpc" @@ -38,6 +39,7 @@ import ( extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" @@ -81,6 +83,7 @@ type Options struct { TLSConfig *tls.Config DialOpts []grpc.DialOption ForwardTimeout time.Duration + RelabelConfigs []*relabel.Config } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -353,6 +356,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } + // Apply relabeling configs. + h.relabel(&wreq) + if len(wreq.Timeseries) == 0 { + level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.") + return + } + err = h.handleRequest(ctx, rep, tenant, &wreq) if err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) @@ -682,6 +692,23 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st } } +// relabel relabels the time series labels in the remote write request. +func (h *Handler) relabel(wreq *prompb.WriteRequest) { + if len(h.options.RelabelConfigs) == 0 { + return + } + timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries)) + for _, ts := range wreq.Timeseries { + lbls := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...) + if lbls == nil { + continue + } + ts.Labels = labelpb.ZLabelsFromPromLabels(lbls) + timeSeries = append(timeSeries, ts) + } + wreq.Timeseries = timeSeries +} + // isConflict returns whether or not the given error represents a conflict. func isConflict(err error) bool { if err == nil { diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 6c92fbabc8..6bc0bcf8b0 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -26,8 +26,10 @@ import ( "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "google.golang.org/grpc" @@ -1300,3 +1302,372 @@ func Heap(dir string) (err error) { defer runutil.CloseWithErrCapture(&err, f, "close") return pprof.WriteHeapProfile(f) } + +func TestRelabel(t *testing.T) { + for _, tcase := range []struct { + name string + relabel []*relabel.Config + writeRequest prompb.WriteRequest + expectedWriteRequest prompb.WriteRequest + }{ + { + name: "empty relabel configs", + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "has relabel configs but no relabelling applied", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"zoo"}, + TargetLabel: "bar", + Regex: relabel.MustNewRegexp("bar"), + Action: relabel.Replace, + Replacement: "baz", + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel rewrite existing labels", + relabel: []*relabel.Config{ + { + TargetLabel: "foo", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "test", + }, + { + TargetLabel: "__name__", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "foo", + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "foo", + }, + { + Name: "foo", + Value: "test", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops label", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops time series", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{}, + }, + }, + { + name: "relabel rewrite existing exemplar series labels", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + }, + { + name: "relabel drops exemplars", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []labelpb.ZLabel{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "foo", + Value: "bar", + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: []labelpb.ZLabel{ + { + Name: "traceID", + Value: "foo", + }, + }, + Value: 1, + Timestamp: 1, + }, + }, + }, + }, + }, + expectedWriteRequest: prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{}, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + h := NewHandler(nil, &Options{ + RelabelConfigs: tcase.relabel, + }) + + h.relabel(&tcase.writeRequest) + testutil.Equals(t, tcase.expectedWriteRequest, tcase.writeRequest) + }) + } +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 8260f392e7..587d9e05c3 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -365,6 +365,7 @@ type ReceiveBuilder struct { maxExemplars int ingestion bool hashringConfigs []receive.HashringConfig + relabelConfigs []*relabel.Config replication int image string } @@ -403,6 +404,11 @@ func (r *ReceiveBuilder) WithRouting(replication int, hashringConfigs ...receive return r } +func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *ReceiveBuilder { + r.relabelConfigs = relabelConfigs + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. @@ -448,6 +454,14 @@ func (r *ReceiveBuilder) Init() e2e.InstrumentedRunnable { args["--receive.replication-factor"] = strconv.Itoa(r.replication) } + if len(r.relabelConfigs) > 0 { + relabelConfigBytes, err := yaml.Marshal(r.relabelConfigs) + if err != nil { + return e2e.NewErrInstrumentedRunnable(r.Name(), errors.Wrapf(err, "generate relabel configs: %v", relabelConfigBytes)) + } + args["--receive.relabel-config"] = string(relabelConfigBytes) + } + return r.f.Init(wrapWithDefaults(e2e.StartOptions{ Image: r.image, Command: e2e.NewCommand("receive", e2e.BuildArgs(args)...), diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 9432dbeb28..53d1ebeeab 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -13,6 +13,8 @@ import ( "github.com/efficientgo/e2e" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/testutil" @@ -558,4 +560,46 @@ func TestReceive(t *testing.T) { }, }) }) + + t.Run("relabel", func(t *testing.T) { + t.Parallel() + e, err := e2e.NewDockerEnvironment("e2e_receive_relabel") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // Setup Router Ingestor. + i := e2ethanos.NewReceiveBuilder(e, "ingestor"). + WithIngestionEnabled(). + WithRelabelConfigs([]*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("prometheus"), + }, + }).Init() + + testutil.Ok(t, e2e.StartAndWaitReady(i)) + + // Setup Prometheus + prom := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(i.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, e2e.StartAndWaitReady(prom)) + + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) + // Label `prometheus` should be dropped. + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "receive": "receive-ingestor", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) }