Skip to content

Commit

Permalink
xdsclient: don't reset version info after stream restart
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jun 14, 2022
1 parent 584d9cd commit 7ebf9e4
Show file tree
Hide file tree
Showing 18 changed files with 284 additions and 63 deletions.
48 changes: 48 additions & 0 deletions internal/testutils/xds/e2e/logging.go
@@ -0,0 +1,48 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e

import (
"fmt"

"google.golang.org/grpc/grpclog"
)

var logger = grpclog.Component("xds-e2e")

// serverLogger implements the Logger interface defined at
// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache.
type serverLogger struct{}

func (l serverLogger) Debugf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
}
func (l serverLogger) Infof(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
}
func (l serverLogger) Warnf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.WarningDepth(1, msg)
}
func (l serverLogger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.ErrorDepth(1, msg)
}
100 changes: 65 additions & 35 deletions internal/testutils/xds/e2e/server.go
Expand Up @@ -26,43 +26,19 @@ import (
"reflect"
"strconv"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
)

var logger = grpclog.Component("xds-e2e")

// serverLogger implements the Logger interface defined at
// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache.
type serverLogger struct{}

func (l serverLogger) Debugf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
}
func (l serverLogger) Infof(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
}
func (l serverLogger) Warnf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.WarningDepth(1, msg)
}
func (l serverLogger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logger.ErrorDepth(1, msg)
}

