Skip to content

Commit

Permalink
Remote read instrumentation tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Lei <lei.justin@gmail.com>
  • Loading branch information
leizor committed Jul 18, 2023
1 parent 12a91cf commit d082eec
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
18 changes: 14 additions & 4 deletions storage/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var (
Namespace: namespace,
Subsystem: subsystem,
Name: "read_request_duration_seconds",
Help: "Histogram of the latency for remote read requests.",
Help: "Histogram of the latency for remote read requests. Note that for streamed responses this is only the duration of the initial call and does not include the processing of the stream.",
Buckets: append(prometheus.DefBuckets, 25, 60),
},
[]string{remoteName, endpoint, "response_type"},
Expand Down Expand Up @@ -333,9 +333,19 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
return c.handleSampledResponse(req, httpResp, sortSeries)
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("chunked", strconv.Itoa(httpResp.StatusCode)).Inc()
ss := c.handleChunkedResponse(httpResp, query.StartTimestampMs, query.EndTimestampMs, cancel)
// We copy cancel here so we can nil out the original and prevent the context from being cancelled as soon as
// Read returns.
cancelCopy := cancel
cancel = nil

ss := c.handleChunkedResponse(httpResp, query.StartTimestampMs, query.EndTimestampMs, func(err error) {
code := strconv.Itoa(httpResp.StatusCode)
if err != io.EOF {
code = "aborted_stream"
}
c.readQueriesTotal.WithLabelValues("chunked", code).Inc()
cancelCopy()
})
return ss, nil
default:
c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -375,7 +385,7 @@ func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.R
return FromQueryResult(sortSeries, res), nil
}

func (c *Client) handleChunkedResponse(httpResp *http.Response, mint, maxt int64, cancel context.CancelFunc) storage.SeriesSet {
func (c *Client) handleChunkedResponse(httpResp *http.Response, mint, maxt int64, cancel func(error)) storage.SeriesSet {
s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
return NewChunkedSeriesSet(s, httpResp.Body, mint, maxt, cancel)
}
7 changes: 3 additions & 4 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package remote

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -547,13 +546,13 @@ type chunkedSeriesSet struct {
chunkedReader *ChunkedReader
respBody io.ReadCloser
mint, maxt int64
cancel context.CancelFunc
cancel func(error)

current storage.Series
err error
}

func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel context.CancelFunc) storage.SeriesSet {
func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet {
return &chunkedSeriesSet{
chunkedReader: chunkedReader,
respBody: respBody,
Expand All @@ -576,7 +575,7 @@ func (s *chunkedSeriesSet) Next() bool {
}

_ = s.respBody.Close()
s.cancel()
s.cancel(err)

return false
}
Expand Down
4 changes: 2 additions & 2 deletions storage/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func TestChunkedSeriesSet(t *testing.T) {
require.NoError(t, err)
}

ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func() {})
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
require.Nil(t, ss.Err())
require.Nil(t, ss.Warnings())

Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestChunkedSeriesSet(t *testing.T) {
require.NoError(t, err)
}

ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func() {})
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
require.Nil(t, ss.Err())
require.Nil(t, ss.Warnings())

Expand Down

0 comments on commit d082eec

Please sign in to comment.