Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement improved xDS NACK semantics #27276

Merged
merged 6 commits into from Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 0 additions & 25 deletions src/core/ext/xds/xds_api.cc
Expand Up @@ -3407,15 +3407,6 @@ upb_strview EdsResourceName(
eds_resource);
}

// Marks every resource currently held in *update_map as failed.
//
// The key of each entry in *update_map is added to *resource_names_failed
// (names already in the set are left as they are), and *update_map is then
// emptied. UpdateMap may be any map-like container whose entries expose the
// resource name as `first`.
template <typename UpdateMap>
void MoveUpdatesToFailedSet(UpdateMap* update_map,
                            std::set<std::string>* resource_names_failed) {
  for (auto entry = update_map->begin(); entry != update_map->end(); ++entry) {
    resource_names_failed->insert(entry->first);
  }
  // The names are now recorded as failed, so drop the parsed updates.
  update_map->clear();
}

} // namespace

XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
Expand Down Expand Up @@ -3461,40 +3452,24 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
IsLds, MaybeLogListener, LdsResourceParse, response, "LDS",
expected_listener_names, &result.lds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.lds_update_map,
&result.resource_names_failed);
}
} else if (IsRds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_route_v3_RouteConfiguration_parse,
RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse,
response, "RDS", expected_route_configuration_names,
&result.rds_update_map, &result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.rds_update_map,
&result.resource_names_failed);
}
} else if (IsCds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds,
MaybeLogCluster, CdsResourceParse, response, "CDS",
expected_cluster_names, &result.cds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.cds_update_map,
&result.resource_names_failed);
}
} else if (IsEds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse,
EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse,
response, "EDS", expected_eds_service_names, &result.eds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.eds_update_map,
&result.resource_names_failed);
}
}
return result;
}
Expand Down
89 changes: 64 additions & 25 deletions src/core/ext/xds/xds_client.cc
Expand Up @@ -263,13 +263,15 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map)
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::RdsUpdateMap rds_update_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map)
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::EdsUpdateMap eds_update_map)
Expand Down Expand Up @@ -904,7 +906,8 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked(

void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map) {
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received containing %" PRIuPTR
Expand Down Expand Up @@ -948,6 +951,21 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
p.first->OnListenerChanged(*listener_state.update);
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& listener_name : resource_names_failed) {
auto it = xds_client()->listener_map_.find(listener_name);
if (it != xds_client()->listener_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
lds_update_map[listener_name];
if (!resource->http_connection_manager.route_config_name.empty()) {
rds_resource_names_seen.insert(
resource->http_connection_manager.route_config_name);
}
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : lds_state.subscribed_resources) {
Expand Down Expand Up @@ -1030,7 +1048,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(

void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map) {
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] CDS update received containing %" PRIuPTR
Expand Down Expand Up @@ -1072,6 +1091,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
p.first->OnClusterChanged(cluster_state.update.value());
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& cluster_name : resource_names_failed) {
auto it = xds_client()->cluster_map_.find(cluster_name);
if (it != xds_client()->cluster_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
cds_update_map[cluster_name];
eds_resource_names_seen.insert(resource->eds_service_name.empty()
? cluster_name
: resource->eds_service_name);
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : cds_state.subscribed_resources) {
Expand Down Expand Up @@ -1173,7 +1206,7 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s update NACKed containing %" PRIuPTR
" resources",
" invalid resources",
xds_client(), result.type_url.c_str(),
result.resource_names_failed.size());
}
Expand Down Expand Up @@ -1268,9 +1301,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Update nonce.
auto& state = state_map_[result.type_url];
state.nonce = std::move(result.nonce);
// NACK or ACK the response.
// If we got an error, we'll NACK the update.
if (result.parse_error != GRPC_ERROR_NONE) {
// NACK unacceptable update.
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
"version %s, will NACK: nonce=%s error=%s",
Expand All @@ -1294,34 +1326,41 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->endpoint_map_);
}
SendMessageLocked(result.type_url);
} else {
}
// Process any valid resources.
bool have_valid_resources = false;
if (result.type_url == XdsApi::kLdsTypeUrl) {
have_valid_resources = !result.lds_update_map.empty();
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
have_valid_resources = !result.rds_update_map.empty();
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
have_valid_resources = !result.cds_update_map.empty();
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
have_valid_resources = !result.eds_update_map.empty();
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
if (have_valid_resources) {
seen_response_ = true;
// Accept the ADS response according to the type_url.
if (result.type_url == XdsApi::kLdsTypeUrl) {
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map));
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map));
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
xds_client()->resource_version_map_[result.type_url] =
std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
}
// Send ACK or NACK.
SendMessageLocked(result.type_url);
}
if (xds_client()->shutting_down_) return true;
// Keep listening for updates.
Expand Down