// ManagementServer is a thin wrapper around the xDS control plane
// implementation provided by envoyproxy/go-control-plane.
type ManagementServer struct {
Expand All @@ -77,27 +53,81 @@ type ManagementServer struct {
version int // Version of resource snapshot.
}

// ManagementServerOptions contains options to be passed to the management
// server during creation.
type ManagementServerOptions struct {
// Listener to accept connections on. If nil, a TPC listener on a local port
// will be created and used.
Listener net.Listener

// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.

// OnStreamOpen is called when an xDS stream is opened. The callback is
// invoked with the assigned stream ID and the type URL from the incoming
// request (or "" for ADS).
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamOpen func(context.Context, int64, string) error

// OnStreamClosed is called immediately prior to closing an xDS stream. The
// callback is invoked with the stream ID of the stream being closed.
OnStreamClosed func(int64)

// OnStreamRequest is called when a request is received on the stream. The
// callback is invoked with the stream ID of the stream on which the request
// was received and the received request.
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error

// OnStreamResponse is called immediately prior to sending a response on the
// stream. The callback is invoked with the stream ID of the stream on which
// the response is being sent along with the incoming request and the outgoing
// response.
OnStreamResponse func(context.Context, int64, *v3discoverypb.DiscoveryRequest, *v3discoverypb.DiscoveryResponse)
}

// StartManagementServer initializes a management server which implements the
// AggregatedDiscoveryService endpoint. The management server is initialized
// with no resources. Tests should call the Update() method to change the
// resource snapshot held by the management server, as required by the test
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer() (*ManagementServer, error) {
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
var lis net.Listener
if opts != nil && opts.Listener != nil {
lis = opts.Listener
} else {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server. Cancelling the context passed to the
// server is the only way of stopping it at the end of the test.
// Cancelling the context passed to the server is the only way of stopping it
// at the end of the test.
ctx, cancel := context.WithCancel(context.Background())
xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{})
callbacks := v3server.CallbackFuncs{}
if opts != nil {
callbacks = v3server.CallbackFuncs{
StreamOpenFunc: opts.OnStreamOpen,
StreamClosedFunc: opts.OnStreamClosed,
StreamRequestFunc: opts.OnStreamRequest,
StreamResponseFunc: opts.OnStreamResponse,
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server.
xs := v3server.NewServer(ctx, cache, callbacks)
gs := grpc.NewServer()
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
logger.Infof("Registered Aggregated Discovery Service (ADS)...")
Expand Down
4 changes: 2 additions & 2 deletions internal/testutils/xds/e2e/setup_management_server.go
Expand Up @@ -41,11 +41,11 @@ import (
// - bootstrap contents to be used by the client
// - xDS resolver builder to be used by the client
// - a cleanup function to be invoked at the end of the test
func SetupManagementServer(t *testing.T) (*ManagementServer, string, []byte, resolver.Builder, func()) {
func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*ManagementServer, string, []byte, resolver.Builder, func()) {
t.Helper()

// Spin up an xDS management server on a local port.
server, err := StartManagementServer()
server, err := StartManagementServer(opts)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
Expand Down
135 changes: 135 additions & 0 deletions test/xds/xds_client_ack_nack_test.go
@@ -0,0 +1,135 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xds_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"

v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
// xdsClient's ADS stream to the management server gets broken. This test
// verifies that the version number on the initial request on the new stream
// indicates the most recent version seen by the client on the previous stream.
func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

ackVersionsBeforeRestart := make(map[string]string)
ackVersionsAfterRestart := make(map[string]string)
streamClosed := grpcsync.NewEvent()
resourcesRequestedAfterStreamClose := make(chan struct{})
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(id int64, request *v3discoverypb.DiscoveryRequest) error {
// Populate the versions in the appropriate map based on whether the
// stream has closed.
if !streamClosed.HasFired() {
if len(request.GetResourceNames()) != 0 {
ackVersionsBeforeRestart[request.GetTypeUrl()] = request.GetVersionInfo()
}
return nil
}
if len(request.GetResourceNames()) != 0 {
ackVersionsAfterRestart[request.GetTypeUrl()] = request.GetVersionInfo()
if len(ackVersionsAfterRestart) == len(ackVersionsBeforeRestart) {
select {
case resourcesRequestedAfterStreamClose <- struct{}{}:
default:
}
}
}
return nil
},
OnStreamClosed: func(int64) {
streamClosed.Fire()
},
})
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
defer cleanup2()

const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: port,
SecLevel: e2e.SecurityLevelNone,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Stop the listener on the management server. This will cause the client to
// backoff and recreate the stream.
lis.Stop()

// Wait for the stream to be closed on the server.
<-streamClosed.Done()

// Restart the listener on the management server to be able to accept
// reconnect attempts from the client.
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
select {
case <-resourcesRequestedAfterStreamClose:
case <-ctx.Done():
t.Fatal("timeout when waiting for all resources to be re-requested after stream restart")
}

if !cmp.Equal(ackVersionsBeforeRestart, ackVersionsAfterRestart) {
t.Fatalf("ackVersionsBeforeRestart: %v and ackVersionsAfterRestart: %v don't match", ackVersionsBeforeRestart, ackVersionsAfterRestart)
}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
2 changes: 1 addition & 1 deletion test/xds/xds_client_affinity_test.go
Expand Up @@ -88,7 +88,7 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) {
return func() { envconfig.XDSRingHash = old }
}()()

managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
Expand Down
4 changes: 2 additions & 2 deletions test/xds/xds_client_federation_test.go
Expand Up @@ -53,15 +53,15 @@ func (s) TestClientSideFederation(t *testing.T) {
defer func() { envconfig.XDSFederation = oldXDSFederation }()

// Start a management server as the default authority.
serverDefaultAuth, err := e2e.StartManagementServer()
serverDefaultAuth, err := e2e.StartManagementServer(nil)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
t.Cleanup(serverDefaultAuth.Stop)

// Start another management server as the other authority.
const nonDefaultAuth = "non-default-auth"
serverAnotherAuth, err := e2e.StartManagementServer()
serverAnotherAuth, err := e2e.StartManagementServer(nil)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_client_integration_test.go
Expand Up @@ -74,7 +74,7 @@ func startTestService(t *testing.T, server *stubserver.StubServer) (uint32, func
}

func (s) TestClientSideXDS(t *testing.T) {
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_client_retry_test.go
Expand Up @@ -49,7 +49,7 @@ func (s) TestClientSideRetry(t *testing.T) {
},
}

managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()

port, cleanup2 := startTestService(t, ss)
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_rls_clusterspecifier_plugin_test.go
Expand Up @@ -104,7 +104,7 @@ func (s) TestRLSinxDS(t *testing.T) {
// Set up all components and configuration necessary - management server,
// xDS resolver, fake RLS Server, and xDS configuration which specifies an
// RLS Balancer that communicates to this set up fake RLS Server.
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()
port, cleanup2 := startTestService(t, nil)
defer cleanup2()
Expand Down

0 comments on commit 7ebf9e4

Please sign in to comment.