Skip to content

Commit

Permalink
Fix 'Client.Endpoint' to not 'cancel' when bufferedStream (#776)
Browse files Browse the repository at this point in the history
* transport/http/client_test: Modify the test to make it fail

With this modifications we can trigger the error that we are searching, 'context canceled'

* transport/http/client: Add the 'bodyWithCancel' to wrap the Response.Body

It adds the context.CancelFunc to the io.ReadCloser.Close function so bouth are called together

* transport/http/client: Add more documentation to clarify the changes

Also abstracted some logic on the test to make it more clear and also added more docuemntation. Added more documentation
on the definition of 'BufferedStream' to clarify that the Body has to be closed manually to properly close the response.

* transport/http/client: Add period at the end of the doc of 'BufferedStream'
  • Loading branch information
xescugc authored and peterbourgon committed Oct 22, 2018
1 parent ebd0dc0 commit ec0cc13
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
27 changes: 25 additions & 2 deletions transport/http/client.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"encoding/xml"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -84,6 +85,7 @@ func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {

// BufferedStream sets whether the Response.Body is left open, allowing it
// to be read from later. Useful for transporting a file as a buffered stream.
// That body has to be Closed to propery end the request.
func BufferedStream(buffered bool) ClientOption {
return func(c *Client) { c.bufferedStream = buffered }
}
Expand All @@ -92,7 +94,6 @@ func BufferedStream(buffered bool) ClientOption {
func (c Client) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var (
resp *http.Response
Expand All @@ -112,10 +113,12 @@ func (c Client) Endpoint() endpoint.Endpoint {

req, err := http.NewRequest(c.method, c.tgt.String(), nil)
if err != nil {
cancel()
return nil, err
}

if err = c.enc(ctx, req, request); err != nil {
cancel()
return nil, err
}

Expand All @@ -126,11 +129,17 @@ func (c Client) Endpoint() endpoint.Endpoint {
resp, err = c.client.Do(req.WithContext(ctx))

if err != nil {
cancel()
return nil, err
}

if !c.bufferedStream {
// If we expect a buffered stream, we don't cancel the context when the endpoint returns.
// Instead, we should call the cancel func when closing the response body.
if c.bufferedStream {
resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel}
} else {
defer resp.Body.Close()
defer cancel()
}

for _, f := range c.after {
Expand All @@ -146,6 +155,20 @@ func (c Client) Endpoint() endpoint.Endpoint {
}
}

// bodyWithCancel is a wrapper for an io.ReadCloser with also a
// cancel function which is called when the Close is used
type bodyWithCancel struct {
io.ReadCloser

cancel context.CancelFunc
}

func (bwc bodyWithCancel) Close() error {
bwc.ReadCloser.Close()
bwc.cancel()
return nil
}

// ClientFinalizerFunc can be used to perform work at the end of a client HTTP
// request, after the response is returned. The principal
// intended use is for error logging. Additional response parameters are
Expand Down
9 changes: 8 additions & 1 deletion transport/http/client_test.go
Expand Up @@ -98,8 +98,12 @@ func TestHTTPClient(t *testing.T) {
}

func TestHTTPClientBufferedStream(t *testing.T) {
// bodysize has a size big enought to make the resopnse.Body not an instant read
// so if the response is cancelled it wount be all readed and the test would fail
// The 6000 has not a particular meaning, it big enough to fulfill the usecase.
const bodysize = 6000
var (
testbody = "testbody"
testbody = string(make([]byte, bodysize))
encode = func(context.Context, *http.Request, interface{}) error { return nil }
decode = func(_ context.Context, r *http.Response) (interface{}, error) {
return TestResponse{r.Body, ""}, nil
Expand Down Expand Up @@ -129,6 +133,9 @@ func TestHTTPClientBufferedStream(t *testing.T) {
if !ok {
t.Fatal("response should be TestResponse")
}
defer response.Body.Close()
// Faking work
time.Sleep(time.Second * 1)

// Check that response body was NOT closed
b := make([]byte, len(testbody))
Expand Down

0 comments on commit ec0cc13

Please sign in to comment.