Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: don't reset version info after stream restart #5422

Merged
merged 7 commits into from Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions internal/testutils/xds/e2e/logging.go
@@ -0,0 +1,48 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e

import (
"fmt"

"google.golang.org/grpc/grpclog"
)

// logger is the grpclog component logger shared by the xDS e2e test helpers.
var logger = grpclog.Component("xds-e2e")

// serverLogger adapts the package-level component logger to the Logger
// interface defined at envoyproxy/go-control-plane/pkg/log, so that it can be
// handed to the Snapshot cache.
type serverLogger struct{}

// Debugf formats the message and emits it at info level.
func (l serverLogger) Debugf(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Infof formats the message and emits it at info level.
func (l serverLogger) Infof(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Warnf formats the message and emits it at warning level.
func (l serverLogger) Warnf(format string, args ...interface{}) {
	logger.WarningDepth(1, fmt.Sprintf(format, args...))
}

// Errorf formats the message and emits it at error level.
func (l serverLogger) Errorf(format string, args ...interface{}) {
	logger.ErrorDepth(1, fmt.Sprintf(format, args...))
}
100 changes: 65 additions & 35 deletions internal/testutils/xds/e2e/server.go
Expand Up @@ -26,43 +26,19 @@ import (
"reflect"
"strconv"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
)

// logger is the grpclog component logger shared by the xDS e2e test helpers.
var logger = grpclog.Component("xds-e2e")

// serverLogger adapts the package-level component logger to the Logger
// interface defined at envoyproxy/go-control-plane/pkg/log, so that it can be
// handed to the Snapshot cache.
type serverLogger struct{}

// Debugf formats the message and emits it at info level.
func (l serverLogger) Debugf(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Infof formats the message and emits it at info level.
func (l serverLogger) Infof(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Warnf formats the message and emits it at warning level.
func (l serverLogger) Warnf(format string, args ...interface{}) {
	logger.WarningDepth(1, fmt.Sprintf(format, args...))
}

// Errorf formats the message and emits it at error level.
func (l serverLogger) Errorf(format string, args ...interface{}) {
	logger.ErrorDepth(1, fmt.Sprintf(format, args...))
}

// ManagementServer is a thin wrapper around the xDS control plane
// implementation provided by envoyproxy/go-control-plane.
type ManagementServer struct {
Expand All @@ -77,27 +53,81 @@ type ManagementServer struct {
version int // Version of resource snapshot.
}

// ManagementServerOptions contains options to be passed to the management
// server during creation.
type ManagementServerOptions struct {
	// Listener to accept connections on. If nil, a TCP listener on a local port
	// will be created and used.
	Listener net.Listener

	// The callbacks defined below correspond to the state of the world (sotw)
	// version of the xDS API on the management server. They are copied as-is
	// into the go-control-plane callback set, so any of them may be left nil.

	// OnStreamOpen is called when an xDS stream is opened. The callback is
	// invoked with the assigned stream ID and the type URL from the incoming
	// request (or "" for ADS).
	//
	// Returning an error from this callback will end processing and close the
	// stream. OnStreamClosed will still be called.
	OnStreamOpen func(context.Context, int64, string) error

	// OnStreamClosed is called immediately prior to closing an xDS stream. The
	// callback is invoked with the stream ID of the stream being closed.
	OnStreamClosed func(int64)

	// OnStreamRequest is called when a request is received on the stream. The
	// callback is invoked with the stream ID of the stream on which the request
	// was received and the received request.
	//
	// Returning an error from this callback will end processing and close the
	// stream. OnStreamClosed will still be called.
	OnStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error

	// OnStreamResponse is called immediately prior to sending a response on the
	// stream. The callback is invoked with the stream ID of the stream on which
	// the response is being sent along with the incoming request and the outgoing
	// response.
	OnStreamResponse func(context.Context, int64, *v3discoverypb.DiscoveryRequest, *v3discoverypb.DiscoveryResponse)
}

// StartManagementServer initializes a management server which implements the
// AggregatedDiscoveryService endpoint. The management server is initialized
// with no resources. Tests should call the Update() method to change the
// resource snapshot held by the management server, as required by the test
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer() (*ManagementServer, error) {
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
var lis net.Listener
if opts != nil && opts.Listener != nil {
lis = opts.Listener
} else {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server. Cancelling the context passed to the
// server is the only way of stopping it at the end of the test.
// Cancelling the context passed to the server is the only way of stopping it
// at the end of the test.
ctx, cancel := context.WithCancel(context.Background())
xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{})
callbacks := v3server.CallbackFuncs{}
if opts != nil {
callbacks = v3server.CallbackFuncs{
StreamOpenFunc: opts.OnStreamOpen,
StreamClosedFunc: opts.OnStreamClosed,
StreamRequestFunc: opts.OnStreamRequest,
StreamResponseFunc: opts.OnStreamResponse,
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server.
xs := v3server.NewServer(ctx, cache, callbacks)
gs := grpc.NewServer()
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
logger.Infof("Registered Aggregated Discovery Service (ADS)...")
Expand Down
4 changes: 2 additions & 2 deletions internal/testutils/xds/e2e/setup_management_server.go
Expand Up @@ -41,11 +41,11 @@ import (
// - bootstrap contents to be used by the client
// - xDS resolver builder to be used by the client
// - a cleanup function to be invoked at the end of the test
func SetupManagementServer(t *testing.T) (*ManagementServer, string, []byte, resolver.Builder, func()) {
func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*ManagementServer, string, []byte, resolver.Builder, func()) {
t.Helper()

// Spin up an xDS management server on a local port.
server, err := StartManagementServer()
server, err := StartManagementServer(opts)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
Expand Down
183 changes: 183 additions & 0 deletions test/xds/xds_client_ack_nack_test.go
@@ -0,0 +1,183 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xds_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"

v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)

const (
	// We are interested in LDS, RDS, CDS and EDS resources as part of the regular
	// xDS flow on the client. This is the number of distinct resource type URLs
	// the test waits to see before moving on.
	wantResources = 4
	// Management server assigns monotonically increasing stream IDs starting at 1.
	// idBeforeRestart is the ID expected on requests received on the first
	// stream, and idAfterRestart is the ID expected on requests received on the
	// stream created after the listener is restarted.
	idBeforeRestart = 1
	idAfterRestart  = 2
)

// streamRequestTuple pairs a discovery request with the ID of the stream on
// which it was received. Values of this type are pushed by the OnStreamRequest
// callback registered on the management server.
type streamRequestTuple struct {
	id  int64                           // Assigned by the management server.
	req *v3discoverypb.DiscoveryRequest // As received by the management server.
}

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
// xdsClient's ADS stream to the management server gets broken. This test
// verifies that the version number on the initial request on the new stream
// indicates the most recent version seen by the client on the previous stream.
//
// The test proceeds in the following steps:
//  1. Start a management server on a restartable listener, with callbacks
//     that record every request and signal stream closure.
//  2. Make a successful RPC and record the ACKed version of every resource
//     type on the first stream.
//  3. Stop the listener, wait for the stream to close, and restart the
//     listener.
//  4. Record the version on the first request for every resource type on the
//     new stream and verify that it matches the versions recorded in step 2.
func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
	// Create a restartable listener which can close existing connections.
	l, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
	}
	lis := testutils.NewRestartableListener(l)

	// Event to notify stream closure.
	streamClosed := grpcsync.NewEvent()
	// ADS requests received by the management server are pushed on to this
	// channel from the below registered callbacks. A big buffer is configured
	// on the channel to accommodate for the fact that at least two requests are
	// required for every resource (the initial one and the ACK), and because it
	// allows us to have simpler code.
	requestsCh := make(chan streamRequestTuple, 50)
	managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
		Listener: lis,
		OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
			requestsCh <- streamRequestTuple{id: id, req: req}
			return nil
		},
		OnStreamClosed: func(int64) {
			streamClosed.Fire()
		},
	})
	defer cleanup1()

	port, cleanup2 := startTestService(t, nil)
	defer cleanup2()

	const serviceName = "my-service-client-side-xds"
	resources := e2e.DefaultClientResources(e2e.ResourceParams{
		DialTarget: serviceName,
		NodeID:     nodeID,
		Host:       "localhost",
		Port:       port,
		SecLevel:   e2e.SecurityLevelNone,
	})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create a ClientConn and make a successful RPC.
	cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}

	// Wait for all the resources to be ACKed.
	ackVersionsBeforeRestart := make(map[string]string)
AcksBeforeRestart:
	for {
		select {
		case tuple := <-requestsCh:
			if tuple.id != idBeforeRestart {
				t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idBeforeRestart)
			}
			// The client first requests for a resource with version set to empty
			// string. After receipt of the response, it sends another request for
			// the same resource, this time with a non-empty version string. This
			// corresponds to ACKs, and this is what we want to capture.
			req := tuple.req
			if len(req.GetResourceNames()) != 0 && req.GetVersionInfo() != "" {
				ackVersionsBeforeRestart[req.GetTypeUrl()] = req.GetVersionInfo()
			}
			if len(ackVersionsBeforeRestart) == wantResources {
				break AcksBeforeRestart
			}
		case <-ctx.Done():
			t.Fatal("timeout when waiting for resources to be requested before stream restart")
		}
	}

	// Stop the listener on the management server. This will cause the client to
	// backoff and recreate the stream.
	lis.Stop()

	// Wait for the stream to be closed on the server. The wait is bounded by the
	// test context so that a missed stream closure fails the test promptly
	// instead of hanging it until the global test timeout.
	select {
	case <-streamClosed.Done():
	case <-ctx.Done():
		t.Fatal("timeout when waiting for the stream to be closed on the management server")
	}

	// Restart the listener on the management server to be able to accept
	// reconnect attempts from the client.
	lis.Restart()

	// Wait for all the previously sent resources to be re-requested.
	ackVersionsAfterRestart := make(map[string]string)
AcksAfterRestart:
	for {
		select {
		case tuple := <-requestsCh:
			if tuple.id == idBeforeRestart {
				// Ignore any stray requests from the old stream.
				continue
			}
			if tuple.id != idAfterRestart {
				t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart)
			}
			// After stream closure, capture the first request for every resource.
			// This should not be set to an empty version string, but instead should
			// be set to the version last ACKed before stream closure.
			req := tuple.req
			if len(req.GetResourceNames()) != 0 {
				ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo()
			}
			if len(ackVersionsAfterRestart) == wantResources {
				break AcksAfterRestart
			}
		case <-ctx.Done():
			t.Fatal("timeout when waiting for resources to be re-requested after stream restart")
		}
	}

	if !cmp.Equal(ackVersionsBeforeRestart, ackVersionsAfterRestart) {
		t.Fatalf("ackVersionsBeforeRestart: %v and ackVersionsAfterRestart: %v don't match", ackVersionsBeforeRestart, ackVersionsAfterRestart)
	}
	// Verify that RPCs succeed on the re-established stream. WaitForReady lets
	// the call block until the channel is ready instead of failing fast.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}
}
2 changes: 1 addition & 1 deletion test/xds/xds_client_affinity_test.go
Expand Up @@ -88,7 +88,7 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) {
return func() { envconfig.XDSRingHash = old }
}()()

managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
Expand Down