diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 693848897c6..dcd69e427c3 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient { protected void handleLdsResponse(String versionInfo, List resources, String nonce) { Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); + Set invalidResources = new HashSet<>(); List errors = new ArrayList<>(); Set retainedRdsResources = new HashSet<>(); @@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List resources, String } catch (ResourceInvalidException e) { errors.add( "LDS response Listener '" + listenerName + "' validation error: " + e.getMessage()); + invalidResources.add(listenerName); continue; } @@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List resources, String getLogger().log(XdsLogLevel.INFO, "Received LDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); - - if (!errors.isEmpty()) { - handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors); - return; - } - - handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce); - for (String resource : rdsResourceSubscribers.keySet()) { - if (!retainedRdsResources.contains(resource)) { - ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource); - subscriber.onAbsent(); - } - } + handleResourceUpdate( + ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo, + nonce, errors); } private LdsUpdate processClientSideListener( @@ -1313,6 +1305,7 @@ static StructOrError parseClusterWeight( protected void handleRdsResponse(String versionInfo, List resources, String nonce) { Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); + Set invalidResources = new HashSet<>(); List errors = new ArrayList<>(); for (int i = 0; i < resources.size(); i++) { @@ -1340,6 +1333,7 @@ protected void handleRdsResponse(String versionInfo, List resources, String errors.add( "RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e .getMessage()); + invalidResources.add(routeConfigName); continue; } @@ -1348,12 +1342,9 @@ protected void handleRdsResponse(String versionInfo, List resources, String getLogger().log(XdsLogLevel.INFO, "Received RDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); - - if (!errors.isEmpty()) { - handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors); - } else { - handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce); - } + handleResourceUpdate( + ResourceType.RDS, parsedResources, invalidResources, Collections.emptySet(), + versionInfo, nonce, errors); } private static RdsUpdate processRouteConfiguration( @@ -1377,6 +1368,7 @@ private static RdsUpdate processRouteConfiguration( protected void handleCdsResponse(String versionInfo, List resources, String nonce) { Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); + Set invalidResources = new HashSet<>(); List errors = new ArrayList<>(); Set retainedEdsResources = new HashSet<>(); @@ -1413,6 +1405,7 @@ protected void handleCdsResponse(String versionInfo, List resources, String } catch (ResourceInvalidException e) { errors.add( "CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage()); + invalidResources.add(clusterName); continue; } parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource)); @@ -1420,21 +1413,9 @@ protected void handleCdsResponse(String versionInfo, List resources, String getLogger().log(XdsLogLevel.INFO, "Received CDS Response version {0} nonce {1}. Parsed resources: {2}", versionInfo, nonce, unpackedResources); - - if (!errors.isEmpty()) { - handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors); - return; - } - - handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce); - // CDS responses represents the state of the world, EDS resources not referenced in CDS - // resources should be deleted. - for (String resource : edsResourceSubscribers.keySet()) { - ResourceSubscriber subscriber = edsResourceSubscribers.get(resource); - if (!retainedEdsResources.contains(resource)) { - subscriber.onAbsent(); - } - } + handleResourceUpdate( + ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo, + nonce, errors); } @VisibleForTesting @@ -1615,6 +1596,7 @@ private static StructOrError parseNonAggregateCluster( protected void handleEdsResponse(String versionInfo, List resources, String nonce) { Map parsedResources = new HashMap<>(resources.size()); Set unpackedResources = new HashSet<>(resources.size()); + Set invalidResources = new HashSet<>(); List errors = new ArrayList<>(); for (int i = 0; i < resources.size(); i++) { @@ -1649,16 +1631,17 @@ protected void handleEdsResponse(String versionInfo, List resources, String } catch (ResourceInvalidException e) { errors.add("EDS response ClusterLoadAssignment '" + clusterName + "' validation error: " + e.getMessage()); + invalidResources.add(clusterName); continue; } parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource)); } - - if (!errors.isEmpty()) { - handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors); - } else { - handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce); - } + getLogger().log( + XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}", + versionInfo, nonce, unpackedResources); + handleResourceUpdate( + ResourceType.EDS, parsedResources, invalidResources, Collections.emptySet(), + versionInfo, nonce, errors); } private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) @@ -2048,43 +2031,67 @@ private void cleanUpResourceTimers() { } } - private void handleResourcesAccepted( - ResourceType type, Map parsedResources, String version, - String nonce) { - ackResponse(type, version, nonce); - + private void handleResourceUpdate( + ResourceType type, Map parsedResources, Set invalidResources, + Set retainedResources, String version, String nonce, List errors) { + String errorDetail = null; + if (errors.isEmpty()) { + checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); + ackResponse(type, version, nonce); + } else { + errorDetail = Joiner.on('\n').join(errors); + getLogger().log(XdsLogLevel.WARNING, + "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", + type, version, nonce, errorDetail); + nackResponse(type, nonce, errorDetail); + } long updateTime = timeProvider.currentTimeNanos(); for (Map.Entry entry : getSubscribedResourcesMap(type).entrySet()) { String resourceName = entry.getKey(); ResourceSubscriber subscriber = entry.getValue(); + // Attach error details to the subscribed resources that included in the ADS update. + if (invalidResources.contains(resourceName)) { + subscriber.onRejected(version, updateTime, errorDetail); + } // Notify the watchers. if (parsedResources.containsKey(resourceName)) { subscriber.onData(parsedResources.get(resourceName), version, updateTime); } else if (type == ResourceType.LDS || type == ResourceType.CDS) { + if (subscriber.data != null && invalidResources.contains(resourceName)) { + // Update is rejected but keep using the cached data. + if (type == ResourceType.LDS) { + LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data; + io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager(); + if (hcm != null) { + String rdsName = hcm.rdsName(); + if (rdsName != null) { + retainedResources.add(rdsName); + } + } + } else { + CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data; + String edsName = cdsUpdate.edsServiceName(); + if (edsName == null) { + edsName = cdsUpdate.clusterName(); + } + retainedResources.add(edsName); + } + continue; + } // For State of the World services, notify watchers when their watched resource is missing // from the ADS update. subscriber.onAbsent(); } } - } - - private void handleResourcesRejected( - ResourceType type, Set unpackedResourceNames, String version, - String nonce, List errors) { - String errorDetail = Joiner.on('\n').join(errors); - getLogger().log(XdsLogLevel.WARNING, - "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", - type, version, nonce, errorDetail); - nackResponse(type, nonce, errorDetail); - - long updateTime = timeProvider.currentTimeNanos(); - for (Map.Entry entry : getSubscribedResourcesMap(type).entrySet()) { - String resourceName = entry.getKey(); - ResourceSubscriber subscriber = entry.getValue(); - - // Attach error details to the subscribed resources that included in the ADS update. - if (unpackedResourceNames.contains(resourceName)) { - subscriber.onRejected(version, updateTime, errorDetail); + // LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in + // LDS/CDS resources should be deleted. + if (type == ResourceType.LDS || type == ResourceType.CDS) { + Map dependentSubscribers = + type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers; + for (String resource : dependentSubscribers.keySet()) { + if (!retainedResources.contains(resource)) { + dependentSubscribers.get(resource).onAbsent(); + } } } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index a2a29ffe989..e66c73163be 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -473,11 +473,11 @@ public void ldsResponseErrorHandling_someResourcesFailedUnpack() { List errors = ImmutableList.of( "LDS response Resource index 0 - can't decode Listener: ", "LDS response Resource index 2 - can't decode Listener: "); - verifyResourceMetadataNacked(LDS, LDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors); + verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(1, 0, 0, 0); // The response is NACKed with the same error message. call.verifyRequestNack(LDS, LDS_RESOURCE, "", "0000", NODE, errors); - verifyNoInteractions(ldsResourceWatcher); + verify(ldsResourceWatcher).onChanged(any(LdsUpdate.class)); } /** @@ -517,14 +517,14 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { "A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.2")), "B", Any.pack(mf.buildListenerWithApiListenerInvalid("B"))); call.sendResponse(LDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B - // {C} -> ACK, version 1 + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {C} -> does not exist List errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: "); - verifyResourceMetadataNacked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 2, errorsV2); - verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataDoesNotExist(LDS, "C"); call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); // LDS -> {B, C} version 3 @@ -532,7 +532,7 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { "B", Any.pack(mf.buildListenerWithApiListenerForRds("B", "B.3")), "C", Any.pack(mf.buildListenerWithApiListenerForRds("C", "C.3"))); call.sendResponse(LDS, resourcesV3.values().asList(), VERSION_3, "0002"); - // {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> does not exist // {B, C} -> ACK, version 3 verifyResourceMetadataDoesNotExist(LDS, "A"); verifyResourceMetadataAcked(LDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); @@ -541,6 +541,73 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { verifySubscribedResourcesMetadataSizes(3, 0, 0, 0); } + @Test + public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscriptioin() { + List subscribedResourceNames = ImmutableList.of("A", "B", "C"); + xdsClient.watchLdsResource("A", ldsResourceWatcher); + xdsClient.watchRdsResource("A.1", rdsResourceWatcher); + xdsClient.watchLdsResource("B", ldsResourceWatcher); + xdsClient.watchRdsResource("B.1", rdsResourceWatcher); + xdsClient.watchLdsResource("C", ldsResourceWatcher); + xdsClient.watchRdsResource("C.1", rdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + verifyResourceMetadataRequested(LDS, "A"); + verifyResourceMetadataRequested(LDS, "B"); + verifyResourceMetadataRequested(LDS, "C"); + verifyResourceMetadataRequested(RDS, "A.1"); + verifyResourceMetadataRequested(RDS, "B.1"); + verifyResourceMetadataRequested(RDS, "C.1"); + verifySubscribedResourcesMetadataSizes(3, 0, 3, 0); + + // LDS -> {A, B, C}, version 1 + ImmutableMap resourcesV1 = ImmutableMap.of( + "A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.1")), + "B", Any.pack(mf.buildListenerWithApiListenerForRds("B", "B.1")), + "C", Any.pack(mf.buildListenerWithApiListenerForRds("C", "C.1"))); + call.sendResponse(LDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A, B, C} -> ACK, version 1 + verifyResourceMetadataAcked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataAcked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + call.verifyRequest(LDS, subscribedResourceNames, VERSION_1, "0000", NODE); + + // RDS -> {A.1, B.1, C.1}, version 1 + List vhostsV1 = mf.buildOpaqueVirtualHosts(1); + ImmutableMap resourcesV11 = ImmutableMap.of( + "A.1", Any.pack(mf.buildRouteConfiguration("A.1", vhostsV1)), + "B.1", Any.pack(mf.buildRouteConfiguration("B.1", vhostsV1)), + "C.1", Any.pack(mf.buildRouteConfiguration("C.1", vhostsV1))); + call.sendResponse(RDS, resourcesV11.values().asList(), VERSION_1, "0000"); + // {A.1, B.1, C.1} -> ACK, version 1 + verifyResourceMetadataAcked(RDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataAcked(RDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2); + + // LDS -> {A, B}, version 2 + // Failed to parse endpoint B + ImmutableMap resourcesV2 = ImmutableMap.of( + "A", Any.pack(mf.buildListenerWithApiListenerForRds("A", "A.2")), + "B", Any.pack(mf.buildListenerWithApiListenerInvalid("B"))); + call.sendResponse(LDS, resourcesV2.values().asList(), VERSION_2, "0001"); + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {C} -> does not exist + List errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: "); + verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3); + verifyResourceMetadataNacked( + LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3, + errorsV2); + verifyResourceMetadataDoesNotExist(LDS, "C"); + call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); + // {A.1} -> does not exist + // {B.1} -> version 1 + // {C.1} -> does not exist + verifyResourceMetadataDoesNotExist(RDS, "A.1"); + verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataDoesNotExist(RDS, "C.1"); + } + @Test public void ldsResourceFound_containsVirtualHosts() { DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher); @@ -807,11 +874,11 @@ public void rdsResponseErrorHandling_someResourcesFailedUnpack() { List errors = ImmutableList.of( "RDS response Resource index 0 - can't decode RouteConfiguration: ", "RDS response Resource index 2 - can't decode RouteConfiguration: "); - verifyResourceMetadataNacked(RDS, RDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors); + verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 0, 1, 0); // The response is NACKed with the same error message. call.verifyRequestNack(RDS, RDS_RESOURCE, "", "0000", NODE, errors); - verifyNoInteractions(rdsResourceWatcher); + verify(rdsResourceWatcher).onChanged(any(RdsUpdate.class)); } /** @@ -852,12 +919,12 @@ public void rdsResponseErrorHandling_subscribedResourceInvalid() { "A", Any.pack(mf.buildRouteConfiguration("A", mf.buildOpaqueVirtualHosts(2))), "B", Any.pack(mf.buildRouteConfigurationInvalid("B"))); call.sendResponse(RDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> ACK, version 1 List errorsV2 = ImmutableList.of("RDS response RouteConfiguration 'B' validation error: "); - verifyResourceMetadataNacked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(RDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 2, errorsV2); verifyResourceMetadataAcked(RDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -869,10 +936,9 @@ public void rdsResponseErrorHandling_subscribedResourceInvalid() { "B", Any.pack(mf.buildRouteConfiguration("B", vhostsV3)), "C", Any.pack(mf.buildRouteConfiguration("C", vhostsV3))); call.sendResponse(RDS, resourcesV3.values().asList(), VERSION_3, "0002"); - // {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> ACK, version 2 // {B, C} -> ACK, version 3 - verifyResourceMetadataNacked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataAcked(RDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); verifyResourceMetadataAcked(RDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3); call.verifyRequest(RDS, subscribedResourceNames, VERSION_3, "0002", NODE); @@ -1146,11 +1212,12 @@ public void cdsResponseErrorHandling_someResourcesFailedUnpack() { List errors = ImmutableList.of( "CDS response Resource index 0 - can't decode Cluster: ", "CDS response Resource index 2 - can't decode Cluster: "); - verifyResourceMetadataNacked(CDS, CDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors); + verifyResourceMetadataAcked( + CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); // The response is NACKed with the same error message. call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, errors); - verifyNoInteractions(cdsResourceWatcher); + verify(cdsResourceWatcher).onChanged(any(CdsUpdate.class)); } /** @@ -1198,14 +1265,14 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { )), "B", Any.pack(mf.buildClusterInvalid("B"))); call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B - // {C} -> ACK, version 1 + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {C} -> does not exist List errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: "); - verifyResourceMetadataNacked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 2, errorsV2); - verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataDoesNotExist(CDS, "C"); call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); // CDS -> {B, C} version 3 @@ -1217,7 +1284,7 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { "envoy.transport_sockets.tls", null ))); call.sendResponse(CDS, resourcesV3.values().asList(), VERSION_3, "0002"); - // {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> does not exit // {B, C} -> ACK, version 3 verifyResourceMetadataDoesNotExist(CDS, "A"); verifyResourceMetadataAcked(CDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); @@ -1225,6 +1292,82 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { call.verifyRequest(CDS, subscribedResourceNames, VERSION_3, "0002", NODE); } + @Test + public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscription() { + List subscribedResourceNames = ImmutableList.of("A", "B", "C"); + xdsClient.watchCdsResource("A", cdsResourceWatcher); + xdsClient.watchEdsResource("A.1", edsResourceWatcher); + xdsClient.watchCdsResource("B", cdsResourceWatcher); + xdsClient.watchEdsResource("B.1", edsResourceWatcher); + xdsClient.watchCdsResource("C", cdsResourceWatcher); + xdsClient.watchEdsResource("C.1", edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + verifyResourceMetadataRequested(CDS, "A"); + verifyResourceMetadataRequested(CDS, "B"); + verifyResourceMetadataRequested(CDS, "C"); + verifyResourceMetadataRequested(EDS, "A.1"); + verifyResourceMetadataRequested(EDS, "B.1"); + verifyResourceMetadataRequested(EDS, "C.1"); + verifySubscribedResourcesMetadataSizes(0, 3, 0, 3); + + // CDS -> {A, B, C}, version 1 + ImmutableMap resourcesV1 = ImmutableMap.of( + "A", Any.pack(mf.buildEdsCluster("A", "A.1", "round_robin", null, false, null, + "envoy.transport_sockets.tls", null + )), + "B", Any.pack(mf.buildEdsCluster("B", "B.1", "round_robin", null, false, null, + "envoy.transport_sockets.tls", null + )), + "C", Any.pack(mf.buildEdsCluster("C", "C.1", "round_robin", null, false, null, + "envoy.transport_sockets.tls", null + ))); + call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A, B, C} -> ACK, version 1 + verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataAcked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + call.verifyRequest(CDS, subscribedResourceNames, VERSION_1, "0000", NODE); + + // EDS -> {A.1, B.1, C.1}, version 1 + List dropOverloads = ImmutableList.of(); + List endpointsV1 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV11 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)), + "B.1", Any.pack(mf.buildClusterLoadAssignment("B.1", endpointsV1, dropOverloads)), + "C.1", Any.pack(mf.buildClusterLoadAssignment("C.1", endpointsV1, dropOverloads))); + call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000"); + // {A.1, B.1, C.1} -> ACK, version 1 + verifyResourceMetadataAcked(EDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataAcked(EDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2); + + // CDS -> {A, B}, version 2 + // Failed to parse endpoint B + ImmutableMap resourcesV2 = ImmutableMap.of( + "A", Any.pack(mf.buildEdsCluster("A", "A.2", "round_robin", null, false, null, + "envoy.transport_sockets.tls", null + )), + "B", Any.pack(mf.buildClusterInvalid("B"))); + call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001"); + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {C} -> does not exist + List errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: "); + verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3); + verifyResourceMetadataNacked( + CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3, + errorsV2); + verifyResourceMetadataDoesNotExist(CDS, "C"); + call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); + // {A.1} -> does not exist + // {B.1} -> version 1 + // {C.1} -> does not exist + verifyResourceMetadataDoesNotExist(EDS, "A.1"); + verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); + verifyResourceMetadataDoesNotExist(EDS, "C.1"); + } + @Test public void cdsResourceFound() { DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); @@ -1666,11 +1809,14 @@ public void edsResponseErrorHandling_someResourcesFailedUnpack() { List errors = ImmutableList.of( "EDS response Resource index 0 - can't decode ClusterLoadAssignment: ", "EDS response Resource index 2 - can't decode ClusterLoadAssignment: "); - verifyResourceMetadataNacked(EDS, EDS_RESOURCE, null, "", 0, VERSION_1, TIME_INCREMENT, errors); + verifyResourceMetadataAcked( + EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); // The response is NACKed with the same error message. call.verifyRequestNack(EDS, EDS_RESOURCE, "", "0000", NODE, errors); - verifyNoInteractions(edsResourceWatcher); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.clusterName).isEqualTo(EDS_RESOURCE); } /** @@ -1713,12 +1859,12 @@ public void edsResponseErrorHandling_subscribedResourceInvalid() { "A", Any.pack(mf.buildClusterLoadAssignment("A", endpointsV2, dropOverloads)), "B", Any.pack(mf.buildClusterLoadAssignmentInvalid("B"))); call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // {A, B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> ACK, version 1 List errorsV2 = ImmutableList.of("EDS response ClusterLoadAssignment 'B' validation error: "); - verifyResourceMetadataNacked(EDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(EDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 2, errorsV2); verifyResourceMetadataAcked(EDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -1731,10 +1877,9 @@ public void edsResponseErrorHandling_subscribedResourceInvalid() { "B", Any.pack(mf.buildClusterLoadAssignment("B", endpointsV3, dropOverloads)), "C", Any.pack(mf.buildClusterLoadAssignment("C", endpointsV3, dropOverloads))); call.sendResponse(EDS, resourcesV3.values().asList(), VERSION_3, "0002"); - // {A} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {A} -> ACK, version 2 // {B, C} -> ACK, version 3 - verifyResourceMetadataNacked(EDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataAcked(EDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); verifyResourceMetadataAcked(EDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3); call.verifyRequest(EDS, subscribedResourceNames, VERSION_3, "0002", NODE);