Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use sync.Pool for json.NewEncoder target buffers #421

Merged
merged 2 commits into from Sep 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion client.go
Expand Up @@ -833,7 +833,6 @@ func (c *Client) GetClient() *http.Client {
// Executes method executes the given `Request` object and returns response
// error.
func (c *Client) execute(req *Request) (*Response, error) {
defer releaseBuffer(req.bodyBuf)
// Apply Request middleware
var err error

Expand Down Expand Up @@ -867,6 +866,8 @@ func (c *Client) execute(req *Request) (*Response, error) {
return nil, wrapNoRetryErr(err)
}

req.RawRequest.Body = newRequestBodyReleaser(req.RawRequest.Body, req.bodyBuf)

req.Time = time.Now()
resp, err := c.httpClient.Do(req.RawRequest)

Expand Down
2 changes: 1 addition & 1 deletion middleware.go
Expand Up @@ -458,7 +458,7 @@ func handleRequestBody(c *Client, r *Request) (err error) {
bodyBytes = []byte(s)
} else if IsJSONType(contentType) &&
(kind == reflect.Struct || kind == reflect.Map || kind == reflect.Slice) {
bodyBytes, err = jsonMarshal(c, r, r.Body)
r.bodyBuf, err = jsonMarshal(c, r, r.Body)
if err != nil {
return
}
Expand Down
11 changes: 7 additions & 4 deletions request.go
Expand Up @@ -838,11 +838,14 @@ func (r *Request) initValuesMap() {
}
}

var noescapeJSONMarshal = func(v interface{}) ([]byte, error) {
var noescapeJSONMarshal = func(v interface{}) (*bytes.Buffer, error) {
buf := acquireBuffer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pborzenkov do you mean usage of sync.Pool with json.NewEncoder has an issue at language level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeevatkm No. What I mean is that bytes.Buffer is returned back to sync.Pool before noescapeJSONMarshal returns (due to defer). Yet the underlying byte slice is used later, as bytes.Buffer.Bytes() doesn't copy the data.

So a concurrently running goroutine can get the very same bytes.Buffer from the pool and overwrite its contents before the data produced by json.NewEncoder is actually used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pborzenkov Thanks. My understanding is defer executes after the return statement executed. Maybe I have to read, try it out, and then get back to you.
https://golang.org/ref/spec#Defer_statements

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeevatkm

Yes, it will execute after the return. The problem is that return buf.Bytes() does not produce a copy of the data, it return the underlying slice. And since the buf is returned back to the pool it can easily get reused.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pborzenkov I think I missed this part "doesn't copy the data" from your previous response. I want to keep the sync.Pool usage instead of creating new. It's better to restructure the method flow to make use of the pool properly.
Thanks for bringing it up 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeevatkm

Yeah, that's probably a good idea. Thanks!

BTW, there is another problem with how sync.Pool is used. When a request is made by the execute method, there is defer that releases request body back to the pool:

// Executes method executes the given `Request` object and returns response
// error.
func (c *Client) execute(req *Request) (*Response, error) {
	defer releaseBuffer(req.bodyBuf)
	// Apply Request middleware

But since on the first entry to this function (e.g. not a request retry) the bodyBuf is nil, the call to releaseBuffer is basically a no-op. The trivial fix would be to do it like this:

// Executes method executes the given `Request` object and returns response
// error.
func (c *Client) execute(req *Request) (*Response, error) {
	defer func() {
		releaseBuffer(req.bodyBuf)
	}()
	// Apply Request middleware

But this is probably racy, as RoundTripper may hold the body even after http.Client.Do returns:

    // RoundTrip should not modify the request, except for
    // consuming and closing the Request's Body. RoundTrip may
    // read fields of the request in a separate goroutine. Callers
    // should not mutate or reuse the request until the Response's
    // Body has been closed.
    //
    // RoundTrip must always close the body, including on errors,
    // but depending on the implementation may do so in a separate
    // goroutine even after RoundTrip returns. This means that
    // callers wanting to reuse the body for subsequent requests
    // must arrange to wait for the Close call before doing so.

So the correct fix is to wrap Response.Body and release the request buffer whenever the response body is closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pborzenkov Your explanation and details make sense. Do you want to take a shot at it in this PR? or I will try it when I get a chance in the upcoming days.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeevatkm Yeah. Gonna try and fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pborzenkov I'm sorry for the delayed response at my end. Just went through your updates, it looks good. Also thanks for doing other improvements along with sync pool enhancement.

defer releaseBuffer(buf)
encoder := json.NewEncoder(buf)
encoder.SetEscapeHTML(false)
err := encoder.Encode(v)
return buf.Bytes(), err
if err := encoder.Encode(v); err != nil {
releaseBuffer(buf)
return nil, err
}

return buf, nil
}
45 changes: 40 additions & 5 deletions util.go
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
)

//‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾
Expand Down Expand Up @@ -139,13 +140,19 @@ type ResponseLog struct {
//_______________________________________________________________________

// way to disable the HTML escape as opt-in
func jsonMarshal(c *Client, r *Request, d interface{}) ([]byte, error) {
if !r.jsonEscapeHTML {
return noescapeJSONMarshal(d)
} else if !c.jsonEscapeHTML {
func jsonMarshal(c *Client, r *Request, d interface{}) (*bytes.Buffer, error) {
if !r.jsonEscapeHTML || !c.jsonEscapeHTML {
return noescapeJSONMarshal(d)
}
return c.JSONMarshal(d)

data, err := c.JSONMarshal(d)
if err != nil {
return nil, err
}

buf := acquireBuffer()
_, _ = buf.Write(data)
return buf, nil
}

func firstNonEmpty(v ...string) string {
Expand Down Expand Up @@ -283,6 +290,34 @@ func releaseBuffer(buf *bytes.Buffer) {
}
}

// requestBodyReleaser wraps requests's body and implements custom Close for it.
// The Close method closes original body and releases request body back to sync.Pool.
type requestBodyReleaser struct {
releaseOnce sync.Once
reqBuf *bytes.Buffer
io.ReadCloser
}

func newRequestBodyReleaser(respBody io.ReadCloser, reqBuf *bytes.Buffer) io.ReadCloser {
if reqBuf == nil {
return respBody
}

return &requestBodyReleaser{
reqBuf: reqBuf,
ReadCloser: respBody,
}
}

func (rr *requestBodyReleaser) Close() error {
err := rr.ReadCloser.Close()
rr.releaseOnce.Do(func() {
releaseBuffer(rr.reqBuf)
})

return err
}

func closeq(v interface{}) {
if c, ok := v.(io.Closer); ok {
silently(c.Close())
Expand Down