Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRW-2.0] New config surface allowing content-type and encoding preferences for both sending and receiving. #13968

Draft
wants to merge 1 commit into
base: remote-write-2.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 68 additions & 8 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"

Check failure on line 72 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/prometheus/prometheus/storage/remote (-: # github.com/prometheus/prometheus/storage/remote

Check failure on line 72 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/prometheus/prometheus/storage/remote (-: # github.com/prometheus/prometheus/storage/remote
"github.com/prometheus/prometheus/tracing"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
Expand Down Expand Up @@ -158,10 +158,8 @@
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
// todo: how to use the enable feature flag properly + use the remote format enum type
rwFormat int
enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool
enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool

prometheusURL string
corsRegexString string
Expand Down Expand Up @@ -314,6 +312,12 @@
a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests.").
Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver)

a.Flag("web.remote-write-receiver.protobuf-types", fmt.Sprintf("List of accepted remote write 2.0 content types to advertise to senders, ordered by the preference. Note that the final decision is on the sender. Supported list values: %v", config.DefaultRemoteWriteProtoTypes.String())).
Default(config.DefaultRemoteWriteProtoTypes.Strings()...).SetValue(rwProtoTypeFlagValue(&cfg.web.RemoteWriteReceiverProtoTypes))

a.Flag("web.remote-write-receiver.compressions", fmt.Sprintf("List of accepted remote write 2.0 content encodings (compressions) to advertise to senders, ordered by the preference. Note that the final decision is on the sender. Supported list values: %v", config.DefaultRemoteWriteCompressions.String())).
Default(config.DefaultRemoteWriteCompressions.Strings()...).SetValue(rwCompressionFlagValue(&cfg.web.RemoteWriteReceiverCompressions))

Comment on lines 312 to +320
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can make a case for moving all of this, including the flag to enable the receiver, to the config file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, totally possible. But it's hard to do in our feature - it's too many changes.

Perhaps NOT exposing any flags for now and adding TODO + issue would be fine for now?

a.Flag("web.console.templates", "Path to the console template directory, available at /consoles.").
Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath)

Expand Down Expand Up @@ -455,9 +459,6 @@
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)

a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
Default("0").IntVar(&cfg.rwFormat)

promlogflag.AddFlags(a, &cfg.promlogConfig)

a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error {
Expand Down Expand Up @@ -820,7 +821,6 @@
cfg.web.Flags[f.Name] = f.Value.String()
}

cfg.web.RemoteWriteFormat = config.RemoteWriteFormat(cfg.rwFormat)
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)

Expand Down Expand Up @@ -1737,3 +1737,63 @@
Run() error
SyncCh() <-chan map[string][]*targetgroup.Group
}

// TODO(bwplotka): Add unit test.
type rwProtoTypeFlagParser struct {
types *[]config.RemoteWriteProtoType
}

func rwProtoTypeFlagValue(types *[]config.RemoteWriteProtoType) kingpin.Value {
return &rwProtoTypeFlagParser{types: types}
}

func (p *rwProtoTypeFlagParser) IsCumulative() bool {
return true
}

func (p *rwProtoTypeFlagParser) String() string {
ss := make([]string, 0, len(*p.types))
for _, t := range *p.types {
ss = append(ss, string(t))
}
return strings.Join(ss, ",")
}

func (p *rwProtoTypeFlagParser) Set(opt string) error {
t := config.RemoteWriteProtoType(opt)
if err := t.Validate(); err != nil {
return err
}
*p.types = append(*p.types, t)
return nil
}

// TODO(bwplotka): Add unit test.
type rwCompressionFlagParser struct {
types *[]config.RemoteWriteCompression
}

func rwCompressionFlagValue(types *[]config.RemoteWriteCompression) kingpin.Value {
return &rwCompressionFlagParser{types: types}
}

func (p *rwCompressionFlagParser) IsCumulative() bool {
return true
}

func (p *rwCompressionFlagParser) String() string {
ss := make([]string, 0, len(*p.types))
for _, t := range *p.types {
ss = append(ss, string(t))
}
return strings.Join(ss, ",")
}

