Skip to content

Commit

Permalink
csds: return empty response if xds client is not set (#4505)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 3, 2021
1 parent 0956b12 commit a371529
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 15 deletions.
31 changes: 19 additions & 12 deletions xds/csds/csds.go
Expand Up @@ -25,7 +25,6 @@ package csds

import (
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -59,28 +58,31 @@ type xdsClientInterface interface {

var (
logger = grpclog.Component("xds")
newXDSClient = func() (xdsClientInterface, error) {
return client.New()
newXDSClient = func() xdsClientInterface {
c, err := client.New()
if err != nil {
// If err is not nil, c is a typed nil (of type *xdsclient.Client).
// If c is returned and assigned to the xdsClient field in the CSDS
// server, the nil checks in the handlers will not handle it
// properly.
logger.Warningf("failed to create xds client: %v", err)
return nil
}
return c
}
)

// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practise. But we keep a copy in each
// xdsClient will always be the same in practice. But we keep a copy in each
// server instance for testing.
xdsClient xdsClientInterface
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
// registered on a gRPC server.
func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
xdsC, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("failed to create xds client: %v", err)
}
return &ClientStatusDiscoveryServer{
xdsClient: xdsC,
}, nil
return &ClientStatusDiscoveryServer{xdsClient: newXDSClient()}, nil
}

// StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
Expand Down Expand Up @@ -113,6 +115,9 @@ func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *
//
// If it returns an error, the error is a status error.
func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
if s.xdsClient == nil {
return &v3statuspb.ClientStatusResponse{}, nil
}
// Field NodeMatchers is unsupported, by design
// https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching.
if len(req.NodeMatchers) != 0 {
Expand All @@ -137,7 +142,9 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp

// Close cleans up the resources.
func (s *ClientStatusDiscoveryServer) Close() {
s.xdsClient.Close()
if s.xdsClient != nil {
s.xdsClient.Close()
}
}

// nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap
Expand Down
56 changes: 53 additions & 3 deletions xds/csds/csds_test.go
Expand Up @@ -275,9 +275,7 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
return xdsC, nil
}
newXDSClient = func() xdsClientInterface { return xdsC }

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
Expand Down Expand Up @@ -635,6 +633,58 @@ func protoToJSON(p proto.Message) string {
return ret
}

func TestCSDSNoXDSClient(t *testing.T) {
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClientInterface { return nil }
defer func() { newXDSClient = oldNewXDSClient }()

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
defer csdss.Close()
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
// Create a local listener and pass it to Serve().
lis, err := xtestutils.LocalTCPListener()
if err != nil {
t.Fatalf("xtestutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
defer server.Stop()

// Create CSDS client.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("cannot connect to server: %v", err)
}
defer conn.Close()
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}

if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
t.Fatalf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
t.Fatalf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 0 {
t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r))
}
}

func Test_nodeProtoToV3(t *testing.T) {
const (
testID = "test-id"
Expand Down

0 comments on commit a371529

Please sign in to comment.