Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csds: return empty response if xds client is not set #4505

Merged
merged 2 commits into from Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 19 additions & 12 deletions xds/csds/csds.go
Expand Up @@ -25,7 +25,6 @@ package csds

import (
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -59,28 +58,31 @@ type xdsClientInterface interface {

var (
logger = grpclog.Component("xds")
newXDSClient = func() (xdsClientInterface, error) {
return client.New()
newXDSClient = func() xdsClientInterface {
c, err := client.New()
if err != nil {
// If err is not nil, c is a typed nil (of type *xdsclient.Client).
// If c is returned and assigned to the xdsClient field in the CSDS
// server, the nil checks in the handlers will not handle it
// properly.
logger.Warningf("failed to create xds client: %v", err)
return nil
}
return c
}
)

// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practise. But we keep a copy in each
// xdsClient will always be the same in practice. But we keep a copy in each
// server instance for testing.
xdsClient xdsClientInterface
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
// registered on a gRPC server.
func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above, s/practise/practice/.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

xdsC, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("failed to create xds client: %v", err)
}
return &ClientStatusDiscoveryServer{
xdsClient: xdsC,
}, nil
return &ClientStatusDiscoveryServer{xdsClient: newXDSClient()}, nil
}

// StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
Expand Down Expand Up @@ -113,6 +115,9 @@ func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *
//
// If it returns an error, the error is a status error.
func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
if s.xdsClient == nil {
return &v3statuspb.ClientStatusResponse{}, nil
}
// Field NodeMatchers is unsupported, by design
// https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching.
if len(req.NodeMatchers) != 0 {
Expand All @@ -137,7 +142,9 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp

// Close cleans up the resources.
func (s *ClientStatusDiscoveryServer) Close() {
s.xdsClient.Close()
if s.xdsClient != nil {
s.xdsClient.Close()
}
}

// nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap
Expand Down
56 changes: 53 additions & 3 deletions xds/csds/csds_test.go
Expand Up @@ -275,9 +275,7 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
return xdsC, nil
}
newXDSClient = func() xdsClientInterface { return xdsC }

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
Expand Down Expand Up @@ -635,6 +633,58 @@ func protoToJSON(p proto.Message) string {
return ret
}

func TestCSDSNoXDSClient(t *testing.T) {
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClientInterface { return nil }
defer func() { newXDSClient = oldNewXDSClient }()

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
defer csdss.Close()
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
// Create a local listener and pass it to Serve().
lis, err := xtestutils.LocalTCPListener()
if err != nil {
t.Fatalf("xtestutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
defer server.Stop()

// Create CSDS client.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("cannot connect to server: %v", err)
}
defer conn.Close()
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}

if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
t.Fatalf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
t.Fatalf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 0 {
t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r))
}
}

func Test_nodeProtoToV3(t *testing.T) {
const (
testID = "test-id"
Expand Down