Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: fix panic involving double close of channel in xDS transport #5959

Merged
merged 2 commits into from Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions xds/internal/xdsclient/transport/loadreport.go
Expand Up @@ -62,6 +62,11 @@ func (t *Transport) lrsStartStream() {

ctx, cancel := context.WithCancel(context.Background())
t.lrsCancelStream = cancel

// Create a new done channel everytime a new stream is created. This ensures
// that we don't close the same channel multiple times (from lrsRunner()
// goroutine) when multiple streams are created and closed.
t.lrsRunnerDoneCh = make(chan struct{})
go t.lrsRunner(ctx)
}

Expand All @@ -78,6 +83,9 @@ func (t *Transport) lrsStopStream() {

t.lrsCancelStream()
t.logger.Infof("Stopping LRS stream")

// Wait for the runner goroutine to exit. The done channel will be
// recreated when a new stream is created.
<-t.lrsRunnerDoneCh
}

Expand Down
14 changes: 13 additions & 1 deletion xds/internal/xdsclient/transport/loadreport_test.go
Expand Up @@ -54,7 +54,7 @@ func (s) TestReportLoad(t *testing.T) {
NodeProto: nodeProto,
}

// Create a transport to the fake server.
// Create a transport to the fake management server.
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
UpdateHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation.
Expand Down Expand Up @@ -190,4 +190,16 @@ func (s) TestReportLoad(t *testing.T) {
if _, err := mgmtServer.LRSStreamCloseChan.Receive(ctx); err != nil {
t.Fatal("Timeout waiting for LRS stream to close")
}

// Calling the load reporting API again should result in the creation of a
// new LRS stream. This ensures that creating and closing multiple streams
// works smoothly.
_, cancelLRS3 := tr.ReportLoad()
if err != nil {
t.Fatalf("Failed to start LRS load reporting: %v", err)
}
if _, err := mgmtServer.LRSStreamOpenChan.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err)
}
cancelLRS3()
}
1 change: 0 additions & 1 deletion xds/internal/xdsclient/transport/transport.go
Expand Up @@ -202,7 +202,6 @@ func New(opts Options) (*Transport, error) {
versions: make(map[string]string),
nonces: make(map[string]string),
adsRunnerDoneCh: make(chan struct{}),
lrsRunnerDoneCh: make(chan struct{}),
}

// This context is used for sending and receiving RPC requests and
Expand Down