func (p *rwCompressionFlagParser) Set(opt string) error {
t := config.RemoteWriteCompression(opt)
if err := t.Validate(); err != nil {
return err
}
*p.types = append(*p.types, t)
return nil
}
41 changes: 27 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ var (

// DefaultRemoteWriteConfig is the default remote write configuration.
DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
RemoteTimeout: model.Duration(30 * time.Second),
ProtobufTypes: DefaultRemoteWriteProtoTypes,
Compressions: DefaultRemoteWriteCompressions,

QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig,
HTTPClientConfig: config.DefaultHTTPClientConfig,
Expand Down Expand Up @@ -1025,22 +1028,18 @@ func CheckTargetAddress(address model.LabelValue) error {
return nil
}

// This needs to live here rather than in the remote package to avoid an import cycle.
type RemoteWriteFormat int64

// RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct {
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
ProtocolVersion RemoteWriteFormat `yaml:"remote_write_version,omitempty"`
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
WriteRelabelConfigs []*relabel.Config `yaml:"write_relabel_configs,omitempty"`
Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
ProtobufTypes []RemoteWriteProtoType `yaml:"proto_types,omitempty"`
Compressions []RemoteWriteCompression `yaml:"compressions,omitempty"`

// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
QueueConfig QueueConfig `yaml:"queue_config,omitempty"`
MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"`
Expand Down Expand Up @@ -1072,6 +1071,20 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return err
}

if c.ProtobufTypes == nil {
c.ProtobufTypes = DefaultRemoteWriteProtoTypes
}
if err := validateRemoteWriteProtoTypes(c.ProtobufTypes); err != nil {
return fmt.Errorf("invalid protobuf_types value: %w", err)
}

if c.Compressions == nil {
c.Compressions = DefaultRemoteWriteCompressions
}
if err := validateRemoteWriteCompressions(c.Compressions); err != nil {
return fmt.Errorf("invalid compressions value: %w", err)
}

// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
// We cannot make it a pointer as the parser panics for inlined pointer structs.
// Thus we just do its validation here.
Expand Down
162 changes: 162 additions & 0 deletions config/config_remote_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package config

import (
"errors"
"fmt"
"sort"
"strings"
)

// TODO(bwplotka): Consider an util for "preference enums" as it's similar code for rw compression, proto type and scrape protocol.

// RemoteWriteProtoType represents the supported protobuf types for the remote write.
type RemoteWriteProtoType string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProtoType feels awkward, can we call it RemoteWriteProtoVersion ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried ProtoMessages, ProtoVersion etc but they might be worse options.

ProtoVersion feels like Protocol Version, so spec version? But it represent protobuf type, to in short prototype, not protocol version and I want to make it explicit. Or do you think ProtoVersion would not mislead?

Perhaps RemoteWriteProtobufType would be better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoteWriteProtobufType or RemoteWriteProtobufVersion, or maybe RemoteWriteWireType/Format/Version since we've discussed implementing and allow usage of something like Arrow in the future?


// Validate returns error if the given protobuf type is not supported.
func (s RemoteWriteProtoType) Validate() error {
if _, ok := RemoteWriteContentTypeHeaders[s]; !ok {
return fmt.Errorf("unknown remote write protobuf type %v, supported: %v",
s, func() (ret []string) {
for k := range RemoteWriteContentTypeHeaders {
ret = append(ret, string(k))
}
sort.Strings(ret)
return ret
}())
}
return nil
}

type RemoteWriteProtoTypes []RemoteWriteProtoType

func (t RemoteWriteProtoTypes) Strings() []string {
ret := make([]string, 0, len(t))
for _, typ := range t {
ret = append(ret, string(typ))
}
return ret
}

func (t RemoteWriteProtoTypes) String() string {
return strings.Join(t.Strings(), ",")
}

// ServerAcceptHeaderValue returns server Accept header value for
// given list of proto types as per RFC 9110 https://www.rfc-editor.org/rfc/rfc9110.html#section-12.5.1-14
func (t RemoteWriteProtoTypes) ServerAcceptHeaderValue() string {
// TODO(bwplotka): Consider implementing an optional quality factor.
ret := make([]string, 0, len(t))
for _, typ := range t {
ret = append(ret, RemoteWriteContentTypeHeaders[typ])
}
return strings.Join(ret, ",")
}

