Skip to content

Commit

Permalink
Don't use sync.Pool for json.NewEncoder target buffers (#421)
Browse files Browse the repository at this point in the history
* Don't release buf to sync.Pool to early

The slice returned by `noescapeJSONMarshal` continues to be accessed
even after `pool.Put` was called on the buffer. This might lead to
incorrect data being sent in request body if the buffer is acquired and
re-written by a concurrently running goroutine.

So make sure we release the buffer once the request completes.

* Release request body buf back to sync.Pool only after body is closed

net/http.RoundTripper may access request body in a separate goroutine,
so we need to wait release the buf back to sync.Pool only after the
body is closed. From the docs:

  	// RoundTrip must always close the body, including on errors,
	// but depending on the implementation may do so in a separate
	// goroutine even after RoundTrip returns. This means that
	// callers wanting to reuse the body for subsequent requests
	// must arrange to wait for the Close call before doing so.
  • Loading branch information
pborzenkov committed Sep 12, 2021
1 parent d717652 commit c1fa358
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 11 deletions.
3 changes: 2 additions & 1 deletion client.go
Expand Up @@ -869,7 +869,6 @@ func (c *Client) GetClient() *http.Client {
// Executes method executes the given `Request` object and returns response
// error.
func (c *Client) execute(req *Request) (*Response, error) {
defer releaseBuffer(req.bodyBuf)
// Apply Request middleware
var err error

Expand Down Expand Up @@ -903,6 +902,8 @@ func (c *Client) execute(req *Request) (*Response, error) {
return nil, wrapNoRetryErr(err)
}

req.RawRequest.Body = newRequestBodyReleaser(req.RawRequest.Body, req.bodyBuf)

req.Time = time.Now()
resp, err := c.httpClient.Do(req.RawRequest)

Expand Down
2 changes: 1 addition & 1 deletion middleware.go
Expand Up @@ -458,7 +458,7 @@ func handleRequestBody(c *Client, r *Request) (err error) {
bodyBytes = []byte(s)
} else if IsJSONType(contentType) &&
(kind == reflect.Struct || kind == reflect.Map || kind == reflect.Slice) {
bodyBytes, err = jsonMarshal(c, r, r.Body)
r.bodyBuf, err = jsonMarshal(c, r, r.Body)
if err != nil {
return
}
Expand Down
11 changes: 7 additions & 4 deletions request.go
Expand Up @@ -870,11 +870,14 @@ func (r *Request) initValuesMap() {
}
}

var noescapeJSONMarshal = func(v interface{}) ([]byte, error) {
var noescapeJSONMarshal = func(v interface{}) (*bytes.Buffer, error) {
buf := acquireBuffer()
defer releaseBuffer(buf)
encoder := json.NewEncoder(buf)
encoder.SetEscapeHTML(false)
err := encoder.Encode(v)
return buf.Bytes(), err
if err := encoder.Encode(v); err != nil {
releaseBuffer(buf)
return nil, err
}

return buf, nil
}
45 changes: 40 additions & 5 deletions util.go
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
)

//‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾
Expand Down Expand Up @@ -139,13 +140,19 @@ type ResponseLog struct {
//_______________________________________________________________________

// way to disable the HTML escape as opt-in
func jsonMarshal(c *Client, r *Request, d interface{}) ([]byte, error) {
if !r.jsonEscapeHTML {
return noescapeJSONMarshal(d)
} else if !c.jsonEscapeHTML {
func jsonMarshal(c *Client, r *Request, d interface{}) (*bytes.Buffer, error) {
if !r.jsonEscapeHTML || !c.jsonEscapeHTML {
return noescapeJSONMarshal(d)
}
return c.JSONMarshal(d)

data, err := c.JSONMarshal(d)
if err != nil {
return nil, err
}

buf := acquireBuffer()
_, _ = buf.Write(data)
return buf, nil
}

func firstNonEmpty(v ...string) string {
Expand Down Expand Up @@ -283,6 +290,34 @@ func releaseBuffer(buf *bytes.Buffer) {
}
}

// requestBodyReleaser wraps requests's body and implements custom Close for it.
// The Close method closes original body and releases request body back to sync.Pool.
type requestBodyReleaser struct {
releaseOnce sync.Once
reqBuf *bytes.Buffer
io.ReadCloser
}

func newRequestBodyReleaser(respBody io.ReadCloser, reqBuf *bytes.Buffer) io.ReadCloser {
if reqBuf == nil {
return respBody
}

return &requestBodyReleaser{
reqBuf: reqBuf,
ReadCloser: respBody,
}
}

func (rr *requestBodyReleaser) Close() error {
err := rr.ReadCloser.Close()
rr.releaseOnce.Do(func() {
releaseBuffer(rr.reqBuf)
})

return err
}

func closeq(v interface{}) {
if c, ok := v.(io.Closer); ok {
silently(c.Close())
Expand Down

0 comments on commit c1fa358

Please sign in to comment.