Skip to content

Commit

Permalink
remove tests which got missed as part of a merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jan 25, 2023
1 parent c9e452a commit 79007e1
Showing 1 changed file with 89 additions and 68 deletions.
157 changes: 89 additions & 68 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -567,14 +567,6 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
}
}

// TestXDSResolverCloseClosesXDSClient tests that the XDS resolver's Close
// method closes the XDS client.
func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) {
xdsR, _, _, cancel := testSetup(t, setupOpts{target: target})
xdsR.Close()
cancel() // Blocks until the xDS client is closed.
}

// TestResolverBadServiceUpdate tests the case where a resource returned by the
// management server is NACKed by the xDS client, which then returns an update
// containing an error to the resolver. Verifies that the update is propagated
Expand Down Expand Up @@ -1168,110 +1160,139 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) {
}
}

// TestXDSResolverRemovedResource tests for proper behavior after a resource is
// removed.
func (s) TestXDSResolverRemovedResource(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer cancel()
defer xdsR.Close()
// TestResolverRemovedResource tests the case where resources returned by the
// management server are removed. The test verifies that the resolver pushes the
// expected config selector and service config in this case.
func (s) TestResolverRemovedResource(t *testing.T) {
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()

// Configure the management server with a good listener and route
// configuration resource.
ldsName := serviceName
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
},
},
}, nil)
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

gotState, err := tcc.stateCh.Receive(ctx)
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := gotState.(resolver.State)
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}

// "Make an RPC" by invoking the config selector.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
t.Fatal("Received nil config selector in update from resolver")
}

res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
t.Fatalf("cs.SelectConfig(): %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()

// Delete the resource. The channel should receive a service config with the
// original cluster but with an erroring config selector.
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
// Delete the resources on the management server, resulting in a
// resource-not-found error from the xDS client.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
t.Fatal(err)
}

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
// The channel should receive the existing service config with the original
// cluster but with an erroring config selector.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = gotState.(resolver.State)
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}

// "Make another RPC" by invoking the config selector.
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
t.Fatal("Received nil config selector in update from resolver")
}

res, err = cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("Expected UNAVAILABLE error from cs.SelectConfig(_); got %v, %v", res, err)
t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
}

// In the meantime, an empty ServiceConfig update should have been sent.
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = gotState.(resolver.State)
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}

Expand Down

0 comments on commit 79007e1

Please sign in to comment.