-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added streaming remote read client side functionality (fork) #8351
Conversation
@bwplotka please review it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for resurrecting this @wwformat! Bunch of minor nits, nothing major.
I moved the remote read server code into the remote package in #8536, so I think this might need rebasing against that. It might also make writing an e2e test easier?
storage/remote/client.go
Outdated
@@ -99,7 +100,7 @@ type ClientConfig struct { | |||
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. | |||
// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). | |||
type ReadClient interface { | |||
Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) | |||
StartRead(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to rename the method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think StartRead
is asynchronous method name, it imply that the result storage.SeriesSet
must be handled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do this all the time, we have method Query and Select
that gives SeriesSet so it might be ok to have just Read
for simplicity and consistency (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
storage/remote/client.go
Outdated
@@ -99,7 +100,7 @@ type ClientConfig struct { | |||
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. | |||
// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment can be removed now.
if httpResp.StatusCode/100 != 2 { | ||
return nil, errors.Errorf("remote server %s returned HTTP status %s: %s", c.url.String(), httpResp.Status, strings.TrimSpace(string(compressed))) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure removing this is correct - for the sample response path, we never seen to check the status code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How? We still have it removed (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this code to Client.Read
to handle sample/stream response path, like:
start := time.Now()
httpResp, err := c.Client.Do(httpReq)
if err != nil {
return nil, errors.Wrap(err, "error sending request")
}
if httpResp.StatusCode/100 != 2 {
httpResp.Body.Close()
return nil, errors.Errorf("remote server %s returned HTTP status %s: %s", c.url.String(), httpResp.Status, strings.TrimSpace(string(compressed)))
}
storage/remote/codec_test.go
Outdated
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/pkg/textparse" | ||
"github.com/prometheus/prometheus/prompb" | ||
"github.com/prometheus/prometheus/storage" | ||
"github.com/prometheus/prometheus/tsdb/tsdbutil" | ||
"github.com/stretchr/testify/require" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We try and keep non-prometheus/prometheus imports in a separate block.
8a632c4
to
7587a51
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! LGTM overall, just minor suggestions.
Sorry for the massive lag on review. Would you like to fix the last nits?
storage/remote/client.go
Outdated
@@ -99,7 +100,7 @@ type ClientConfig struct { | |||
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. | |||
// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). | |||
type ReadClient interface { | |||
Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) | |||
StartRead(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do this all the time, we have method Query and Select
that gives SeriesSet so it might be ok to have just Read
for simplicity and consistency (:
storage/remote/client.go
Outdated
return nil, errors.Errorf("not supported remote read content type: %s", contentType) | ||
} | ||
|
||
// Handling streamed response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's have comments a full sentence 🤗
// Handling streamed response | |
// Handling streamed response. |
if httpResp.StatusCode/100 != 2 { | ||
return nil, errors.Errorf("remote server %s returned HTTP status %s: %s", c.url.String(), httpResp.Status, strings.TrimSpace(string(compressed))) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How? We still have it removed (:
Hi there. I have a suggestion for refactoring the complete The API looks as follows:
The implementation is done, but I have to create tests for it. Since this would most probably replace 100% of the code of this PR, I would like all of the participants to decide if this approach or the one of PR is preferred. Best regards, Christian |
Signed-off-by: Yunchuan Wen <yunchuanwen@gmail.com>
What is the status of this PR? It seems like in the current state it was good-enough to be merged. Is there anything we can do to help? |
We're also looking forward to this feature. Any updates? |
I think both #8351 and #11379 are not good enough.
if response type is sampled, it's ok to return storage.SeriesSet. prometheus/storage/remote/read.go Line 232 in 93e451c
the storage.SeriesSet is compressed to return storage.ChunkSeriesSet. So the chunked data firstly are decompressed and then compressed if storage.ChunkQuerier.Select is invoked. I think it depends on the response type to return storage.SeriesSet or storage.ChunkedSeriesSet. |
The origin pull request(#7354) is already stale, so I fork it and try to finish it.
Fixes #5926