Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: fix LRS stream leaks when errors are encountered #5505

Merged
merged 4 commits into from Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 16 additions & 8 deletions xds/internal/xdsclient/controller/transport.go
Expand Up @@ -99,10 +99,10 @@ func (t *Controller) run(ctx context.Context) {
// new requests to send on the stream.
//
// For each new request (watchAction), it's
// - processed and added to the watch map
// - so resend will pick them up when there are new streams
// - sent on the current stream if there's one
// - the current stream is cleared when any send on it fails
// - processed and added to the watch map
// so, resend will pick them up when there are new streams
// - sent on the current stream if there's one
// the current stream is cleared when any send on it fails
//
// For each new stream, all the existing requests will be resent.
//
Expand Down Expand Up @@ -388,26 +388,34 @@ func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts c

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
// streamCtx is created and canceled in case we terminate the stream
// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
// goroutine.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RPC's monitoring goroutine? Which goroutine are you taking about here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one:

grpc-go/stream.go

Lines 354 to 361 in 6417495

go func() {
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()

streamCtx, cancel := context.WithCancel(ctx)
stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc)
if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a defer cancel() and put everything that is currently inside of the for loop in an anonymous function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed feelings about this sort of thing. Done; let me know what you think.

continue
}
t.logger.Infof("lrs: created LRS stream")

if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
t.logger.Warningf("lrs: failed to send first request: %v", err)
cancel()
continue
}

clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("%v", err)
t.logger.Warningf("lrs: error from stream: %v", err)
cancel()
continue
}

retries = 0
t.sendLoads(ctx, stream, opts.LoadStore, clusters, interval)
t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval)
cancel()
}
}

Expand All @@ -421,7 +429,7 @@ func (t *Controller) sendLoads(ctx context.Context, stream grpc.ClientStream, st
return
}
if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
t.logger.Warningf("%v", err)
t.logger.Warningf("lrs: error from stream: %v", err)
return
}
}
Expand Down
19 changes: 17 additions & 2 deletions xds/internal/xdsclient/controller/version/v2/loadreport.go
Expand Up @@ -59,7 +59,10 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {

req := &lrspb.LoadStatsRequest{Node: node}
v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req))
return stream.Send(req)
if err := stream.Send(req); err != nil {
return getStreamError(stream)
}
return nil
}

func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
Expand Down Expand Up @@ -149,5 +152,17 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
return stream.Send(req)
if err := stream.Send(req); err != nil {
return getStreamError(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getStreamError/getStreamRecvError/?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is trying to achieve?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned from Send is only ever io.EOF in the event of the server terminating the stream or a connection loss. This function receives the actual error from the stream.

grpc-go/stream.go

Lines 108 to 110 in 6417495

// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Recv always supposed to contain a more informational error on the client side, even if the error was encountered on the client? I'm thinking about the case where the error for Send happens on the client, but we still end up using the error we get from Recv. Is it possible that the former contained more contextual information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is whether we should do this only when Send returns an error which is no io.EOF.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. Yes, that's a good idea: done.

}
return nil
}

func getStreamError(stream lrsStream) error {
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a way for this for loop to terminate other than when Recv() returns non-nil error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.Recv will terminate with an error if the context for the RPC is canceled, which is also how sendLoads exits (in transport.go). So I believe this is fine.

_, err := stream.Recv()
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combine these two lines.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return err
}
}
}
19 changes: 17 additions & 2 deletions xds/internal/xdsclient/controller/version/v3/loadreport.go
Expand Up @@ -59,7 +59,10 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {

req := &lrspb.LoadStatsRequest{Node: node}
v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req))
return stream.Send(req)
if err := stream.Send(req); err != nil {
return getStreamError(stream)
}
return nil
}

func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
Expand Down Expand Up @@ -148,5 +151,17 @@ func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v3c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
return stream.Send(req)
if err := stream.Send(req); err != nil {
return getStreamError(stream)
}
return nil
}

func getStreamError(stream lrsStream) error {
for {
_, err := stream.Recv()
if err != nil {
return err
}
}
}