From 58bbdbf813fe5e7c82568734337c1f249fe1b4fa Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 13 Jul 2022 22:33:19 +0000 Subject: [PATCH 1/4] xdsclient: fix LRS stream leaks when errors are encountered --- .../xdsclient/controller/transport.go | 24 ++++++++++++------- .../controller/version/v2/loadreport.go | 19 +++++++++++++-- .../controller/version/v3/loadreport.go | 19 +++++++++++++-- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/xds/internal/xdsclient/controller/transport.go b/xds/internal/xdsclient/controller/transport.go index 34c5b024dd8..ada69683fd4 100644 --- a/xds/internal/xdsclient/controller/transport.go +++ b/xds/internal/xdsclient/controller/transport.go @@ -99,10 +99,10 @@ func (t *Controller) run(ctx context.Context) { // new requests to send on the stream. // // For each new request (watchAction), it's -// - processed and added to the watch map -// - so resend will pick them up when there are new streams -// - sent on the current stream if there's one -// - the current stream is cleared when any send on it fails +// - processed and added to the watch map +// so, resend will pick them up when there are new streams +// - sent on the current stream if there's one +// the current stream is cleared when any send on it fails // // For each new stream, all the existing requests will be resent. // @@ -388,26 +388,34 @@ func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts c retries++ lastStreamStartTime = time.Now() - stream, err := t.vClient.NewLoadStatsStream(ctx, cc) + // streamCtx is created and canceled in case we terminate the stream + // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring + // goroutine. + streamCtx, cancel := context.WithCancel(ctx) + stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc) if err != nil { t.logger.Warningf("lrs: failed to create stream: %v", err) + cancel() continue } t.logger.Infof("lrs: created LRS stream") if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { t.logger.Warningf("lrs: failed to send first request: %v", err) + cancel() continue } clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) if err != nil { - t.logger.Warningf("%v", err) + t.logger.Warningf("lrs: error from stream: %v", err) + cancel() continue } retries = 0 - t.sendLoads(ctx, stream, opts.LoadStore, clusters, interval) + t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval) + cancel() } } @@ -421,7 +429,7 @@ func (t *Controller) sendLoads(ctx context.Context, stream grpc.ClientStream, st return } if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil { - t.logger.Warningf("%v", err) + t.logger.Warningf("lrs: error from stream: %v", err) return } } diff --git a/xds/internal/xdsclient/controller/version/v2/loadreport.go b/xds/internal/xdsclient/controller/version/v2/loadreport.go index f0034e21c35..5b5538f5329 100644 --- a/xds/internal/xdsclient/controller/version/v2/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v2/loadreport.go @@ -59,7 +59,10 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { req := &lrspb.LoadStatsRequest{Node: node} v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req)) - return stream.Send(req) + if err := stream.Send(req); err != nil { + return getStreamError(stream) + } + return nil } func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { @@ -149,5 +152,17 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats} v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req)) - return stream.Send(req) + if err := stream.Send(req); err != nil { + return getStreamError(stream) + } + return nil +} + +func getStreamError(stream lrsStream) error { + for { + _, err := stream.Recv() + if err != nil { + return err + } + } } diff --git a/xds/internal/xdsclient/controller/version/v3/loadreport.go b/xds/internal/xdsclient/controller/version/v3/loadreport.go index 8cdb5476fbb..ad54ba32761 100644 --- a/xds/internal/xdsclient/controller/version/v3/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v3/loadreport.go @@ -59,7 +59,10 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { req := &lrspb.LoadStatsRequest{Node: node} v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req)) - return stream.Send(req) + if err := stream.Send(req); err != nil { + return getStreamError(stream) + } + return nil } func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { @@ -148,5 +151,17 @@ func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats} v3c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req)) - return stream.Send(req) + if err := stream.Send(req); err != nil { + return getStreamError(stream) + } + return nil +} + +func getStreamError(stream lrsStream) error { + for { + _, err := stream.Recv() + if err != nil { + return err + } + } } From 18ac24de951b890bcbfcfa3d9d940badd993cb96 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 14 Jul 2022 23:56:29 +0000 Subject: [PATCH 2/4] review comments --- .../xdsclient/controller/transport.go | 50 +++++++++---------- .../controller/version/v2/loadreport.go | 3 +- .../controller/version/v3/loadreport.go | 3 +- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/xds/internal/xdsclient/controller/transport.go b/xds/internal/xdsclient/controller/transport.go index ada69683fd4..12b56910f5f 100644 --- a/xds/internal/xdsclient/controller/transport.go +++ b/xds/internal/xdsclient/controller/transport.go @@ -388,34 +388,34 @@ func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts c retries++ lastStreamStartTime = time.Now() - // streamCtx is created and canceled in case we terminate the stream - // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring - // goroutine. - streamCtx, cancel := context.WithCancel(ctx) - stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc) - if err != nil { - t.logger.Warningf("lrs: failed to create stream: %v", err) - cancel() - continue - } - t.logger.Infof("lrs: created LRS stream") + func() { + // streamCtx is created and canceled in case we terminate the stream + // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring + // goroutine. + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc) + if err != nil { + t.logger.Warningf("lrs: failed to create stream: %v", err) + return + } + t.logger.Infof("lrs: created LRS stream") - if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { - t.logger.Warningf("lrs: failed to send first request: %v", err) - cancel() - continue - } + if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { + t.logger.Warningf("lrs: failed to send first request: %v", err) + return + } - clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) - if err != nil { - t.logger.Warningf("lrs: error from stream: %v", err) - cancel() - continue - } + clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) + if err != nil { + t.logger.Warningf("lrs: error from stream: %v", err) + return + } - retries = 0 - t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval) - cancel() + retries = 0 + t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval) + cancel() + }() } } diff --git a/xds/internal/xdsclient/controller/version/v2/loadreport.go b/xds/internal/xdsclient/controller/version/v2/loadreport.go index 5b5538f5329..29a00b52a3f 100644 --- a/xds/internal/xdsclient/controller/version/v2/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v2/loadreport.go @@ -160,8 +160,7 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) func getStreamError(stream lrsStream) error { for { - _, err := stream.Recv() - if err != nil { + if _, err := stream.Recv(); err != nil { return err } } diff --git a/xds/internal/xdsclient/controller/version/v3/loadreport.go b/xds/internal/xdsclient/controller/version/v3/loadreport.go index ad54ba32761..f7438f1de5b 100644 --- a/xds/internal/xdsclient/controller/version/v3/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v3/loadreport.go @@ -159,8 +159,7 @@ func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) func getStreamError(stream lrsStream) error { for { - _, err := stream.Recv() - if err != nil { + if _, err := stream.Recv(); err != nil { return err } } From bb503c3bf2b62e347fe3b2544c4af76c981806f7 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 14 Jul 2022 23:57:18 +0000 Subject: [PATCH 3/4] stray cancel --- xds/internal/xdsclient/controller/transport.go | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/internal/xdsclient/controller/transport.go b/xds/internal/xdsclient/controller/transport.go index 12b56910f5f..28641dc874a 100644 --- a/xds/internal/xdsclient/controller/transport.go +++ b/xds/internal/xdsclient/controller/transport.go @@ -414,7 +414,6 @@ func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts c retries = 0 t.sendLoads(streamCtx, stream, opts.LoadStore, clusters, interval) - cancel() }() } } From 05333330454e7eb80cf198990a8030af55e639ae Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 15 Jul 2022 16:00:33 +0000 Subject: [PATCH 4/4] return non-io.EOF errors --- .../xdsclient/controller/version/v2/loadreport.go | 11 +++++++---- .../xdsclient/controller/version/v3/loadreport.go | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/xds/internal/xdsclient/controller/version/v2/loadreport.go b/xds/internal/xdsclient/controller/version/v2/loadreport.go index 29a00b52a3f..da5128ac456 100644 --- a/xds/internal/xdsclient/controller/version/v2/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v2/loadreport.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "io" "time" "github.com/golang/protobuf/proto" @@ -59,10 +60,11 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { req := &lrspb.LoadStatsRequest{Node: node} v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req)) - if err := stream.Send(req); err != nil { + err := stream.Send(req) + if err == io.EOF { return getStreamError(stream) } - return nil + return err } func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { @@ -152,10 +154,11 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats} v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req)) - if err := stream.Send(req); err != nil { + err := stream.Send(req) + if err == io.EOF { return getStreamError(stream) } - return nil + return err } func getStreamError(stream lrsStream) error { diff --git a/xds/internal/xdsclient/controller/version/v3/loadreport.go b/xds/internal/xdsclient/controller/version/v3/loadreport.go index f7438f1de5b..f8d866bb1a5 100644 --- a/xds/internal/xdsclient/controller/version/v3/loadreport.go +++ b/xds/internal/xdsclient/controller/version/v3/loadreport.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "io" "time" "github.com/golang/protobuf/proto" @@ -59,10 +60,11 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { req := &lrspb.LoadStatsRequest{Node: node} v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", pretty.ToJSON(req)) - if err := stream.Send(req); err != nil { + err := stream.Send(req) + if err == io.EOF { return getStreamError(stream) } - return nil + return err } func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { @@ -151,10 +153,11 @@ func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats} v3c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req)) - if err := stream.Send(req); err != nil { + err := stream.Send(req) + if err == io.EOF { return getStreamError(stream) } - return nil + return err } func getStreamError(stream lrsStream) error {