Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming remote read to ReadClient #11379

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 13 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ var (
// DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
HTTPClientConfig: config.DefaultHTTPClientConfig,
FilterExternalLabels: true,
}
Expand Down Expand Up @@ -1152,13 +1153,20 @@ type MetadataConfig struct {
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
}

const (
// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.
// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.
DefaultChunkedReadLimit = 5e+7
)

// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"`
URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
ChunkedReadLimit uint64 `yaml:"chunked_read_limit,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"`

// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
Expand Down
10 changes: 6 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ var expectedConf = &Config{

RemoteReadConfigs: []*RemoteReadConfig{
{
URL: mustParseURL("http://remote1/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ReadRecent: true,
Name: "default",
URL: mustParseURL("http://remote1/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
ReadRecent: true,
Name: "default",
HTTPClientConfig: config.HTTPClientConfig{
FollowRedirects: true,
EnableHTTP2: false,
Expand All @@ -164,6 +165,7 @@ var expectedConf = &Config{
{
URL: mustParseURL("http://remote3/read"),
RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
ReadRecent: false,
Name: "read_special",
RequiredMatchers: model.LabelSet{"job": "special"},
Expand Down
4 changes: 0 additions & 4 deletions storage/remote/chunked.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"github.com/gogo/protobuf/proto"
)

// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.
// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.
const DefaultChunkedReadLimit = 5e+7

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
Expand Down
114 changes: 81 additions & 33 deletions storage/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -36,22 +37,30 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote/azuread"
)

const maxErrMsgLen = 1024

var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
const (
maxErrMsgLen = 1024
)

var (
UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)

AcceptedResponseTypes = []prompb.ReadRequest_ResponseType{
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
prompb.ReadRequest_SAMPLES,
}

remoteReadQueriesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_queries_total",
Help: "The total number of remote read queries.",
},
[]string{remoteName, endpoint, "code"},
[]string{remoteName, endpoint, "response_type", "code"},
)
remoteReadQueries = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -67,13 +76,13 @@ var (
Namespace: namespace,
Subsystem: subsystem,
Name: "read_request_duration_seconds",
Help: "Histogram of the latency for remote read requests.",
Help: "Histogram of the latency for remote read requests. Note that for streamed responses this is only the duration of the initial call and does not include the processing of the stream.",
Buckets: append(prometheus.DefBuckets, 25, 60),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
},
[]string{remoteName, endpoint},
[]string{remoteName, endpoint, "response_type"},
)
)

Expand All @@ -89,10 +98,11 @@ type Client struct {
timeout time.Duration

retryOnRateLimit bool
chunkedReadLimit uint64

readQueries prometheus.Gauge
readQueriesTotal *prometheus.CounterVec
readQueriesDuration prometheus.Observer
readQueriesDuration prometheus.ObserverVec
}

// ClientConfig configures a client.
Expand All @@ -104,12 +114,13 @@ type ClientConfig struct {
AzureADConfig *azuread.AzureADConfig
Headers map[string]string
RetryOnRateLimit bool
ChunkedReadLimit uint64
}

// ReadClient uses the SAMPLES method of remote read to read series samples from remote server.
// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926).
// ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can
// also fall back to the SAMPLES method if necessary.
type ReadClient interface {
Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error)
Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
}

// NewReadClient creates a new client for remote read.
Expand All @@ -130,9 +141,10 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
urlString: conf.URL.String(),
Client: httpClient,
timeout: time.Duration(conf.Timeout),
chunkedReadLimit: conf.ChunkedReadLimit,
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
readQueriesDuration: remoteReadQueryDuration.WithLabelValues(name, conf.URL.String()),
readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
}, nil
}

Expand Down Expand Up @@ -227,8 +239,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
return RecoverableError{err, defaultBackoff}
}
defer func() {
io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close()
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()

if httpResp.StatusCode/100 != 2 {
Expand Down Expand Up @@ -263,26 +275,26 @@ func retryAfterDuration(t string) model.Duration {
}

// Name uniquely identifies the client.
func (c Client) Name() string {
func (c *Client) Name() string {
return c.remoteName
}

// Endpoint is the remote read or write endpoint.
func (c Client) Endpoint() string {
func (c *Client) Endpoint() string {
return c.urlString
}

// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a sampled response;
// chunked responses arrive already sorted by the server.
func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
c.readQueries.Inc()
defer c.readQueries.Dec()

req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*prompb.Query{
query,
},
Queries: []*prompb.Query{query},
AcceptedResponseTypes: AcceptedResponseTypes,
}
data, err := proto.Marshal(req)
if err != nil {
Expand All @@ -301,31 +313,64 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient))
defer span.End()

start := time.Now()
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("error sending request: %w", err)
}
defer func() {
io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close()
}()
c.readQueriesDuration.Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc()

compressed, err = io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
if httpResp.StatusCode/100 != 2 {
// Make an attempt at getting an error message.
body, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()

cancel()
return nil, fmt.Errorf("remote server %s returned http status %s: %s", c.urlString, httpResp.Status, string(body))
}

if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.urlString, httpResp.Status, strings.TrimSpace(string(compressed)))
contentType := httpResp.Header.Get("Content-Type")

switch {
case strings.HasPrefix(contentType, "application/x-protobuf"):
c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc()
ss, err := c.handleSampledResponse(req, httpResp, sortSeries)
cancel()
return ss, err
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
bwplotka marked this conversation as resolved.
Show resolved Hide resolved

s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) {
code := strconv.Itoa(httpResp.StatusCode)
if !errors.Is(err, io.EOF) {
code = "aborted_stream"
}
c.readQueriesTotal.WithLabelValues("chunked", code).Inc()
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
cancel()
}), nil
default:
c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("unsupported", strconv.Itoa(httpResp.StatusCode)).Inc()
cancel()
return nil, fmt.Errorf("unsupported content type: %s", contentType)
}
}

func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) {
compressed, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
}
defer func() {
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()

uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
Expand All @@ -342,5 +387,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
}

return resp.Results[0], nil
// This client does not batch queries so there's always only 1 result.
res := resp.Results[0]

return FromQueryResult(sortSeries, res), nil
}