From 4d8148e9a92877a48aa5ec64a0de9b1797dd93fb Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 8 Sep 2021 13:37:07 -0700 Subject: [PATCH] implement improved xDS NACK semantics (#27276) * refactor xDS response parsing * fix build * implement improved xDS NACK semantics * fix clang-tidy * fix test --- src/core/ext/xds/xds_api.cc | 25 ---- src/core/ext/xds/xds_client.cc | 89 +++++++++---- test/cpp/end2end/xds_end2end_test.cc | 183 ++++++++++++++++++++++----- 3 files changed, 215 insertions(+), 82 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 1c3148eb48644..b161c87bf3dab 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -3408,15 +3408,6 @@ upb_strview EdsResourceName( eds_resource); } -template -void MoveUpdatesToFailedSet(UpdateMap* update_map, - std::set* resource_names_failed) { - for (const auto& p : *update_map) { - resource_names_failed->insert(p.first); - } - update_map->clear(); -} - } // namespace XdsApi::AdsParseResult XdsApi::ParseAdsResponse( @@ -3462,40 +3453,24 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( IsLds, MaybeLogListener, LdsResourceParse, response, "LDS", expected_listener_names, &result.lds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.lds_update_map, - &result.resource_names_failed); - } } else if (IsRds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_route_v3_RouteConfiguration_parse, RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse, response, "RDS", expected_route_configuration_names, &result.rds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.rds_update_map, - &result.resource_names_failed); - } } else if (IsCds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds, MaybeLogCluster, CdsResourceParse, response, "CDS", expected_cluster_names, &result.cds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.cds_update_map, - &result.resource_names_failed); - } } else if (IsEds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse, EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse, response, "EDS", expected_eds_service_names, &result.eds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.eds_update_map, - &result.resource_names_failed); - } } return result; } diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 5e1d2b073f803..b57fa0aa82194 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -264,13 +264,15 @@ class XdsClient::ChannelState::AdsCallState ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map) + XdsApi::LdsUpdateMap lds_update_map, + const std::set& resource_names_failed) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time, XdsApi::RdsUpdateMap rds_update_map) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map) + XdsApi::CdsUpdateMap cds_update_map, + const std::set& resource_names_failed) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time, XdsApi::EdsUpdateMap eds_update_map) @@ -905,7 +907,8 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked( void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map) { + XdsApi::LdsUpdateMap lds_update_map, + const std::set& resource_names_failed) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LDS update received containing %" PRIuPTR @@ -949,6 +952,21 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( p.first->OnListenerChanged(*listener_state.update); } } + // For invalid resources in the update, if they are already in the + // cache, pretend that they are present in the update, so that we + // don't incorrectly consider them deleted below. + for (const std::string& listener_name : resource_names_failed) { + auto it = xds_client()->listener_map_.find(listener_name); + if (it != xds_client()->listener_map_.end()) { + auto& resource = it->second.update; + if (!resource.has_value()) continue; + lds_update_map[listener_name]; + if (!resource->http_connection_manager.route_config_name.empty()) { + rds_resource_names_seen.insert( + resource->http_connection_manager.route_config_name); + } + } + } // For any subscribed resource that is not present in the update, // remove it from the cache and notify watchers that it does not exist. for (const auto& p : lds_state.subscribed_resources) { @@ -1031,7 +1049,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map) { + XdsApi::CdsUpdateMap cds_update_map, + const std::set& resource_names_failed) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update received containing %" PRIuPTR @@ -1073,6 +1092,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( p.first->OnClusterChanged(cluster_state.update.value()); } } + // For invalid resources in the update, if they are already in the + // cache, pretend that they are present in the update, so that we + // don't incorrectly consider them deleted below. + for (const std::string& cluster_name : resource_names_failed) { + auto it = xds_client()->cluster_map_.find(cluster_name); + if (it != xds_client()->cluster_map_.end()) { + auto& resource = it->second.update; + if (!resource.has_value()) continue; + cds_update_map[cluster_name]; + eds_resource_names_seen.insert(resource->eds_service_name.empty() + ? cluster_name + : resource->eds_service_name); + } + } // For any subscribed resource that is not present in the update, // remove it from the cache and notify watchers that it does not exist. for (const auto& p : cds_state.subscribed_resources) { @@ -1174,7 +1207,7 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] %s update NACKed containing %" PRIuPTR - " resources", + " invalid resources", xds_client(), result.type_url.c_str(), result.resource_names_failed.size()); } @@ -1269,9 +1302,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Update nonce. auto& state = state_map_[result.type_url]; state.nonce = std::move(result.nonce); - // NACK or ACK the response. + // If we got an error, we'll NACK the update. if (result.parse_error != GRPC_ERROR_NONE) { - // NACK unacceptable update. gpr_log(GPR_ERROR, "[xds_client %p] ADS response invalid for resource type %s " "version %s, will NACK: nonce=%s error=%s", @@ -1295,27 +1327,32 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { RejectAdsUpdateLocked(update_time, result, &xds_client()->endpoint_map_); } - SendMessageLocked(result.type_url); - } else { + } + // Process any valid resources. + bool have_valid_resources = false; + if (result.type_url == XdsApi::kLdsTypeUrl) { + have_valid_resources = !result.lds_update_map.empty(); + AcceptLdsUpdateLocked(result.version, update_time, + std::move(result.lds_update_map), + result.resource_names_failed); + } else if (result.type_url == XdsApi::kRdsTypeUrl) { + have_valid_resources = !result.rds_update_map.empty(); + AcceptRdsUpdateLocked(result.version, update_time, + std::move(result.rds_update_map)); + } else if (result.type_url == XdsApi::kCdsTypeUrl) { + have_valid_resources = !result.cds_update_map.empty(); + AcceptCdsUpdateLocked(result.version, update_time, + std::move(result.cds_update_map), + result.resource_names_failed); + } else if (result.type_url == XdsApi::kEdsTypeUrl) { + have_valid_resources = !result.eds_update_map.empty(); + AcceptEdsUpdateLocked(result.version, update_time, + std::move(result.eds_update_map)); + } + if (have_valid_resources) { seen_response_ = true; - // Accept the ADS response according to the type_url. - if (result.type_url == XdsApi::kLdsTypeUrl) { - AcceptLdsUpdateLocked(result.version, update_time, - std::move(result.lds_update_map)); - } else if (result.type_url == XdsApi::kRdsTypeUrl) { - AcceptRdsUpdateLocked(result.version, update_time, - std::move(result.rds_update_map)); - } else if (result.type_url == XdsApi::kCdsTypeUrl) { - AcceptCdsUpdateLocked(result.version, update_time, - std::move(result.cds_update_map)); - } else if (result.type_url == XdsApi::kEdsTypeUrl) { - AcceptEdsUpdateLocked(result.version, update_time, - std::move(result.eds_update_map)); - } xds_client()->resource_version_map_[result.type_url] = std::move(result.version); - // ACK the update. - SendMessageLocked(result.type_url); // Start load reporting if needed. auto& lrs_call = chand()->lrs_calld_; if (lrs_call != nullptr) { @@ -1323,6 +1360,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); } } + // Send ACK or NACK. + SendMessageLocked(result.type_url); } if (xds_client()->shutting_down_) return true; // Keep listening for updates. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 345c1a04be44f..7433e156e08a7 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -3389,44 +3389,98 @@ TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) { // Tests that the NACK for multiple bad LDS resources includes both errors. TEST_P(GlobalXdsClientTest, MultipleBadResources) { constexpr char kServerName2[] = "server.other.com"; + constexpr char kServerName3[] = "server.another.com"; auto listener = default_listener_; listener.clear_api_listener(); balancers_[0]->ads_service()->SetLdsResource(listener); listener.set_name(kServerName2); balancers_[0]->ads_service()->SetLdsResource(listener); + listener = default_listener_; + listener.set_name(kServerName3); + SetListenerAndRouteConfiguration(0, listener, default_route_config_); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); // Need to create a second channel to subscribe to a second LDS resource. auto channel2 = CreateChannel(0, kServerName2); auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); - ClientContext context; - EchoRequest request; - request.set_message(kRequestMessage); - EchoResponse response; - grpc::Status status = stub2->Echo(&context, request, &response); - EXPECT_FALSE(status.ok()); - // Wait for second NACK to be reported to xDS server. + { + ClientContext context; + EchoRequest request; + request.set_message(kRequestMessage); + EchoResponse response; + grpc::Status status = stub2->Echo(&context, request, &response); + EXPECT_FALSE(status.ok()); + // Wait for second NACK to be reported to xDS server. + auto deadline = absl::Now() + absl::Seconds(30); + bool timed_out = false; + CheckRpcSendFailure( + CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) { + if (absl::Now() >= deadline) { + timed_out = true; + return false; + } + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + return response_state.state != + AdsServiceImpl::ResponseState::NACKED || + ::testing::Matches(::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*", + kServerName2, + ": validation error.*" + "Listener has neither address nor ApiListener")))( + response_state.error_message); + })); + ASSERT_FALSE(timed_out); + } + // Now start a new channel with a third server name, this one with a + // valid resource. + auto channel3 = CreateChannel(0, kServerName3); + auto stub3 = grpc::testing::EchoTestService::NewStub(channel3); + { + ClientContext context; + EchoRequest request; + request.set_message(kRequestMessage); + EchoResponse response; + grpc::Status status = stub3->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + } +} + +// Tests that we don't trigger does-not-exist callbacks for a resource +// that was previously valid but is updated to be invalid. +TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) { + // Set up valid resources and check that the channel works. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendOk(); + // Now send an update changing the Listener to be invalid. + auto listener = default_listener_; + listener.clear_api_listener(); + balancers_[0]->ads_service()->SetLdsResource(listener); + // Wait for xDS server to see NACK. auto deadline = absl::Now() + absl::Seconds(30); - bool timed_out = false; - CheckRpcSendFailure( - CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) { - if (absl::Now() >= deadline) { - timed_out = true; - return false; - } - const auto response_state = - balancers_[0]->ads_service()->lds_response_state(); - return response_state.state != AdsServiceImpl::ResponseState::NACKED || - ::testing::Matches(::testing::ContainsRegex(absl::StrCat( - kServerName, - ": validation error.*" - "Listener has neither address nor ApiListener.*", - kServerName2, - ": validation error.*" - "Listener has neither address nor ApiListener")))( - response_state.error_message); - })); - ASSERT_FALSE(timed_out); + do { + CheckRpcSendOk(); + ASSERT_LT(absl::Now(), deadline); + } while (balancers_[0]->ads_service()->lds_response_state().state != + AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener"))); + // Check one more time, just to make sure it still works after NACK. + CheckRpcSendOk(); } class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest { @@ -7233,19 +7287,42 @@ TEST_P(CdsTest, UnsupportedClusterType) { // Tests that the NACK for multiple bad resources includes both errors. TEST_P(CdsTest, MultipleBadResources) { constexpr char kClusterName2[] = "cluster_name_2"; - // Use unsupported type for default cluster. + constexpr char kClusterName3[] = "cluster_name_3"; + // Add cluster with unsupported type. auto cluster = default_cluster_; + cluster.set_name(kClusterName2); cluster.set_type(Cluster::STATIC); balancers_[0]->ads_service()->SetCdsResource(cluster); // Add second cluster with the same error. - cluster.set_name(kClusterName2); + cluster.set_name(kClusterName3); balancers_[0]->ads_service()->SetCdsResource(cluster); - // Change RouteConfig to point to both clusters. + // Change RouteConfig to point to all clusters. RouteConfiguration route_config = default_route_config_; + route_config.mutable_virtual_hosts(0)->clear_routes(); + // First route: default cluster, selected based on header. auto* route = route_config.mutable_virtual_hosts(0)->add_routes(); route->mutable_match()->set_prefix(""); + auto* header_matcher = route->mutable_match()->add_headers(); + header_matcher->set_name("cluster"); + header_matcher->set_exact_match(kDefaultClusterName); + route->mutable_route()->set_cluster(kDefaultClusterName); + // Second route: cluster 2, selected based on header. + route = route_config.mutable_virtual_hosts(0)->add_routes(); + route->mutable_match()->set_prefix(""); + header_matcher = route->mutable_match()->add_headers(); + header_matcher->set_name("cluster"); + header_matcher->set_exact_match(kClusterName2); route->mutable_route()->set_cluster(kClusterName2); + // Third route: cluster 3, used by default. + route = route_config.mutable_virtual_hosts(0)->add_routes(); + route->mutable_match()->set_prefix(""); + route->mutable_route()->set_cluster(kClusterName3); SetRouteConfiguration(0, route_config); + // Add EDS resource. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); // Send RPC. SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -7255,12 +7332,54 @@ TEST_P(CdsTest, MultipleBadResources) { EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); EXPECT_THAT( response_state.error_message, - ::testing::ContainsRegex(absl::StrCat(kDefaultClusterName, + ::testing::ContainsRegex(absl::StrCat(kClusterName2, ": validation error.*" "DiscoveryType is not valid.*", - kClusterName2, + kClusterName3, ": validation error.*" "DiscoveryType is not valid"))); + // RPCs for default cluster should succeed. + std::vector> metadata_default_cluster = { + {"cluster", kDefaultClusterName}, + }; + CheckRpcSendOk( + 1, RpcOptions().set_metadata(std::move(metadata_default_cluster))); + // RPCs for cluster 2 should fail. + std::vector> metadata_cluster_2 = { + {"cluster", kClusterName2}, + }; + CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options( + RpcOptions().set_metadata(std::move(metadata_cluster_2)))); +} + +// Tests that we don't trigger does-not-exist callbacks for a resource +// that was previously valid but is updated to be invalid. +TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) { + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Check that everything works. + CheckRpcSendOk(); + // Now send an update changing the Cluster to be invalid. + auto cluster = default_cluster_; + cluster.set_type(Cluster::STATIC); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Wait for xDS server to see NACK. + auto deadline = absl::Now() + absl::Seconds(30); + do { + CheckRpcSendOk(); + ASSERT_LT(absl::Now(), deadline); + } while (balancers_[0]->ads_service()->cds_response_state().state != + AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(balancers_[0]->ads_service()->cds_response_state().error_message, + ::testing::ContainsRegex(absl::StrCat( + kDefaultClusterName, + ": validation error.*DiscoveryType is not valid"))); + // Check one more time, just to make sure it still works after NACK. + CheckRpcSendOk(); } // Tests that CDS client should send a NACK if the eds_config in CDS response