Skip to content

Commit

Permalink
implement improved xDS NACK semantics (#27276)
Browse files Browse the repository at this point in the history
* refactor xDS response parsing

* fix build

* implement improved xDS NACK semantics

* fix clang-tidy

* fix test
  • Loading branch information
markdroth committed Sep 8, 2021
1 parent 5765d32 commit f3497eb
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 82 deletions.
25 changes: 0 additions & 25 deletions src/core/ext/xds/xds_api.cc
Expand Up @@ -3408,15 +3408,6 @@ upb_strview EdsResourceName(
eds_resource);
}

template <typename UpdateMap>
void MoveUpdatesToFailedSet(UpdateMap* update_map,
std::set<std::string>* resource_names_failed) {
for (const auto& p : *update_map) {
resource_names_failed->insert(p.first);
}
update_map->clear();
}

} // namespace

XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
Expand Down Expand Up @@ -3462,40 +3453,24 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
IsLds, MaybeLogListener, LdsResourceParse, response, "LDS",
expected_listener_names, &result.lds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.lds_update_map,
&result.resource_names_failed);
}
} else if (IsRds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_route_v3_RouteConfiguration_parse,
RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse,
response, "RDS", expected_route_configuration_names,
&result.rds_update_map, &result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.rds_update_map,
&result.resource_names_failed);
}
} else if (IsCds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds,
MaybeLogCluster, CdsResourceParse, response, "CDS",
expected_cluster_names, &result.cds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.cds_update_map,
&result.resource_names_failed);
}
} else if (IsEds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse,
EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse,
response, "EDS", expected_eds_service_names, &result.eds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.eds_update_map,
&result.resource_names_failed);
}
}
return result;
}
Expand Down
89 changes: 64 additions & 25 deletions src/core/ext/xds/xds_client.cc
Expand Up @@ -264,13 +264,15 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map)
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::RdsUpdateMap rds_update_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map)
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::EdsUpdateMap eds_update_map)
Expand Down Expand Up @@ -905,7 +907,8 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked(

void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map) {
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received containing %" PRIuPTR
Expand Down Expand Up @@ -949,6 +952,21 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
p.first->OnListenerChanged(*listener_state.update);
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& listener_name : resource_names_failed) {
auto it = xds_client()->listener_map_.find(listener_name);
if (it != xds_client()->listener_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
lds_update_map[listener_name];
if (!resource->http_connection_manager.route_config_name.empty()) {
rds_resource_names_seen.insert(
resource->http_connection_manager.route_config_name);
}
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : lds_state.subscribed_resources) {
Expand Down Expand Up @@ -1031,7 +1049,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(

void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map) {
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] CDS update received containing %" PRIuPTR
Expand Down Expand Up @@ -1073,6 +1092,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
p.first->OnClusterChanged(cluster_state.update.value());
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& cluster_name : resource_names_failed) {
auto it = xds_client()->cluster_map_.find(cluster_name);
if (it != xds_client()->cluster_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
cds_update_map[cluster_name];
eds_resource_names_seen.insert(resource->eds_service_name.empty()
? cluster_name
: resource->eds_service_name);
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : cds_state.subscribed_resources) {
Expand Down Expand Up @@ -1174,7 +1207,7 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s update NACKed containing %" PRIuPTR
" resources",
" invalid resources",
xds_client(), result.type_url.c_str(),
result.resource_names_failed.size());
}
Expand Down Expand Up @@ -1269,9 +1302,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Update nonce.
auto& state = state_map_[result.type_url];
state.nonce = std::move(result.nonce);
// NACK or ACK the response.
// If we got an error, we'll NACK the update.
if (result.parse_error != GRPC_ERROR_NONE) {
// NACK unacceptable update.
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
"version %s, will NACK: nonce=%s error=%s",
Expand All @@ -1295,34 +1327,41 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->endpoint_map_);
}
SendMessageLocked(result.type_url);
} else {
}
// Process any valid resources.
bool have_valid_resources = false;
if (result.type_url == XdsApi::kLdsTypeUrl) {
have_valid_resources = !result.lds_update_map.empty();
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
have_valid_resources = !result.rds_update_map.empty();
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
have_valid_resources = !result.cds_update_map.empty();
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
have_valid_resources = !result.eds_update_map.empty();
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
if (have_valid_resources) {
seen_response_ = true;
// Accept the ADS response according to the type_url.
if (result.type_url == XdsApi::kLdsTypeUrl) {
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map));
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map));
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
xds_client()->resource_version_map_[result.type_url] =
std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
}
// Send ACK or NACK.
SendMessageLocked(result.type_url);
}
if (xds_client()->shutting_down_) return true;
// Keep listening for updates.
Expand Down

0 comments on commit f3497eb

Please sign in to comment.