-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added streaming remote read client side functionality #7354
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually looks quite amazing (: I suggested couple of things but with some more work this will work well 💪 Thanks!
storage/remote/client.go
Outdated
@@ -168,8 +172,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe | |||
httpReq.Header.Set("User-Agent", userAgent) | |||
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") | |||
|
|||
ctx, cancel := context.WithTimeout(ctx, c.timeout) | |||
defer cancel() | |||
httpResp, err := c.client.Do(httpReq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we use request before we put context into it? (:
e1a435f
to
88e34ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you asumption is right, I commented where you can find the answer (just giving you an answer would not teach you anything ❤️ )
One suggestion: Please do not leave dead or unfinished code you don't want to merge in a PR. Add your comments as GitHub comment. Otherwise it's hard to review something which is .. not ready. So there is nothing to approve / review really (: Making sure you keep PR reviewable / ready to go even if blocked, makes sure you will get it merged sooner! (:
I think assumption of @wwformat was totally alright. Did you try that before reaching me on slack? 😉
Sorry I didn't try @wwformat suggestion before reaching you on slack because I thought using |
Actively working on the fix and some issues with |
752c227
to
81ba12f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow!
This is actually high-quality code now 💪 Thanks for this. 🎉
I am approving, but let's merge when last comments will be addressed 🤗
81ba12f
to
dfaac89
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. PTAL.
|
||
if httpResp.StatusCode/100 != 2 { | ||
io.Copy(ioutil.Discard, httpResp.Body) | ||
httpResp.Body.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope this is what you meant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not having:
defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really sorry Bartek. Missed the notification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still not addressed (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still not addressed, in case below with unsupported contentType the response body will be not closed/exhausted leaving the connection alive without good reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To solve the problem with the unsupported contentType, is the additional line here enough to solve the issue?
if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
--> httpResp.Body.Close()
return nil, errors.Errorf("not supported remote read content type: %s", contentType)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one easy fix here. Just defer all (exhaust + close body) once (: and that's it. And don't double close in handleStreamedResponse
(: WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I considered the above option but won't it be incorrect?
for eg: We're going to return return &chunkedResponseSeriesSet{stream: stream, httpResp: httpResp}, nil
in StartRead()
and thus if we have
defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
before the return &chunkedResponseSeriesSet{stream: stream, httpResp: httpResp}, nil
we will end up closing the http
connection that is needed for stream.NextProto(res)
.
From my understanding, the most straightforward option seems to be to close the HttpRest
individually/separately depending on the use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well there is better way. You are right that we cannot do it for the case when we return iterator. So in this case I would do this.:
defer func() {
if err != nil {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}
}()
Then make sure the function returns (<something>, err error)
and it will only close on error (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you mean return streamedResponse, streamedResponse.err
This makes sense to me. (I have a small clarifying question though)
But, I can see that won't be returning an error when we see an io.EOF
in the chunkedResponseSeriesSet
Next()
. The connection needs to be closed in this case too, right?
Should the Next()
be replaced from
if err := c.stream.NextProto(res); err != nil {
if err != io.EOF {
c.err = err
}
return false
}
Replaced to
if err := c.stream.NextProto(res); err != nil {
c.err = err
return false
}
This above approach produces some errors like:
api_test.go:1784: Unexpected error: execution: EOF
Or is this the best version:
if err := c.stream.NextProto(res); err != nil {
httpResp.Body.Close()
if err != io.EOF {
c.err = err
}
return false
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some important comments not addressed, and still, there is a way this code will leak resources.
):
But super close!
|
||
if httpResp.StatusCode/100 != 2 { | ||
io.Copy(ioutil.Discard, httpResp.Body) | ||
httpResp.Body.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not having:
defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
here?
32a4803
to
e6b4942
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stil potential leak place let's fix (:
storage/remote/client.go
Outdated
} | ||
|
||
if httpResp.StatusCode/100 != 2 { | ||
defer func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does not help.
Please look on this function and think about what are the cases when we need to close the body and exhaust? I think you are missing one.
I think because we run functions it's totally ok to defer this in all cases JUST after we get response. and let's make sure nothing closes twice inside functions WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking a look at my code Bartek.
I think you are referring to closing the httpRest.Body
over here:
func (c *client) handleStreamedResponse(httpResp *http.Response) (storage.SeriesSet, error) {
stream := NewChunkedReader(httpResp.Body, DefaultChunkedReadLimit, nil)
return &chunkedResponseSeriesSet{stream: stream}, nil
}
So I have to close()
the bufio.Reader
in ChunkedReader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PLTA! :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggested something here: #7354 (comment)
5b62052
to
b846c93
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see two things missing. Otherwise LGTM!
b846c93
to
b766a76
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👋
Let's get back to this if you want (: Added a few comments, otherwise good 👍 Also some old comments are still not addressed (again). Just letting you know that it's natural that if maintainers see that their comments are not addressed (nothing done, nothing responded, nothing adjusted, just ignored) it might mean that ther time is being wasted. If you wish to contribute and have PR merged fasted, you need to keep attention to details like this. (:
|
||
if httpResp.StatusCode/100 != 2 { | ||
io.Copy(ioutil.Discard, httpResp.Body) | ||
httpResp.Body.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still not addressed, in case below with unsupported contentType the response body will be not closed/exhausted leaving the connection alive without good reason.
storage/remote/client.go
Outdated
|
||
return resp.Results[0], nil | ||
func (c *Client) handleStreamedResponse(httpResp *http.Response) (storage.SeriesSet, error) { | ||
stream := NewChunkedReader(httpResp.Body, DefaultChunkedReadLimit, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method might be too shallow I would just inline this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also it's ok to say that you are busy and cannot complete this PR. Let's just close it and allow others to finish this if they have time 👍 (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey Bartek, I have some small queries. Can you please check them out when you're available? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, responded for missed comment, sorry for missing it.
6dffabb
to
bd27683
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, just few comments and LGTM! (: Thanks for patience 👍
@@ -229,3 +234,160 @@ func TestMergeLabels(t *testing.T) { | |||
testutil.Equals(t, tc.expected, MergeLabels(tc.primary, tc.secondary)) | |||
} | |||
} | |||
|
|||
type TestSample struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type TestSample struct { | |
type sample struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not possible as it collides with this
prometheus/storage/remote/queue_manager.go
Line 650 in 90fc6be
type sample struct { |
Anyway, I have replaced it to
testSample
Can you also resolve comments which are addressed to keep this PR discussion clear? |
Tests are definitely failing because of this PR. I can see the endpoint tests are failing with |
00a6743
to
e453ffd
Compare
e453ffd
to
fd23df5
Compare
fd23df5
to
0850f62
Compare
Signed-off-by: Sudharshann D <sudhar287@gmail.com>
0850f62
to
c13c652
Compare
This has been idle for a couple of months now, and @wwformat wants to continue work on it in #8351 - I suggest we close this and continue there? |
We have looked at this pull request during our bug scrub. This is now a duplicate of #11379. Thank you for your contribution. |
For #5926
Thank you to @YaoZengzeng for his initial design. I've implemented his comments from here.
FYI @bwplotka