Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: don't reset version info after stream restart #5422

Merged
merged 7 commits into from Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions internal/testutils/xds/e2e/logging.go
@@ -0,0 +1,48 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e

import (
"fmt"

"google.golang.org/grpc/grpclog"
)

// logger is the grpclog component logger used by the xDS e2e test utilities.
var logger = grpclog.Component("xds-e2e")

// serverLogger adapts the package-level logger to the Logger interface
// defined at envoyproxy/go-control-plane/pkg/log, so that it can be handed to
// the Snapshot cache.
type serverLogger struct{}

// Debugf formats the message and emits it at info severity.
func (serverLogger) Debugf(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Infof formats the message and emits it at info severity.
func (serverLogger) Infof(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Warnf formats the message and emits it at warning severity.
func (serverLogger) Warnf(format string, args ...interface{}) {
	logger.WarningDepth(1, fmt.Sprintf(format, args...))
}

// Errorf formats the message and emits it at error severity.
func (serverLogger) Errorf(format string, args ...interface{}) {
	logger.ErrorDepth(1, fmt.Sprintf(format, args...))
}
100 changes: 65 additions & 35 deletions internal/testutils/xds/e2e/server.go
Expand Up @@ -26,43 +26,19 @@ import (
"reflect"
"strconv"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
)

// logger is the grpclog component logger used by the xDS e2e test utilities.
var logger = grpclog.Component("xds-e2e")

// serverLogger adapts the package-level logger to the Logger interface
// defined at envoyproxy/go-control-plane/pkg/log, so that it can be handed to
// the Snapshot cache.
type serverLogger struct{}

// Debugf formats the message and emits it at info severity.
func (serverLogger) Debugf(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Infof formats the message and emits it at info severity.
func (serverLogger) Infof(format string, args ...interface{}) {
	logger.InfoDepth(1, fmt.Sprintf(format, args...))
}

// Warnf formats the message and emits it at warning severity.
func (serverLogger) Warnf(format string, args ...interface{}) {
	logger.WarningDepth(1, fmt.Sprintf(format, args...))
}

// Errorf formats the message and emits it at error severity.
func (serverLogger) Errorf(format string, args ...interface{}) {
	logger.ErrorDepth(1, fmt.Sprintf(format, args...))
}

// ManagementServer is a thin wrapper around the xDS control plane
// implementation provided by envoyproxy/go-control-plane.
type ManagementServer struct {
Expand All @@ -77,27 +53,81 @@ type ManagementServer struct {
version int // Version of resource snapshot.
}

// ManagementServerOptions contains options to be passed to the management
// server during creation.
type ManagementServerOptions struct {
// Listener to accept connections on. If nil, a TCP listener on a local port
// will be created and used.
Listener net.Listener

// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.

// OnStreamOpen is called when an xDS stream is opened. The callback is
// invoked with the assigned stream ID and the type URL from the incoming
// request (or "" for ADS).
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamOpen func(context.Context, int64, string) error

// OnStreamClosed is called immediately prior to closing an xDS stream. The
// callback is invoked with the stream ID of the stream being closed.
OnStreamClosed func(int64)

// OnStreamRequest is called when a request is received on the stream. The
// callback is invoked with the stream ID of the stream on which the request
// was received and the received request.
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error

// OnStreamResponse is called immediately prior to sending a response on the
// stream. The callback is invoked with the stream ID of the stream on which
// the response is being sent along with the incoming request and the outgoing
// response.
OnStreamResponse func(context.Context, int64, *v3discoverypb.DiscoveryRequest, *v3discoverypb.DiscoveryResponse)
}

// StartManagementServer initializes a management server which implements the
// AggregatedDiscoveryService endpoint. The management server is initialized
// with no resources. Tests should call the Update() method to change the
// resource snapshot held by the management server, as required by the test
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer() (*ManagementServer, error) {
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
var lis net.Listener
if opts != nil && opts.Listener != nil {
lis = opts.Listener
} else {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server. Cancelling the context passed to the
// server is the only way of stopping it at the end of the test.
// Cancelling the context passed to the server is the only way of stopping it
// at the end of the test.
ctx, cancel := context.WithCancel(context.Background())
xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{})
callbacks := v3server.CallbackFuncs{}
if opts != nil {
callbacks = v3server.CallbackFuncs{
StreamOpenFunc: opts.OnStreamOpen,
StreamClosedFunc: opts.OnStreamClosed,
StreamRequestFunc: opts.OnStreamRequest,
StreamResponseFunc: opts.OnStreamResponse,
}
}

// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server.
xs := v3server.NewServer(ctx, cache, callbacks)
gs := grpc.NewServer()
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
logger.Infof("Registered Aggregated Discovery Service (ADS)...")
Expand Down
4 changes: 2 additions & 2 deletions internal/testutils/xds/e2e/setup_management_server.go
Expand Up @@ -41,11 +41,11 @@ import (
// - bootstrap contents to be used by the client
// - xDS resolver builder to be used by the client
// - a cleanup function to be invoked at the end of the test
func SetupManagementServer(t *testing.T) (*ManagementServer, string, []byte, resolver.Builder, func()) {
func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*ManagementServer, string, []byte, resolver.Builder, func()) {
t.Helper()

// Spin up an xDS management server on a local port.
server, err := StartManagementServer()
server, err := StartManagementServer(opts)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
Expand Down
166 changes: 166 additions & 0 deletions test/xds/xds_client_ack_nack_test.go
@@ -0,0 +1,166 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xds_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"

v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// wantResources is the number of resource types (LDS, RDS, CDS and EDS) that
// the client requests as part of the regular xDS flow.
const wantResources = 4

// seenAllACKs reports whether acks holds entries for exactly two streams and
// every one of those streams has recorded a version for each of the
// wantResources resource types.
//
// acks maps a stream ID to a map of resource type URL to acked version.
func seenAllACKs(acks map[int64]map[string]string) bool {
	// Count the streams that have a version recorded for every resource type.
	complete := 0
	for _, versions := range acks {
		if len(versions) == wantResources {
			complete++
		}
	}
	// Exactly two streams must be present, and both must be complete.
	return len(acks) == 2 && complete == 2
}

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
// xdsClient's ADS stream to the management server gets broken. This test
// verifies that the version number on the initial request on the new stream
// indicates the most recent version seen by the client on the previous stream.
func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

streamClosed := grpcsync.NewEvent() // Event to notify stream closure.
acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources.
// Map from stream id to a map of resource type to resource version.
ackVersionsMap := make(map[int64]map[string]string)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
// Return early under the following circumstances:
// - Received all the requests we wanted to see. This is to avoid
// any stray requests leading to test flakes.
// - Request contains no resource names. Such requests are usually
// seen when the xdsclient is shutting down and is no longer
// interested in the resources that it had subscribed to earlier.
if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 {
return nil
}
// Create a stream specific map to store ack versions if this is the
// first time we are seeing this stream id.
if ackVersionsMap[id] == nil {
ackVersionsMap[id] = make(map[string]string)
}
ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
if seenAllACKs(ackVersionsMap) {
acksReceived.Fire()
}
return nil
},
OnStreamClosed: func(int64) {
streamClosed.Fire()
},
})
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
defer cleanup2()

const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: port,
SecLevel: e2e.SecurityLevelNone,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// A successful RPC means that we have captured the ack versions for all
// resources in the OnStreamRequest callback. Nothing more needs to be done
// here before stream restart.

// Stop the listener on the management server. This will cause the client to
// back off and recreate the stream.
lis.Stop()

// Wait for the stream to be closed on the server.
<-streamClosed.Done()

// Restart the listener on the management server to be able to accept
// reconnect attempts from the client.
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
<-acksReceived.Done()

// Retrieve and compare ACKs from before and after stream restart.
var idBeforeRestart, idAfterRestart int64
for id := range ackVersionsMap {
if id > idAfterRestart {
idBeforeRestart = idAfterRestart
idAfterRestart = id
} else {
idBeforeRestart = id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this needs to be so complicated now. Why can't we use 1 and 2 directly anymore as before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we moved to having a map indexed by the stream id, I thought this should be the way to go. Over engineering .. sigh.

Done now. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Go had better generics support already, I wouldn't mind it so much as:

idBeforeRestart := min(keys(ackVersionsMap))
idAfterRestart := max(keys(ackVersionsMap))

}
}
acksBeforeRestart := ackVersionsMap[idBeforeRestart]
acksAfterRestart := ackVersionsMap[idAfterRestart]
if !cmp.Equal(acksBeforeRestart, acksAfterRestart) {
t.Fatalf("ACKs before restart: %v and ACKs after restart: %v don't match", acksBeforeRestart, acksAfterRestart)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use cmp.Diff instead of cmp.Equal and display that output instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
2 changes: 1 addition & 1 deletion test/xds/xds_client_affinity_test.go
Expand Up @@ -88,7 +88,7 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) {
return func() { envconfig.XDSRingHash = old }
}()()

managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()

port, cleanup2 := startTestService(t, nil)
Expand Down
4 changes: 2 additions & 2 deletions test/xds/xds_client_federation_test.go
Expand Up @@ -53,15 +53,15 @@ func (s) TestClientSideFederation(t *testing.T) {
defer func() { envconfig.XDSFederation = oldXDSFederation }()

// Start a management server as the default authority.
serverDefaultAuth, err := e2e.StartManagementServer()
serverDefaultAuth, err := e2e.StartManagementServer(nil)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
t.Cleanup(serverDefaultAuth.Stop)

// Start another management server as the other authority.
const nonDefaultAuth = "non-default-auth"
serverAnotherAuth, err := e2e.StartManagementServer()
serverAnotherAuth, err := e2e.StartManagementServer(nil)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
Expand Down