Skip to content

Commit

Permalink
xds: start a management server per test (#4720)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Sep 1, 2021
1 parent ed501aa commit 51003aa
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 72 deletions.
11 changes: 7 additions & 4 deletions xds/internal/test/xds_client_integration_test.go
Expand Up @@ -65,13 +65,16 @@ func clientSetup(t *testing.T) (uint32, func()) {
}

func (s) TestClientSideXDS(t *testing.T) {
port, cleanup := clientSetup(t)
defer cleanup()
managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

port, cleanup2 := clientSetup(t)
defer cleanup2()

const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: xdsClientNodeID,
NodeID: nodeID,
Host: "localhost",
Port: port,
SecLevel: e2e.SecurityLevelNone,
Expand All @@ -81,7 +84,7 @@ func (s) TestClientSideXDS(t *testing.T) {
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolverBuilder))
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
Expand Down
81 changes: 31 additions & 50 deletions xds/internal/test/xds_integration_test.go
Expand Up @@ -29,16 +29,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path"
"testing"
"time"

"github.com/google/uuid"

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/testdata"
Expand Down Expand Up @@ -70,34 +69,6 @@ func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, er
return &testpb.Empty{}, nil
}

var (
// Globals corresponding to the single instance of the xDS management server
// which is spawned for all the tests in this package.
managementServer *e2e.ManagementServer
xdsClientNodeID string
bootstrapContents []byte
xdsResolverBuilder resolver.Builder
)

// TestMain sets up an xDS management server, runs all tests, and stops the
// management server.
func TestMain(m *testing.M) {
// The management server is started and stopped from here, but the leakcheck
// runs after every individual test. So, we need to skip the goroutine which
// spawns the management server and is blocked on the call to `Serve()`.
leakcheck.RegisterIgnoreGoroutine("e2e.StartManagementServer")

cancel, err := setupManagementServer()
if err != nil {
log.Printf("setupManagementServer() failed: %v", err)
os.Exit(1)
}

code := m.Run()
cancel()
os.Exit(code)
}

