From a3477951e56abe19adfd79973bb415cf8c7cad9d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 7 Sep 2021 12:17:23 -0700 Subject: [PATCH 1/5] refactor xDS response parsing --- src/core/ext/xds/xds_api.cc | 1030 +++++++++++--------------- test/cpp/end2end/xds_end2end_test.cc | 33 +- 2 files changed, 431 insertions(+), 632 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index ecaf22ab64e0f..6ddfe415d1692 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -847,15 +847,15 @@ bool IsLds(absl::string_view type_url, bool* is_v2 = nullptr) { return false; } -bool IsRds(absl::string_view type_url) { +bool IsRds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl; } -bool IsCds(absl::string_view type_url) { +bool IsCds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl; } -bool IsEds(absl::string_view type_url) { +bool IsEds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl; } @@ -1207,6 +1207,18 @@ void MaybeLogDiscoveryResponse( } } +void MaybeLogListener(const EncodingContext& context, + const envoy_config_listener_v3_Listener* listener) { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) && + gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { + const upb_msgdef* msg_type = + envoy_config_listener_v3_Listener_getmsgdef(context.symtab); + char buf[10240]; + upb_text_encode(listener, msg_type, nullptr, 0, buf, sizeof(buf)); + gpr_log(GPR_DEBUG, "[xds_client %p] Listener: %s", context.client, buf); + } +} + void MaybeLogHttpConnectionManager( const EncodingContext& context, const envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager* @@ -1828,7 +1840,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context, grpc_error_handle RouteConfigParse( const EncodingContext& context, const envoy_config_route_v3_RouteConfiguration* route_config, - XdsApi::RdsUpdate* rds_update) { + bool /*is_v2*/, XdsApi::RdsUpdate* rds_update) { MaybeLogRouteConfiguration(context, route_config); // Get the virtual hosts. size_t num_virtual_hosts; @@ -2214,7 +2226,7 @@ grpc_error_handle HttpConnectionManagerParse( http_connection_manager_proto); XdsApi::RdsUpdate rds_update; grpc_error_handle error = - RouteConfigParse(context, route_config, &rds_update); + RouteConfigParse(context, route_config, is_v2, &rds_update); if (error != GRPC_ERROR_NONE) return error; http_connection_manager->rds_update = std::move(rds_update); return GRPC_ERROR_NONE; @@ -2247,7 +2259,7 @@ grpc_error_handle HttpConnectionManagerParse( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParseClient( +grpc_error_handle LdsResourceParseClient( const EncodingContext& context, const envoy_config_listener_v3_ApiListener* api_listener, bool is_v2, XdsApi::LdsUpdate* lds_update) { @@ -2714,7 +2726,7 @@ grpc_error_handle BuildFilterChainMap( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParseServer( +grpc_error_handle LdsResourceParseServer( const EncodingContext& context, const envoy_config_listener_v3_Listener* listener, bool is_v2, XdsApi::LdsUpdate* lds_update) { @@ -2763,166 +2775,31 @@ grpc_error_handle LdsResponseParseServer( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParse( +grpc_error_handle LdsResourceParse( const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_listener_names, - XdsApi::LdsUpdateMap* lds_update_map, - std::set* resource_names_failed) { - std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - bool is_v2 = false; - if (!IsLds(type_url, &is_v2)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not LDS.") - .c_str())); - continue; - } - // Decode the listener. - const upb_strview encoded_listener = - google_protobuf_Any_value(resources[i]); - const envoy_config_listener_v3_Listener* listener = - envoy_config_listener_v3_Listener_parse( - encoded_listener.data, encoded_listener.size, context.arena); - if (listener == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode listener.") - .c_str())); - continue; - } - // Check listener name. Ignore unexpected listeners. - std::string listener_name = - UpbStringToStdString(envoy_config_listener_v3_Listener_name(listener)); - if (expected_listener_names.find(listener_name) == - expected_listener_names.end()) { - continue; - } - // Fail if listener name is duplicated. - if (lds_update_map->find(listener_name) != lds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate listener name \"", listener_name, "\"") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - // Serialize into JSON and store it in the LdsUpdateMap - XdsApi::LdsResourceData& lds_resource_data = - (*lds_update_map)[listener_name]; - XdsApi::LdsUpdate& lds_update = lds_resource_data.resource; - lds_resource_data.serialized_proto = UpbStringToStdString(encoded_listener); - // Check whether it's a client or server listener. - const envoy_config_listener_v3_ApiListener* api_listener = - envoy_config_listener_v3_Listener_api_listener(listener); - const envoy_config_core_v3_Address* address = - envoy_config_listener_v3_Listener_address(listener); - if (api_listener != nullptr && address != nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, - ": Listener has both address and ApiListener") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - if (api_listener == nullptr && address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, - ": Listener has neither address nor ApiListener") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - grpc_error_handle error = GRPC_ERROR_NONE; - if (api_listener != nullptr) { - error = LdsResponseParseClient(context, api_listener, is_v2, &lds_update); - } else { - error = LdsResponseParseServer(context, listener, is_v2, &lds_update); - } - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, ": validation error").c_str()), - error)); - resource_names_failed->insert(listener_name); - } + const envoy_config_listener_v3_Listener* listener, bool is_v2, + XdsApi::LdsUpdate* lds_update) { + // Check whether it's a client or server listener. + const envoy_config_listener_v3_ApiListener* api_listener = + envoy_config_listener_v3_Listener_api_listener(listener); + const envoy_config_core_v3_Address* address = + envoy_config_listener_v3_Listener_address(listener); + if (api_listener != nullptr && address != nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Listener has both address and ApiListener"); } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing LDS response", &errors); -} - -grpc_error_handle RdsResponseParse( - const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_route_configuration_names, - XdsApi::RdsUpdateMap* rds_update_map, - std::set* resource_names_failed) { - std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsRds(type_url)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not RDS.") - .c_str())); - continue; - } - // Decode the route_config. - const upb_strview encoded_route_config = - google_protobuf_Any_value(resources[i]); - const envoy_config_route_v3_RouteConfiguration* route_config = - envoy_config_route_v3_RouteConfiguration_parse( - encoded_route_config.data, encoded_route_config.size, - context.arena); - if (route_config == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode route_config.") - .c_str())); - continue; - } - // Check route_config_name. Ignore unexpected route_config. - std::string route_config_name = UpbStringToStdString( - envoy_config_route_v3_RouteConfiguration_name(route_config)); - if (expected_route_configuration_names.find(route_config_name) == - expected_route_configuration_names.end()) { - continue; - } - // Fail if route config name is duplicated. - if (rds_update_map->find(route_config_name) != rds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate route config name \"", route_config_name, - "\"") - .c_str())); - resource_names_failed->insert(route_config_name); - continue; - } - // Serialize into JSON and store it in the RdsUpdateMap - XdsApi::RdsResourceData& rds_resource_data = - (*rds_update_map)[route_config_name]; - XdsApi::RdsUpdate& rds_update = rds_resource_data.resource; - rds_resource_data.serialized_proto = - UpbStringToStdString(encoded_route_config); - // Parse the route_config. - grpc_error_handle error = - RouteConfigParse(context, route_config, &rds_update); - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(route_config_name, ": validation error").c_str()), - error)); - resource_names_failed->insert(route_config_name); - } + if (api_listener == nullptr && address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Listener has neither address nor ApiListener"); + } + // Validate Listener fields. + grpc_error_handle error = GRPC_ERROR_NONE; + if (api_listener != nullptr) { + error = LdsResourceParseClient(context, api_listener, is_v2, lds_update); + } else { + error = LdsResourceParseServer(context, listener, is_v2, lds_update); } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing RDS response", &errors); + return error; } grpc_error_handle UpstreamTlsContextParse( @@ -2973,380 +2850,259 @@ grpc_error_handle UpstreamTlsContextParse( return GRPC_ERROR_NONE; } -grpc_error_handle CdsResponseParse( +grpc_error_handle CdsLogicalDnsParse( const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_cluster_names, - XdsApi::CdsUpdateMap* cds_update_map, - std::set* resource_names_failed) { + const envoy_config_cluster_v3_Cluster* cluster, + XdsApi::CdsUpdate* cds_update) { + const auto* load_assignment = + envoy_config_cluster_v3_Cluster_load_assignment(cluster); + if (load_assignment == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "load_assignment not present for LOGICAL_DNS cluster"); + } + size_t num_localities; + const auto* const* localities = + envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment, + &num_localities); + if (num_localities != 1) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("load_assignment for LOGICAL_DNS cluster must have " + "exactly one locality, found ", + num_localities) + .c_str()); + } + size_t num_endpoints; + const auto* const* endpoints = + envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0], + &num_endpoints); + if (num_endpoints != 1) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("locality for LOGICAL_DNS cluster must have " + "exactly one endpoint, found ", + num_endpoints) + .c_str()); + } + const auto* endpoint = + envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]); + if (endpoint == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LbEndpoint endpoint field not set"); + } + const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint); + if (address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Endpoint address field not set"); + } + const auto* socket_address = + envoy_config_core_v3_Address_socket_address(address); + if (socket_address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Address socket_address field not set"); + } + if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size != + 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LOGICAL_DNS clusters must NOT have a custom resolver name set"); + } + absl::string_view address_str = UpbStringToAbsl( + envoy_config_core_v3_SocketAddress_address(socket_address)); + if (address_str.empty()) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SocketAddress address field not set"); + } + if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SocketAddress port_value field not set"); + } + cds_update->dns_hostname = JoinHostPort( + address_str, + envoy_config_core_v3_SocketAddress_port_value(socket_address)); + return GRPC_ERROR_NONE; +} + +grpc_error_handle CdsResourceParse( + const EncodingContext& context, + const envoy_config_cluster_v3_Cluster* cluster, bool /*is_v2*/, + XdsApi::CdsUpdate* cds_update) { std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - // Parse all the resources in the CDS response. - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsCds(type_url)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not CDS.") - .c_str())); - continue; - } - // Decode the cluster. - const upb_strview encoded_cluster = google_protobuf_Any_value(resources[i]); - const envoy_config_cluster_v3_Cluster* cluster = - envoy_config_cluster_v3_Cluster_parse( - encoded_cluster.data, encoded_cluster.size, context.arena); - if (cluster == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode cluster.") - .c_str())); - continue; - } - MaybeLogCluster(context, cluster); - // Ignore unexpected cluster names. - std::string cluster_name = - UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(cluster)); - if (expected_cluster_names.find(cluster_name) == - expected_cluster_names.end()) { - continue; - } - // Fail on duplicate resources. - if (cds_update_map->find(cluster_name) != cds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate resource name \"", cluster_name, "\"") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - // Add the cluster to cds_update_map. - XdsApi::CdsResourceData& cds_resource_data = - (*cds_update_map)[cluster_name]; - XdsApi::CdsUpdate& cds_update = cds_resource_data.resource; - // Store serialized proto. - cds_resource_data.serialized_proto = UpbStringToStdString(encoded_cluster); - // Check the cluster_discovery_type. - if (!envoy_config_cluster_v3_Cluster_has_type(cluster) && - !envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType not found.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (envoy_config_cluster_v3_Cluster_type(cluster) == - envoy_config_cluster_v3_Cluster_EDS) { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::EDS; - // Check the EDS config source. - const envoy_config_cluster_v3_Cluster_EdsClusterConfig* - eds_cluster_config = - envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); - const envoy_config_core_v3_ConfigSource* eds_config = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( - eds_cluster_config); - if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": EDS ConfigSource is not ADS.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - // Record EDS service_name (if any). - upb_strview service_name = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( - eds_cluster_config); - if (service_name.size != 0) { - cds_update.eds_service_name = UpbStringToStdString(service_name); - } - } else if (!XdsAggregateAndLogicalDnsClusterEnabled()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } else if (envoy_config_cluster_v3_Cluster_type(cluster) == - envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; - const auto* load_assignment = - envoy_config_cluster_v3_Cluster_load_assignment(cluster); - if (load_assignment == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": load_assignment not present for LOGICAL_DNS cluster") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - size_t num_localities; - const auto* const* localities = - envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( - load_assignment, &num_localities); - if (num_localities != 1) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": load_assignment for LOGICAL_DNS cluster must have " - "exactly one locality, found ", - num_localities) - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - size_t num_endpoints; - const auto* const* endpoints = - envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints( - localities[0], &num_endpoints); - if (num_endpoints != 1) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": locality for LOGICAL_DNS cluster must have " - "exactly one endpoint, found ", - num_endpoints) - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* endpoint = - envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]); - if (endpoint == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LbEndpoint endpoint field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint); - if (address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Endpoint address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* socket_address = - envoy_config_core_v3_Address_socket_address(address); - if (socket_address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Address socket_address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address) - .size != 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": LOGICAL_DNS clusters must NOT have a custom resolver " - "name set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - absl::string_view address_str = UpbStringToAbsl( - envoy_config_core_v3_SocketAddress_address(socket_address)); - if (address_str.empty()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": SocketAddress address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": SocketAddress port_value field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - cds_update.dns_hostname = JoinHostPort( - address_str, - envoy_config_core_v3_SocketAddress_port_value(socket_address)); + // Check the cluster_discovery_type. + if (!envoy_config_cluster_v3_Cluster_has_type(cluster) && + !envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.")); + } else if (envoy_config_cluster_v3_Cluster_type(cluster) == + envoy_config_cluster_v3_Cluster_EDS) { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::EDS; + // Check the EDS config source. + const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config = + envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); + const envoy_config_core_v3_ConfigSource* eds_config = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( + eds_cluster_config); + if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS ConfigSource is not ADS.")); + } + // Record EDS service_name (if any). + upb_strview service_name = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( + eds_cluster_config); + if (service_name.size != 0) { + cds_update->eds_service_name = UpbStringToStdString(service_name); + } + } else if (!XdsAggregateAndLogicalDnsClusterEnabled()) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not valid.")); + } else if (envoy_config_cluster_v3_Cluster_type(cluster) == + envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; + grpc_error_handle error = CdsLogicalDnsParse(context, cluster, cds_update); + if (error != GRPC_ERROR_NONE) errors.push_back(error); + } else { + if (!envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not valid.")); } else { - if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { - const envoy_config_cluster_v3_Cluster_CustomClusterType* - custom_cluster_type = - envoy_config_cluster_v3_Cluster_cluster_type(cluster); - upb_strview type_name = - envoy_config_cluster_v3_Cluster_CustomClusterType_name( + const envoy_config_cluster_v3_Cluster_CustomClusterType* + custom_cluster_type = + envoy_config_cluster_v3_Cluster_cluster_type(cluster); + upb_strview type_name = + envoy_config_cluster_v3_Cluster_CustomClusterType_name( + custom_cluster_type); + if (UpbStringToAbsl(type_name) != "envoy.clusters.aggregate") { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "DiscoveryType is not valid.")); + } else { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE; + // Retrieve aggregate clusters. + const google_protobuf_Any* typed_config = + envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config( custom_cluster_type); - if (UpbStringToAbsl(type_name) == "envoy.clusters.aggregate") { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE; - // Retrieve aggregate clusters. - const google_protobuf_Any* typed_config = - envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config( - custom_cluster_type); - const upb_strview aggregate_cluster_config_upb_strview = - google_protobuf_Any_value(typed_config); - const envoy_extensions_clusters_aggregate_v3_ClusterConfig* - aggregate_cluster_config = - envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( - aggregate_cluster_config_upb_strview.data, - aggregate_cluster_config_upb_strview.size, context.arena); - if (aggregate_cluster_config == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Can't parse aggregate cluster.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + const upb_strview aggregate_cluster_config_upb_strview = + google_protobuf_Any_value(typed_config); + const envoy_extensions_clusters_aggregate_v3_ClusterConfig* + aggregate_cluster_config = + envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( + aggregate_cluster_config_upb_strview.data, + aggregate_cluster_config_upb_strview.size, context.arena); + if (aggregate_cluster_config == nullptr) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't parse aggregate cluster.")); + } else { size_t size; const upb_strview* clusters = envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters( aggregate_cluster_config, &size); for (size_t i = 0; i < size; ++i) { const upb_strview cluster = clusters[i]; - cds_update.prioritized_cluster_names.emplace_back( + cds_update->prioritized_cluster_names.emplace_back( UpbStringToStdString(cluster)); } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; } } - // Check the LB policy. - if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == - envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { - cds_update.lb_policy = "ROUND_ROBIN"; - } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == - envoy_config_cluster_v3_Cluster_RING_HASH) { - cds_update.lb_policy = "RING_HASH"; - // Record ring hash lb config - auto* ring_hash_config = - envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); - if (ring_hash_config != nullptr) { - const google_protobuf_UInt64Value* max_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( - ring_hash_config); - if (max_ring_size != nullptr) { - cds_update.max_ring_size = - google_protobuf_UInt64Value_value(max_ring_size); - if (cds_update.max_ring_size > 8388608 || - cds_update.max_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": max_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + } + // Check the LB policy. + if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == + envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { + cds_update->lb_policy = "ROUND_ROBIN"; + } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == + envoy_config_cluster_v3_Cluster_RING_HASH) { + cds_update->lb_policy = "RING_HASH"; + // Record ring hash lb config + auto* ring_hash_config = + envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); + if (ring_hash_config != nullptr) { + const google_protobuf_UInt64Value* max_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( + ring_hash_config); + if (max_ring_size != nullptr) { + cds_update->max_ring_size = + google_protobuf_UInt64Value_value(max_ring_size); + if (cds_update->max_ring_size > 8388608 || + cds_update->max_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "max_ring_size is not in the range of 1 to 8388608.")); } - const google_protobuf_UInt64Value* min_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( - ring_hash_config); - if (min_ring_size != nullptr) { - cds_update.min_ring_size = - google_protobuf_UInt64Value_value(min_ring_size); - if (cds_update.min_ring_size > 8388608 || - cds_update.min_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (cds_update.min_ring_size > cds_update.max_ring_size) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size cannot be greater than max_ring_size.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + } + const google_protobuf_UInt64Value* min_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( + ring_hash_config); + if (min_ring_size != nullptr) { + cds_update->min_ring_size = + google_protobuf_UInt64Value_value(min_ring_size); + if (cds_update->min_ring_size > 8388608 || + cds_update->min_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "min_ring_size is not in the range of 1 to 8388608.")); } - if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( - ring_hash_config) != - envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": ring hash lb config has invalid hash function.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + if (cds_update->min_ring_size > cds_update->max_ring_size) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "min_ring_size cannot be greater than max_ring_size.")); } } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LB policy is not supported.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (XdsSecurityEnabled()) { - auto* transport_socket = - envoy_config_cluster_v3_Cluster_transport_socket(cluster); - if (transport_socket != nullptr) { - grpc_error_handle error = UpstreamTlsContextParse( - context, transport_socket, &cds_update.common_tls_context); - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - "Error parsing security configuration for cluster: ", - cluster_name) - .c_str()), - error)); - resource_names_failed->insert(cluster_name); - continue; - } + if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( + ring_hash_config) != + envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "ring hash lb config has invalid hash function.")); } } - // Record LRS server name (if any). - const envoy_config_core_v3_ConfigSource* lrs_server = - envoy_config_cluster_v3_Cluster_lrs_server(cluster); - if (lrs_server != nullptr) { - if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LRS ConfigSource is not self.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + } else { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("LB policy is not supported.")); + } + if (XdsSecurityEnabled()) { + auto* transport_socket = + envoy_config_cluster_v3_Cluster_transport_socket(cluster); + if (transport_socket != nullptr) { + grpc_error_handle error = UpstreamTlsContextParse( + context, transport_socket, &cds_update->common_tls_context); + if (error != GRPC_ERROR_NONE) { + errors.push_back( + grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error parsing security configuration"), + error)); } - cds_update.lrs_load_reporting_server_name.emplace(""); } - // The Cluster resource encodes the circuit breaking parameters in a list of - // Thresholds messages, where each message specifies the parameters for a - // particular RoutingPriority. we will look only at the first entry in the - // list for priority DEFAULT and default to 1024 if not found. - if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) { - const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers = - envoy_config_cluster_v3_Cluster_circuit_breakers(cluster); - size_t num_thresholds; - const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const* - thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds( - circuit_breakers, &num_thresholds); - for (size_t i = 0; i < num_thresholds; ++i) { - const auto* threshold = thresholds[i]; - if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority( - threshold) == envoy_config_core_v3_DEFAULT) { - const google_protobuf_UInt32Value* max_requests = - envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests( - threshold); - if (max_requests != nullptr) { - cds_update.max_concurrent_requests = - google_protobuf_UInt32Value_value(max_requests); - } - break; + } + // Record LRS server name (if any). + const envoy_config_core_v3_ConfigSource* lrs_server = + envoy_config_cluster_v3_Cluster_lrs_server(cluster); + if (lrs_server != nullptr) { + if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + ": LRS ConfigSource is not self.")); + } + cds_update->lrs_load_reporting_server_name.emplace(""); + } + // The Cluster resource encodes the circuit breaking parameters in a list of + // Thresholds messages, where each message specifies the parameters for a + // particular RoutingPriority. we will look only at the first entry in the + // list for priority DEFAULT and default to 1024 if not found. + if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) { + const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers = + envoy_config_cluster_v3_Cluster_circuit_breakers(cluster); + size_t num_thresholds; + const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const* + thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds( + circuit_breakers, &num_thresholds); + for (size_t i = 0; i < num_thresholds; ++i) { + const auto* threshold = thresholds[i]; + if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority( + threshold) == envoy_config_core_v3_DEFAULT) { + const google_protobuf_UInt32Value* max_requests = + envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests( + threshold); + if (max_requests != nullptr) { + cds_update->max_concurrent_requests = + google_protobuf_UInt32Value_value(max_requests); } + break; } } } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing CDS response", &errors); + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing CDS resource", &errors); } grpc_error_handle ServerAddressParseAndAppend( @@ -3480,12 +3236,79 @@ grpc_error_handle DropParseAndAppend( return GRPC_ERROR_NONE; } -grpc_error_handle EdsResponseParse( +grpc_error_handle EdsResourceParse( const EncodingContext& context, + const envoy_config_endpoint_v3_ClusterLoadAssignment* + cluster_load_assignment, + bool /*is_v2*/, XdsApi::EdsUpdate* eds_update) { + std::vector errors; + // Get the endpoints. + size_t locality_size; + const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = + envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( + cluster_load_assignment, &locality_size); + for (size_t j = 0; j < locality_size; ++j) { + size_t priority; + XdsApi::EdsUpdate::Priority::Locality locality; + grpc_error_handle error = LocalityParse(endpoints[j], &locality, &priority); + if (error != GRPC_ERROR_NONE) { + errors.push_back(error); + continue; + } + // Filter out locality with weight 0. + if (locality.lb_weight == 0) continue; + // Make sure prorities is big enough. Note that they might not + // arrive in priority order. + while (eds_update->priorities.size() < priority + 1) { + eds_update->priorities.emplace_back(); + } + eds_update->priorities[priority].localities.emplace(locality.name.get(), + std::move(locality)); + } + for (const auto& priority : eds_update->priorities) { + if (priority.localities.empty()) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("sparse priority list")); + } + } + // Get the drop config. + eds_update->drop_config = MakeRefCounted(); + const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy* policy = + envoy_config_endpoint_v3_ClusterLoadAssignment_policy( + cluster_load_assignment); + if (policy != nullptr) { + size_t drop_size; + const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload* const* + drop_overload = + envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( + policy, &drop_size); + for (size_t j = 0; j < drop_size; ++j) { + grpc_error_handle error = + DropParseAndAppend(drop_overload[j], eds_update->drop_config.get()); + if (error != GRPC_ERROR_NONE) { + errors.push_back( + grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "drop config validation error"), + error)); + } + } + } + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing EDS resource", &errors); +} + +template +grpc_error_handle AdsResponseParse( + const EncodingContext& context, ProtoParseFunction proto_parse_function, + ProtoResourceNameFunction proto_resource_name_function, + ResourceTypeSelectorFunction resource_type_selector_function, + ProtoLogFunction proto_log_function, + ResourceParseFunction resource_parse_function, const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_eds_service_names, - XdsApi::EdsUpdateMap* eds_update_map, - std::set* resource_names_failed) { + const char* resource_type_string, + const std::set& expected_resource_names, + UpdateMap* update_map, std::set* resource_names_failed) { std::vector errors; // Get the resources from the response. size_t size; @@ -3495,115 +3318,60 @@ grpc_error_handle EdsResponseParse( // Check the type_url of the resource. absl::string_view type_url = UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsEds(type_url)) { + bool is_v2 = false; + if (!resource_type_selector_function(type_url, &is_v2)) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not EDS.") + absl::StrCat("resource index ", i, ": Resource is not ", + resource_type_string, ".") .c_str())); continue; } - // Get the cluster_load_assignment. - upb_strview encoded_cluster_load_assignment = - google_protobuf_Any_value(resources[i]); - envoy_config_endpoint_v3_ClusterLoadAssignment* cluster_load_assignment = - envoy_config_endpoint_v3_ClusterLoadAssignment_parse( - encoded_cluster_load_assignment.data, - encoded_cluster_load_assignment.size, context.arena); - if (cluster_load_assignment == nullptr) { + // Parse the resource. + upb_strview serialized_resource = google_protobuf_Any_value(resources[i]); + auto* resource = proto_parse_function( + serialized_resource.data, serialized_resource.size, context.arena); + if (resource == nullptr) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, - ": Can't parse cluster_load_assignment.") + absl::StrCat("resource index ", i, ": Can't parse ", + resource_type_string, " resource.") .c_str())); continue; } - MaybeLogClusterLoadAssignment(context, cluster_load_assignment); - // Check the EDS service name. Ignore unexpected names. - std::string eds_service_name = UpbStringToStdString( - envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name( - cluster_load_assignment)); - if (expected_eds_service_names.find(eds_service_name) == - expected_eds_service_names.end()) { + proto_log_function(context, resource); + // Check the resource name. Ignore unexpected names. + std::string resource_name = + UpbStringToStdString(proto_resource_name_function(resource)); + if (expected_resource_names.find(resource_name) == + expected_resource_names.end()) { continue; } // Fail on duplicate resources. - if (eds_update_map->find(eds_service_name) != eds_update_map->end()) { + if (update_map->find(resource_name) != update_map->end()) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate resource name \"", eds_service_name, "\"") + absl::StrCat("duplicate resource name \"", resource_name, "\"") .c_str())); - resource_names_failed->insert(eds_service_name); + resource_names_failed->insert(resource_name); continue; } - // Serialize into JSON and store it in the EdsUpdateMap - XdsApi::EdsResourceData& eds_resource_data = - (*eds_update_map)[eds_service_name]; - XdsApi::EdsUpdate& eds_update = eds_resource_data.resource; - eds_resource_data.serialized_proto = - UpbStringToStdString(encoded_cluster_load_assignment); - // Get the endpoints. - size_t locality_size; - const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = - envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( - cluster_load_assignment, &locality_size); - grpc_error_handle error = GRPC_ERROR_NONE; - for (size_t j = 0; j < locality_size; ++j) { - size_t priority; - XdsApi::EdsUpdate::Priority::Locality locality; - error = LocalityParse(endpoints[j], &locality, &priority); - if (error != GRPC_ERROR_NONE) break; - // Filter out locality with weight 0. - if (locality.lb_weight == 0) continue; - // Make sure prorities is big enough. Note that they might not - // arrive in priority order. - while (eds_update.priorities.size() < priority + 1) { - eds_update.priorities.emplace_back(); - } - eds_update.priorities[priority].localities.emplace(locality.name.get(), - std::move(locality)); - } + // Validate resource. + decltype(UpdateMap::mapped_type::resource) update; + grpc_error_handle error = + resource_parse_function(context, resource, is_v2, &update); if (error != GRPC_ERROR_NONE) { errors.push_back(grpc_error_add_child( GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": locality validation error") - .c_str()), + absl::StrCat(resource_name, ": validation error").c_str()), error)); - resource_names_failed->insert(eds_service_name); - continue; - } - for (const auto& priority : eds_update.priorities) { - if (priority.localities.empty()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": sparse priority list").c_str())); - resource_names_failed->insert(eds_service_name); - continue; - } - } - // Get the drop config. - eds_update.drop_config = MakeRefCounted(); - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy* policy = - envoy_config_endpoint_v3_ClusterLoadAssignment_policy( - cluster_load_assignment); - if (policy != nullptr) { - size_t drop_size; - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload* const* - drop_overload = - envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( - policy, &drop_size); - for (size_t j = 0; j < drop_size; ++j) { - error = - DropParseAndAppend(drop_overload[j], eds_update.drop_config.get()); - if (error != GRPC_ERROR_NONE) break; - } - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": drop config validation error") - .c_str()), - error)); - resource_names_failed->insert(eds_service_name); - continue; - } + resource_names_failed->insert(resource_name); + } else { + // Store result in update map, in both validated and serialized form. + auto& resource_data = (*update_map)[resource_name]; + resource_data.resource = std::move(update); + resource_data.serialized_proto = + UpbStringToStdString(serialized_resource); } } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing EDS response", &errors); + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing ADS response", &errors); } std::string TypeUrlInternalToExternal(absl::string_view type_url) { @@ -3619,6 +3387,27 @@ std::string TypeUrlInternalToExternal(absl::string_view type_url) { return std::string(type_url); } +upb_strview LdsResourceName( + const envoy_config_listener_v3_Listener* lds_resource) { + return envoy_config_listener_v3_Listener_name(lds_resource); +} + +upb_strview RdsResourceName( + const envoy_config_route_v3_RouteConfiguration* rds_resource) { + return envoy_config_route_v3_RouteConfiguration_name(rds_resource); +} + +upb_strview CdsResourceName( + const envoy_config_cluster_v3_Cluster* cds_resource) { + return envoy_config_cluster_v3_Cluster_name(cds_resource); +} + +upb_strview EdsResourceName( + const envoy_config_endpoint_v3_ClusterLoadAssignment* eds_resource) { + return envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name( + eds_resource); +} + template void MoveUpdatesToFailedSet(UpdateMap* update_map, std::set* resource_names_failed) { @@ -3664,34 +3453,45 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( result.nonce = UpbStringToStdString( envoy_service_discovery_v3_DiscoveryResponse_nonce(response)); // Parse the response according to the resource type. + // TODO(roth): When we have time, consider defining an interface for the + // methods of each resource type, so that we don't have to pass + // individual functions into each call to AdsResponseParse(). if (IsLds(result.type_url)) { - result.parse_error = - LdsResponseParse(context, response, expected_listener_names, - &result.lds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_listener_v3_Listener_parse, LdsResourceName, + IsLds, MaybeLogListener, LdsResourceParse, response, "LDS", + expected_listener_names, &result.lds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.lds_update_map, &result.resource_names_failed); } } else if (IsRds(result.type_url)) { - result.parse_error = - RdsResponseParse(context, response, expected_route_configuration_names, - &result.rds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_route_v3_RouteConfiguration_parse, + RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse, + response, "RDS", expected_route_configuration_names, + &result.rds_update_map, &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.rds_update_map, &result.resource_names_failed); } } else if (IsCds(result.type_url)) { - result.parse_error = - CdsResponseParse(context, response, expected_cluster_names, - &result.cds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds, + MaybeLogCluster, CdsResourceParse, response, "CDS", + expected_cluster_names, &result.cds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.cds_update_map, &result.resource_names_failed); } } else if (IsEds(result.type_url)) { - result.parse_error = - EdsResponseParse(context, response, expected_eds_service_names, - &result.eds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse, + EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse, + response, "EDS", expected_eds_service_names, &result.eds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.eds_update_map, &result.resource_names_failed); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index ccf2b3e9c56fe..345c1a04be44f 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -3417,16 +3417,14 @@ TEST_P(GlobalXdsClientTest, MultipleBadResources) { const auto response_state = balancers_[0]->ads_service()->lds_response_state(); return response_state.state != AdsServiceImpl::ResponseState::NACKED || - !absl::StrContains( - response_state.error_message, - absl::StrCat( - kServerName, - ": Listener has neither address nor ApiListener")) || - !absl::StrContains( - response_state.error_message, - absl::StrCat( - kServerName2, - ": Listener has neither address nor ApiListener")); + ::testing::Matches(::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*", + kServerName2, + ": validation error.*" + "Listener has neither address nor ApiListener")))( + response_state.error_message); })); ASSERT_FALSE(timed_out); } @@ -7255,12 +7253,14 @@ TEST_P(CdsTest, MultipleBadResources) { const auto response_state = balancers_[0]->ads_service()->cds_response_state(); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); - EXPECT_THAT(response_state.error_message, - ::testing::AllOf( - ::testing::HasSubstr(absl::StrCat( - kDefaultClusterName, ": DiscoveryType is not valid.")), - ::testing::HasSubstr(absl::StrCat( - kClusterName2, ": DiscoveryType is not valid.")))); + EXPECT_THAT( + response_state.error_message, + ::testing::ContainsRegex(absl::StrCat(kDefaultClusterName, + ": validation error.*" + "DiscoveryType is not valid.*", + kClusterName2, + ": validation error.*" + "DiscoveryType is not valid"))); } // Tests that CDS client should send a NACK if the eds_config in CDS response @@ -12904,7 +12904,6 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) { CheckRpcSendOk(); for (int o = 0; o < kFetchConfigRetries; o++) { auto csds_response = FetchCsdsResponse(); - // Check if error state is propagated bool ok = ::testing::Value( csds_response.config(0).xds_config(), From ef3221ea5b7609f18ba57ee1c390c5813f3db41d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 7 Sep 2021 14:48:58 -0700 Subject: [PATCH 2/5] fix build --- src/core/ext/xds/xds_api.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 6ddfe415d1692..67bf1ed69f34e 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -2851,7 +2851,6 @@ grpc_error_handle UpstreamTlsContextParse( } grpc_error_handle CdsLogicalDnsParse( - const EncodingContext& context, const envoy_config_cluster_v3_Cluster* cluster, XdsApi::CdsUpdate* cds_update) { const auto* load_assignment = @@ -2956,7 +2955,7 @@ grpc_error_handle CdsResourceParse( } else if (envoy_config_cluster_v3_Cluster_type(cluster) == envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; - grpc_error_handle error = CdsLogicalDnsParse(context, cluster, cds_update); + grpc_error_handle error = CdsLogicalDnsParse(cluster, cds_update); if (error != GRPC_ERROR_NONE) errors.push_back(error); } else { if (!envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { @@ -3237,7 +3236,7 @@ grpc_error_handle DropParseAndAppend( } grpc_error_handle EdsResourceParse( - const EncodingContext& context, + const EncodingContext& /*context*/, const envoy_config_endpoint_v3_ClusterLoadAssignment* cluster_load_assignment, bool /*is_v2*/, XdsApi::EdsUpdate* eds_update) { From 4389073b062508ace8a2e209468ffccb08f92573 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 7 Sep 2021 15:53:57 -0700 Subject: [PATCH 3/5] implement improved xDS NACK semantics --- src/core/ext/xds/xds_api.cc | 25 ---- src/core/ext/xds/xds_client.cc | 89 +++++++++---- test/cpp/end2end/xds_end2end_test.cc | 183 ++++++++++++++++++++++----- 3 files changed, 215 insertions(+), 82 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 67bf1ed69f34e..6b0549b633db7 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -3407,15 +3407,6 @@ upb_strview EdsResourceName( eds_resource); } -template -void MoveUpdatesToFailedSet(UpdateMap* update_map, - std::set* resource_names_failed) { - for (const auto& p : *update_map) { - resource_names_failed->insert(p.first); - } - update_map->clear(); -} - } // namespace XdsApi::AdsParseResult XdsApi::ParseAdsResponse( @@ -3461,40 +3452,24 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( IsLds, MaybeLogListener, LdsResourceParse, response, "LDS", expected_listener_names, &result.lds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.lds_update_map, - &result.resource_names_failed); - } } else if (IsRds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_route_v3_RouteConfiguration_parse, RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse, response, "RDS", expected_route_configuration_names, &result.rds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.rds_update_map, - &result.resource_names_failed); - } } else if (IsCds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds, MaybeLogCluster, CdsResourceParse, response, "CDS", expected_cluster_names, &result.cds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.cds_update_map, - &result.resource_names_failed); - } } else if (IsEds(result.type_url)) { result.parse_error = AdsResponseParse( context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse, EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse, response, "EDS", expected_eds_service_names, &result.eds_update_map, &result.resource_names_failed); - if (result.parse_error != GRPC_ERROR_NONE) { - MoveUpdatesToFailedSet(&result.eds_update_map, - &result.resource_names_failed); - } } return result; } diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 4715a231d6925..ed50b7c8732d7 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -263,13 +263,15 @@ class XdsClient::ChannelState::AdsCallState ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map) + XdsApi::LdsUpdateMap lds_update_map, + const std::set& resource_names_failed) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time, XdsApi::RdsUpdateMap rds_update_map) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map) + XdsApi::CdsUpdateMap cds_update_map, + const std::set& resource_names_failed) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time, XdsApi::EdsUpdateMap eds_update_map) @@ -904,7 +906,8 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked( void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map) { + XdsApi::LdsUpdateMap lds_update_map, + const std::set& resource_names_failed) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LDS update received containing %" PRIuPTR @@ -948,6 +951,21 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( p.first->OnListenerChanged(*listener_state.update); } } + // For invalid resources in the update, if they are already in the + // cache, pretend that they are present in the update, so that we + // don't incorrectly consider them deleted below. + for (const std::string& listener_name : resource_names_failed) { + auto it = xds_client()->listener_map_.find(listener_name); + if (it != xds_client()->listener_map_.end()) { + auto& resource = it->second.update; + if (!resource.has_value()) continue; + lds_update_map[listener_name]; + if (!resource->http_connection_manager.route_config_name.empty()) { + rds_resource_names_seen.insert( + resource->http_connection_manager.route_config_name); + } + } + } // For any subscribed resource that is not present in the update, // remove it from the cache and notify watchers that it does not exist. for (const auto& p : lds_state.subscribed_resources) { @@ -1030,7 +1048,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map) { + XdsApi::CdsUpdateMap cds_update_map, + const std::set& resource_names_failed) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update received containing %" PRIuPTR @@ -1072,6 +1091,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( p.first->OnClusterChanged(cluster_state.update.value()); } } + // For invalid resources in the update, if they are already in the + // cache, pretend that they are present in the update, so that we + // don't incorrectly consider them deleted below. + for (const std::string& cluster_name : resource_names_failed) { + auto it = xds_client()->cluster_map_.find(cluster_name); + if (it != xds_client()->cluster_map_.end()) { + auto& resource = it->second.update; + if (!resource.has_value()) continue; + cds_update_map[cluster_name]; + eds_resource_names_seen.insert(resource->eds_service_name.empty() + ? cluster_name + : resource->eds_service_name); + } + } // For any subscribed resource that is not present in the update, // remove it from the cache and notify watchers that it does not exist. for (const auto& p : cds_state.subscribed_resources) { @@ -1173,7 +1206,7 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] %s update NACKed containing %" PRIuPTR - " resources", + " invalid resources", xds_client(), result.type_url.c_str(), result.resource_names_failed.size()); } @@ -1268,9 +1301,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Update nonce. auto& state = state_map_[result.type_url]; state.nonce = std::move(result.nonce); - // NACK or ACK the response. + // If we got an error, we'll NACK the update. if (result.parse_error != GRPC_ERROR_NONE) { - // NACK unacceptable update. gpr_log(GPR_ERROR, "[xds_client %p] ADS response invalid for resource type %s " "version %s, will NACK: nonce=%s error=%s", @@ -1294,27 +1326,32 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { RejectAdsUpdateLocked(update_time, result, &xds_client()->endpoint_map_); } - SendMessageLocked(result.type_url); - } else { + } + // Process any valid resources. + bool have_valid_resources = false; + if (result.type_url == XdsApi::kLdsTypeUrl) { + have_valid_resources = !result.lds_update_map.empty(); + AcceptLdsUpdateLocked(result.version, update_time, + std::move(result.lds_update_map), + std::move(result.resource_names_failed)); + } else if (result.type_url == XdsApi::kRdsTypeUrl) { + have_valid_resources = !result.rds_update_map.empty(); + AcceptRdsUpdateLocked(result.version, update_time, + std::move(result.rds_update_map)); + } else if (result.type_url == XdsApi::kCdsTypeUrl) { + have_valid_resources = !result.cds_update_map.empty(); + AcceptCdsUpdateLocked(result.version, update_time, + std::move(result.cds_update_map), + std::move(result.resource_names_failed)); + } else if (result.type_url == XdsApi::kEdsTypeUrl) { + have_valid_resources = !result.eds_update_map.empty(); + AcceptEdsUpdateLocked(result.version, update_time, + std::move(result.eds_update_map)); + } + if (have_valid_resources) { seen_response_ = true; - // Accept the ADS response according to the type_url. - if (result.type_url == XdsApi::kLdsTypeUrl) { - AcceptLdsUpdateLocked(result.version, update_time, - std::move(result.lds_update_map)); - } else if (result.type_url == XdsApi::kRdsTypeUrl) { - AcceptRdsUpdateLocked(result.version, update_time, - std::move(result.rds_update_map)); - } else if (result.type_url == XdsApi::kCdsTypeUrl) { - AcceptCdsUpdateLocked(result.version, update_time, - std::move(result.cds_update_map)); - } else if (result.type_url == XdsApi::kEdsTypeUrl) { - AcceptEdsUpdateLocked(result.version, update_time, - std::move(result.eds_update_map)); - } xds_client()->resource_version_map_[result.type_url] = std::move(result.version); - // ACK the update. - SendMessageLocked(result.type_url); // Start load reporting if needed. auto& lrs_call = chand()->lrs_calld_; if (lrs_call != nullptr) { @@ -1322,6 +1359,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); } } + // Send ACK or NACK. + SendMessageLocked(result.type_url); } if (xds_client()->shutting_down_) return true; // Keep listening for updates. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 345c1a04be44f..107154b3f27c9 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -3389,44 +3389,98 @@ TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) { // Tests that the NACK for multiple bad LDS resources includes both errors. TEST_P(GlobalXdsClientTest, MultipleBadResources) { constexpr char kServerName2[] = "server.other.com"; + constexpr char kServerName3[] = "server.another.com"; auto listener = default_listener_; listener.clear_api_listener(); balancers_[0]->ads_service()->SetLdsResource(listener); listener.set_name(kServerName2); balancers_[0]->ads_service()->SetLdsResource(listener); + listener = default_listener_; + listener.set_name(kServerName3); + SetListenerAndRouteConfiguration(0, listener, default_route_config_); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); // Need to create a second channel to subscribe to a second LDS resource. auto channel2 = CreateChannel(0, kServerName2); auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); - ClientContext context; - EchoRequest request; - request.set_message(kRequestMessage); - EchoResponse response; - grpc::Status status = stub2->Echo(&context, request, &response); - EXPECT_FALSE(status.ok()); - // Wait for second NACK to be reported to xDS server. + { + ClientContext context; + EchoRequest request; + request.set_message(kRequestMessage); + EchoResponse response; + grpc::Status status = stub2->Echo(&context, request, &response); + EXPECT_FALSE(status.ok()); + // Wait for second NACK to be reported to xDS server. + auto deadline = absl::Now() + absl::Seconds(30); + bool timed_out = false; + CheckRpcSendFailure( + CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) { + if (absl::Now() >= deadline) { + timed_out = true; + return false; + } + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + return response_state.state != + AdsServiceImpl::ResponseState::NACKED || + ::testing::Matches(::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*", + kServerName2, + ": validation error.*" + "Listener has neither address nor ApiListener")))( + response_state.error_message); + })); + ASSERT_FALSE(timed_out); + } + // Now start a new channel with a third server name, this one with a + // valid resource. + auto channel3 = CreateChannel(0, kServerName3); + auto stub3 = grpc::testing::EchoTestService::NewStub(channel3); + { + ClientContext context; + EchoRequest request; + request.set_message(kRequestMessage); + EchoResponse response; + grpc::Status status = stub3->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + } +} + +// Tests that we don't trigger does-not-exist callbacks for a resource +// that was previously valid but is updated to be invalid. +TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) { + // Set up valid resources and check that the channel works. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendOk(); + // Now send an update changing the Listener to be invalid. + auto listener = default_listener_; + listener.clear_api_listener(); + balancers_[0]->ads_service()->SetLdsResource(listener); + // Wait for xDS server to see NACK. auto deadline = absl::Now() + absl::Seconds(30); - bool timed_out = false; - CheckRpcSendFailure( - CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) { - if (absl::Now() >= deadline) { - timed_out = true; - return false; - } - const auto response_state = - balancers_[0]->ads_service()->lds_response_state(); - return response_state.state != AdsServiceImpl::ResponseState::NACKED || - ::testing::Matches(::testing::ContainsRegex(absl::StrCat( - kServerName, - ": validation error.*" - "Listener has neither address nor ApiListener.*", - kServerName2, - ": validation error.*" - "Listener has neither address nor ApiListener")))( - response_state.error_message); - })); - ASSERT_FALSE(timed_out); + do { + CheckRpcSendOk(); + ASSERT_LT(absl::Now(), deadline); + } while (balancers_[0]->ads_service()->lds_response_state().state != + AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener"))); + // Check one more time, just to make sure it still works after NACK. + CheckRpcSendOk(); } class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest { @@ -7233,19 +7287,42 @@ TEST_P(CdsTest, UnsupportedClusterType) { // Tests that the NACK for multiple bad resources includes both errors. TEST_P(CdsTest, MultipleBadResources) { constexpr char kClusterName2[] = "cluster_name_2"; - // Use unsupported type for default cluster. + constexpr char kClusterName3[] = "cluster_name_3"; + // Add cluster with unsupported type. auto cluster = default_cluster_; + cluster.set_name(kClusterName2); cluster.set_type(Cluster::STATIC); balancers_[0]->ads_service()->SetCdsResource(cluster); // Add second cluster with the same error. - cluster.set_name(kClusterName2); + cluster.set_name(kClusterName3); balancers_[0]->ads_service()->SetCdsResource(cluster); - // Change RouteConfig to point to both clusters. + // Change RouteConfig to point to all clusters. RouteConfiguration route_config = default_route_config_; + route_config.mutable_virtual_hosts(0)->clear_routes(); + // First route: default cluster, selected based on header. auto* route = route_config.mutable_virtual_hosts(0)->add_routes(); route->mutable_match()->set_prefix(""); + auto* header_matcher = route->mutable_match()->add_headers(); + header_matcher->set_name("cluster"); + header_matcher->set_exact_match(kDefaultClusterName); + route->mutable_route()->set_cluster(kDefaultClusterName); + // Second route: cluster 2, selected based on header. + route = route_config.mutable_virtual_hosts(0)->add_routes(); + route->mutable_match()->set_prefix(""); + header_matcher = route->mutable_match()->add_headers(); + header_matcher->set_name("cluster"); + header_matcher->set_exact_match(kClusterName2); route->mutable_route()->set_cluster(kClusterName2); + // Third route: cluster 3, used by default. + route = route_config.mutable_virtual_hosts(0)->add_routes(); + route->mutable_match()->set_prefix(""); + route->mutable_route()->set_cluster(kClusterName3); SetRouteConfiguration(0, route_config); + // Add EDS resource. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); // Send RPC. SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -7255,12 +7332,54 @@ TEST_P(CdsTest, MultipleBadResources) { EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); EXPECT_THAT( response_state.error_message, - ::testing::ContainsRegex(absl::StrCat(kDefaultClusterName, + ::testing::ContainsRegex(absl::StrCat(kClusterName2, ": validation error.*" "DiscoveryType is not valid.*", - kClusterName2, + kClusterName3, ": validation error.*" "DiscoveryType is not valid"))); + // RPCs for default cluster should succeed. + std::vector> metadata_default_cluster = { + {"cluster", kDefaultClusterName}, + }; + CheckRpcSendOk( + 1, RpcOptions().set_metadata(std::move(metadata_default_cluster))); + // RPCs for cluster 2 should fail. + std::vector> metadata_cluster_2 = { + {"cluster", kClusterName2}, + }; + CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options( + RpcOptions().set_metadata(std::move(metadata_default_cluster)))); +} + +// Tests that we don't trigger does-not-exist callbacks for a resource +// that was previously valid but is updated to be invalid. +TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) { + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Check that everything works. + CheckRpcSendOk(); + // Now send an update changing the Cluster to be invalid. + auto cluster = default_cluster_; + cluster.set_type(Cluster::STATIC); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Wait for xDS server to see NACK. + auto deadline = absl::Now() + absl::Seconds(30); + do { + CheckRpcSendOk(); + ASSERT_LT(absl::Now(), deadline); + } while (balancers_[0]->ads_service()->cds_response_state().state != + AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(balancers_[0]->ads_service()->cds_response_state().error_message, + ::testing::ContainsRegex(absl::StrCat( + kDefaultClusterName, + ": validation error.*DiscoveryType is not valid"))); + // Check one more time, just to make sure it still works after NACK. + CheckRpcSendOk(); } // Tests that CDS client should send a NACK if the eds_config in CDS response From 3acbe69799277bd09422a125461f04748e9a4c52 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 8 Sep 2021 08:01:47 -0700 Subject: [PATCH 4/5] fix clang-tidy --- src/core/ext/xds/xds_client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index ed50b7c8732d7..67d3067b3f7b5 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -1333,7 +1333,7 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { have_valid_resources = !result.lds_update_map.empty(); AcceptLdsUpdateLocked(result.version, update_time, std::move(result.lds_update_map), - std::move(result.resource_names_failed)); + result.resource_names_failed); } else if (result.type_url == XdsApi::kRdsTypeUrl) { have_valid_resources = !result.rds_update_map.empty(); AcceptRdsUpdateLocked(result.version, update_time, @@ -1342,7 +1342,7 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { have_valid_resources = !result.cds_update_map.empty(); AcceptCdsUpdateLocked(result.version, update_time, std::move(result.cds_update_map), - std::move(result.resource_names_failed)); + result.resource_names_failed); } else if (result.type_url == XdsApi::kEdsTypeUrl) { have_valid_resources = !result.eds_update_map.empty(); AcceptEdsUpdateLocked(result.version, update_time, From 855509498dade1f819276fc909db8a48f322f40e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 8 Sep 2021 09:59:06 -0700 Subject: [PATCH 5/5] fix test --- test/cpp/end2end/xds_end2end_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 107154b3f27c9..7433e156e08a7 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -7349,7 +7349,7 @@ TEST_P(CdsTest, MultipleBadResources) { {"cluster", kClusterName2}, }; CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options( - RpcOptions().set_metadata(std::move(metadata_default_cluster)))); + RpcOptions().set_metadata(std::move(metadata_cluster_2)))); } // Tests that we don't trigger does-not-exist callbacks for a resource