Skip to content

Commit

Permalink
xds: fix panic involving double close of channel in xDS transport (#5959
Browse files Browse the repository at this point in the history
)
  • Loading branch information
easwars committed Jan 23, 2023
1 parent 7bf6a58 commit 4075ef0
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
8 changes: 8 additions & 0 deletions xds/internal/xdsclient/transport/loadreport.go
Expand Up @@ -62,6 +62,11 @@ func (t *Transport) lrsStartStream() {

ctx, cancel := context.WithCancel(context.Background())
t.lrsCancelStream = cancel

// Create a new done channel everytime a new stream is created. This ensures
// that we don't close the same channel multiple times (from lrsRunner()
// goroutine) when multiple streams are created and closed.
t.lrsRunnerDoneCh = make(chan struct{})
go t.lrsRunner(ctx)
}

Expand All @@ -78,6 +83,9 @@ func (t *Transport) lrsStopStream() {

t.lrsCancelStream()
t.logger.Infof("Stopping LRS stream")

// Wait for the runner goroutine to exit. The done channel will be
// recreated when a new stream is created.
<-t.lrsRunnerDoneCh
}

Expand Down
14 changes: 13 additions & 1 deletion xds/internal/xdsclient/transport/loadreport_test.go
Expand Up @@ -54,7 +54,7 @@ func (s) TestReportLoad(t *testing.T) {
NodeProto: nodeProto,
}

// Create a transport to the fake server.
// Create a transport to the fake management server.
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
UpdateHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation.
Expand Down Expand Up @@ -190,4 +190,16 @@ func (s) TestReportLoad(t *testing.T) {
if _, err := mgmtServer.LRSStreamCloseChan.Receive(ctx); err != nil {
t.Fatal("Timeout waiting for LRS stream to close")
}

// Calling the load reporting API again should result in the creation of a
// new LRS stream. This ensures that creating and closing multiple streams
// works smoothly.
_, cancelLRS3 := tr.ReportLoad()
if err != nil {
t.Fatalf("Failed to start LRS load reporting: %v", err)
}
if _, err := mgmtServer.LRSStreamOpenChan.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err)
}
cancelLRS3()
}
1 change: 0 additions & 1 deletion xds/internal/xdsclient/transport/transport.go
Expand Up @@ -202,7 +202,6 @@ func New(opts Options) (*Transport, error) {
versions: make(map[string]string),
nonces: make(map[string]string),
adsRunnerDoneCh: make(chan struct{}),
lrsRunnerDoneCh: make(chan struct{}),
}

// This context is used for sending and receiving RPC requests and
Expand Down

0 comments on commit 4075ef0

Please sign in to comment.