func createTmpFile(src, dst string) error {
data, err := ioutil.ReadFile(src)
if err != nil {
Expand Down Expand Up @@ -159,37 +130,45 @@ func createClientTLSCredentials(t *testing.T) credentials.TransportCredentials {
// setupManagement server performs the following:
// - spin up an xDS management server on a local port
// - set up certificates for consumption by the file_watcher plugin
// - sets up the global variables which refer to this management server and the
// nodeID to be used when talking to this management server.
// - creates a bootstrap file in a temporary location
// - creates an xDS resolver using the above bootstrap contents
//
// Returns a function to be invoked by the caller to stop the management
// server.
func setupManagementServer() (cleanup func(), err error) {
// Returns the following:
// - management server
// - nodeID to be used by the client when connecting to the management server
// - bootstrap contents to be used by the client
// - xDS resolver builder to be used by the client
// - a cleanup function to be invoked at the end of the test
func setupManagementServer(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder, func()) {
t.Helper()

// Turn on the env var protection for client-side security.
origClientSideSecurityEnvVar := env.ClientSideSecuritySupport
env.ClientSideSecuritySupport = true

// Spin up an xDS management server on a local port.
managementServer, err = e2e.StartManagementServer()
server, err := e2e.StartManagementServer()
if err != nil {
return nil, err
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
defer func() {
if err != nil {
managementServer.Stop()
server.Stop()
}
}()

// Create a directory to hold certs and key files used on the server side.
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
if err != nil {
return nil, err
server.Stop()
t.Fatal(err)
}

// Create a directory to hold certs and key files used on the client side.
clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
if err != nil {
return nil, err
server.Stop()
t.Fatal(err)
}

// Create certificate providers section of the bootstrap config with entries
Expand All @@ -200,24 +179,26 @@ func setupManagementServer() (cleanup func(), err error) {
}

// Create a bootstrap file in a temporary directory.
xdsClientNodeID = uuid.New().String()
bootstrapContents, err = xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
nodeID := uuid.New().String()
bootstrapContents, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
Version: xdsinternal.TransportV3,
NodeID: xdsClientNodeID,
ServerURI: managementServer.Address,
NodeID: nodeID,
ServerURI: server.Address,
CertificateProviders: cpc,
ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
})
if err != nil {
return nil, err
server.Stop()
t.Fatalf("Failed to create bootstrap file: %v", err)
}
xdsResolverBuilder, err = xds.NewXDSResolverWithConfigForTesting(bootstrapContents)
resolver, err := xds.NewXDSResolverWithConfigForTesting(bootstrapContents)
if err != nil {
return nil, err
server.Stop()
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}

return func() {
managementServer.Stop()
return server, nodeID, bootstrapContents, resolver, func() {
server.Stop()
env.ClientSideSecuritySupport = origClientSideSecurityEnvVar
}, nil
}
}
37 changes: 23 additions & 14 deletions xds/internal/test/xds_server_integration_test.go
Expand Up @@ -56,7 +56,7 @@ const (
// Returns the following:
// - local listener on which the xDS-enabled gRPC server is serving on
// - cleanup function to be invoked by the tests when done
func setupGRPCServer(t *testing.T) (net.Listener, func()) {
func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func()) {
t.Helper()

// Configure xDS credentials to be used on the server-side.
Expand Down Expand Up @@ -111,8 +111,11 @@ func hostPortFromListener(lis net.Listener) (string, uint32, error) {
// the client and the server. This results in both of them using the
// configured fallback credentials (which is insecure creds in this case).
func (s) TestServerSideXDS_Fallback(t *testing.T) {
lis, cleanup := setupGRPCServer(t)
defer cleanup()
managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
defer cleanup2()

// Grab the host and port of the server and create client side xDS resources
// corresponding to it. This contains default resources with no security
Expand All @@ -124,7 +127,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) {
const serviceName = "my-service-fallback"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: xdsClientNodeID,
NodeID: nodeID,
Host: host,
Port: port,
SecLevel: e2e.SecurityLevelNone,
Expand Down Expand Up @@ -152,7 +155,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) {
// Create a ClientConn with the xds scheme and make a successful RPC.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder))
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
Expand Down Expand Up @@ -190,8 +193,11 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
lis, cleanup := setupGRPCServer(t)
defer cleanup()
managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
defer cleanup2()

// Grab the host and port of the server and create client side xDS
// resources corresponding to it.
Expand All @@ -206,7 +212,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
serviceName := "my-service-file-watcher-certs-" + test.name
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: xdsClientNodeID,
NodeID: nodeID,
Host: host,
Port: port,
SecLevel: test.secLevel,
Expand Down Expand Up @@ -234,7 +240,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
// Create a ClientConn with the xds scheme and make an RPC.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder))
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
Expand All @@ -257,8 +263,11 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
// configuration pointing to the use of the file_watcher plugin and we verify
// that the same client is now able to successfully make an RPC.
func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
lis, cleanup := setupGRPCServer(t)
defer cleanup()
managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
defer cleanup2()

// Grab the host and port of the server and create client side xDS resources
// corresponding to it. This contains default resources with no security
Expand All @@ -271,7 +280,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
const serviceName = "my-service-security-config-change"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: xdsClientNodeID,
NodeID: nodeID,
Host: host,
Port: port,
SecLevel: e2e.SecurityLevelNone,
Expand Down Expand Up @@ -299,7 +308,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
// Create a ClientConn with the xds scheme and make a successful RPC.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(xdsResolverBuilder))
xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
Expand Down Expand Up @@ -329,7 +338,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
// security configuration for mTLS with a file watcher certificate provider.
resources = e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: xdsClientNodeID,
NodeID: nodeID,
Host: host,
Port: port,
SecLevel: e2e.SecurityLevelMTLS,
Expand Down
11 changes: 7 additions & 4 deletions xds/internal/test/xds_server_serving_mode_test.go
Expand Up @@ -86,6 +86,9 @@ func (mt *modeTracker) waitForUpdate(ctx context.Context) error {
// xDS enabled gRPC servers. It verifies that appropriate mode changes happen in
// the server, and also verifies behavior of clientConns under these modes.
func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := setupManagementServer(t)
defer cleanup()

// Configure xDS credentials to be used on the server-side.
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
FallbackCreds: insecure.NewCredentials(),
Expand Down Expand Up @@ -131,7 +134,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}
listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone)
resources := e2e.UpdateOptions{
NodeID: xdsClientNodeID,
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},
}
if err := managementServer.Update(resources); err != nil {
Expand Down Expand Up @@ -176,7 +179,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
// Update the management server to remove the second listener resource. This
// should push only the second listener into "not-serving" mode.
if err := managementServer.Update(e2e.UpdateOptions{
NodeID: xdsClientNodeID,
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1},
}); err != nil {
t.Error(err)
Expand All @@ -193,7 +196,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
// well. This should push the first listener into "not-serving" mode. Second
// listener is already in "not-serving" mode.
if err := managementServer.Update(e2e.UpdateOptions{
NodeID: xdsClientNodeID,
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{},
}); err != nil {
t.Error(err)
Expand All @@ -216,7 +219,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {

// Update the management server with both listener resources.
if err := managementServer.Update(e2e.UpdateOptions{
NodeID: xdsClientNodeID,
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},
}); err != nil {
t.Error(err)
Expand Down

0 comments on commit 51003aa

Please sign in to comment.