Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: fix LRS stream leaks when errors are encountered #5505

Merged
merged 4 commits into from Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 29 additions & 22 deletions xds/internal/xdsclient/controller/transport.go
Expand Up @@ -99,10 +99,10 @@ func (t *Controller) run(ctx context.Context) {
// new requests to send on the stream.
//
// For each new request (watchAction), it's
// - processed and added to the watch map
// - so resend will pick them up when there are new streams
// - sent on the current stream if there's one
// - the current stream is cleared when any send on it fails
// - processed and added to the watch map
// so, resend will pick them up when there are new streams
// - sent on the current stream if there's one
// the current stream is cleared when any send on it fails
//
// For each new stream, all the existing requests will be resent.
//
Expand Down Expand Up @@ -388,26 +388,33 @@ func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts c

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
continue
}
t.logger.Infof("lrs: created LRS stream")
func() {
// streamCtx is created and canceled in case we terminate the stream
// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
// goroutine.
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc)
if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
return
}
t.logger.Infof("lrs: created LRS stream")

if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
t.logger.Warningf("lrs: failed to send first request: %v", err)
continue
}
if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
t.logger.Warningf("lrs: failed to send first request: %v", err)
return
}

clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("%v", err)
continue
}
clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("lrs: error from stream: %v", err)
return
}

retries = 0
t.sendLoads(ctx, stream, opts.LoadStore, clusters, interval)
retries = 0
t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval)
}()
}
}

Expand All @@ -421,7 +428,7 @@ func (t *Controller) sendLoads(ctx context.Context, stream grpc.ClientStream, st
return
}
if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
t.logger.Warningf("%v", err)
t.logger.Warningf("lrs: error from stream: %v", err)
return
}
}
Expand Down
21 changes: 19 additions & 2 deletions xds/internal/xdsclient/controller/version/v2/loadreport.go
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -59,7 +60,11 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {

req := &lrspb.LoadStatsRequest{Node: node}
v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req))
return stream.Send(req)
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)
}
return err
}

func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
Expand Down Expand Up @@ -149,5 +154,17 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
return stream.Send(req)
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getStreamError/getStreamRecvError/?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is trying to achieve?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned from Send is only ever io.EOF in the event of the server terminating the stream or a connection loss. This function receives the actual error from the stream.

grpc-go/stream.go

Lines 108 to 110 in 6417495

// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Recv always supposed to contain a more informational error on the client side, even if the error was encountered on the client? I'm thinking about the case where the error for Send happens on the client, but we still end up using the error we get from Recv. Is it possible that the former contained more contextual information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is whether we should do this only when Send returns an error which is no io.EOF.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. Yes, that's a good idea: done.

}
return err
}

func getStreamError(stream lrsStream) error {
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a way for this for loop to terminate other than when Recv() returns non-nil error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.Recv will terminate with an error if the context for the RPC is canceled, which is also how sendLoads exits (in transport.go). So I believe this is fine.

if _, err := stream.Recv(); err != nil {
return err
}
}
}
21 changes: 19 additions & 2 deletions xds/internal/xdsclient/controller/version/v3/loadreport.go
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -59,7 +60,11 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {

req := &lrspb.LoadStatsRequest{Node: node}
v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req))
return stream.Send(req)
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)
}
return err
}

func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
Expand Down Expand Up @@ -148,5 +153,17 @@ func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v3c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
return stream.Send(req)
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)
}
return err
}

func getStreamError(stream lrsStream) error {
for {
if _, err := stream.Recv(); err != nil {
return err
}
}
}