Skip to content

Commit

Permalink
support series relabeling on Thanos receiver (#5391)
Browse files Browse the repository at this point in the history
* support series relabeling on Thanos receiver

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* add changelog

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix lint

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* update lint

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix e2e test

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix relabel config pass

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* cleanup white space

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* address review comments

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* address comments

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* update comment

Signed-off-by: Ben Ye <ben.ye@bytedance.com>
  • Loading branch information
Ben Ye committed May 31, 2022
1 parent 113f4a2 commit 7ba274c
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.

### Changed

Expand Down
17 changes: 16 additions & 1 deletion cmd/thanos/receive.go
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -173,6 +175,15 @@ func runReceive(
return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID)
}

relabelContentYaml, err := conf.relabelConfigPath.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
return errors.Wrap(err, "parse relabel configuration")
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
Expand All @@ -194,6 +205,7 @@ func runReceive(
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
Expand Down Expand Up @@ -729,7 +741,8 @@ type receiveConfig struct {
ignoreBlockSize bool
allowOutOfOrderUpload bool

reqLogConfig *extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -783,6 +796,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())

rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())

rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
Expand Down
7 changes: 7 additions & 0 deletions docs/components/receive.md
Expand Up @@ -140,6 +140,13 @@ Flags:
configuration. If it's empty AND hashring
configuration was provided, it means that
receive will run in RoutingOnly mode.
--receive.relabel-config=<content>
Alternative to 'receive.relabel-config-file'
flag (mutually exclusive). Content of YAML file
that contains relabeling configuration.
--receive.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration.
--receive.replica-header="THANOS-REPLICA"
HTTP header specifying the replica number of a
write request.
Expand Down
27 changes: 27 additions & 0 deletions pkg/receive/handler.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"google.golang.org/grpc"
Expand All @@ -38,6 +39,7 @@ import (
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -81,6 +83,7 @@ type Options struct {
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -353,6 +356,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// Apply relabeling configs.
h.relabel(&wreq)
if len(wreq.Timeseries) == 0 {
level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.")
return
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
Expand Down Expand Up @@ -682,6 +692,23 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
}
}

// relabel relabels the time series labels in the remote write request.
func (h *Handler) relabel(wreq *prompb.WriteRequest) {
if len(h.options.RelabelConfigs) == 0 {
return
}
timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries))
for _, ts := range wreq.Timeseries {
lbls := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...)
if lbls == nil {
continue
}
ts.Labels = labelpb.ZLabelsFromPromLabels(lbls)
timeSeries = append(timeSeries, ts)
}
wreq.Timeseries = timeSeries
}

// isConflict returns whether or not the given error represents a conflict.
func isConflict(err error) bool {
if err == nil {
Expand Down

0 comments on commit 7ba274c

Please sign in to comment.