var (
RemoteWriteProtoTypeV1 RemoteWriteProtoType = "v1.WriteRequest"
RemoteWriteProtoTypeV2 RemoteWriteProtoType = "v2.WriteRequest"
RemoteWriteContentTypeHeaders = map[RemoteWriteProtoType]string{
RemoteWriteProtoTypeV1: "application/x-protobuf", // Also application/x-protobuf; proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec.
RemoteWriteProtoTypeV2: "application/x-protobuf;proto=io.prometheus.remote.write.v2.WriteRequest",
}

// DefaultRemoteWriteProtoTypes is the set of remote write protobuf types that will be
// preferred by the remote write client.
DefaultRemoteWriteProtoTypes = RemoteWriteProtoTypes{
RemoteWriteProtoTypeV1,
RemoteWriteProtoTypeV2,
}
)

// validateRemoteWriteProtoTypes return errors if we see problems with rw protobuf types in
// the Prometheus configuration.
func validateRemoteWriteProtoTypes(ts []RemoteWriteProtoType) error {
if len(ts) == 0 {
return errors.New("protobuf_types cannot be empty")
}
dups := map[string]struct{}{}
for _, t := range ts {
if _, ok := dups[strings.ToLower(string(t))]; ok {
return fmt.Errorf("duplicated protobuf types in protobuf_types, got %v", ts)
}
if err := t.Validate(); err != nil {
return fmt.Errorf("protobuf_types: %w", err)
}
dups[strings.ToLower(string(t))] = struct{}{}
}
return nil
}

// RemoteWriteCompression represents the supported compressions for the remote write.
type RemoteWriteCompression string

// Validate returns error if the given protobuf type is not supported.
func (s RemoteWriteCompression) Validate() error {
if _, ok := RemoteWriteContentEncodingHeaders[s]; !ok {
return fmt.Errorf("unknown remote write protobuf type %v, supported: %v",
s, func() (ret []string) {
for k := range RemoteWriteContentEncodingHeaders {
ret = append(ret, string(k))
}
sort.Strings(ret)
return ret
}())
}
return nil
}

type RemoteWriteCompressions []RemoteWriteCompression

func (cs RemoteWriteCompressions) Strings() []string {
ret := make([]string, 0, len(cs))
for _, c := range cs {
ret = append(ret, string(c))
}
return ret
}

func (cs RemoteWriteCompressions) String() string {
return strings.Join(cs.Strings(), ",")
}

// ServerAcceptEncodingHeaderValue returns server Accept-Encoding header value for
// given list of compressions as per RFC 9110 https://www.rfc-editor.org/rfc/rfc9110.html#name-accept-encoding
func (cs RemoteWriteCompressions) ServerAcceptEncodingHeaderValue() string {
// TODO(bwplotka): Consider implementing an optional quality factor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by not doing so yet we're just interpreting the order as the preference ordering, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, which is part of RFC 9110 spec, q is optional and allow to tell "weight" of the ordering.

ret := make([]string, 0, len(cs))
for _, typ := range cs {
ret = append(ret, RemoteWriteContentEncodingHeaders[typ])
}
return strings.Join(ret, ",")
}

// validateRemoteWriteCompressions return errors if we see problems with rw compressions in
// the Prometheus configuration.
func validateRemoteWriteCompressions(cs []RemoteWriteCompression) error {
if len(cs) == 0 {
return errors.New("compressions cannot be empty")
}
dups := map[string]struct{}{}
for _, c := range cs {
if _, ok := dups[strings.ToLower(string(c))]; ok {
return fmt.Errorf("duplicated compression in compressions, got %v", cs)
}
if err := c.Validate(); err != nil {
return fmt.Errorf("compressions: %w", err)
}
dups[strings.ToLower(string(c))] = struct{}{}
}
return nil
}

var (
RemoteWriteCompressionSnappy RemoteWriteCompression = "snappy"

RemoteWriteContentEncodingHeaders = map[RemoteWriteCompression]string{
RemoteWriteCompressionSnappy: "snappy",
}

DefaultRemoteWriteCompressions = RemoteWriteCompressions{
RemoteWriteCompressionSnappy,
}
)