Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client_channel: allow LB policy to communicate update errors to resolver #30809

Merged
merged 16 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5441,6 +5441,7 @@ grpc_cc_library(
"gpr",
"grpc_base",
"grpc_resolver",
"grpc_service_config",
"grpc_trace",
"iomgr_fwd",
"iomgr_timer",
Expand Down
15 changes: 12 additions & 3 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,9 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
}
// Grab resolver result health callback.
auto resolver_callback = std::move(result.result_health_callback);
absl::Status resolver_result_status;
// We only want to trace the address resolution in the follow cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
Expand Down Expand Up @@ -1222,6 +1225,8 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
// TRANSIENT_FAILURE.
OnResolverErrorLocked(result.service_config.status());
trace_strings.push_back("no valid service config");
resolver_result_status =
absl::UnavailableError("no valid service config");
}
} else if (*result.service_config == nullptr) {
// Resolver did not return any service config.
Expand Down Expand Up @@ -1266,7 +1271,7 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
}
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(
resolver_result_status = CreateOrUpdateLbPolicyLocked(
std::move(lb_policy_config),
parsed_service_config->health_check_service_name(), std::move(result));
if (service_config_changed || config_selector_changed) {
Expand All @@ -1280,6 +1285,10 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
trace_strings.push_back("Service config changed");
}
}
// Invoke resolver callback if needed.
if (resolver_callback != nullptr) {
resolver_callback(std::move(resolver_result_status));
}
// Add channel trace event.
if (!trace_strings.empty()) {
std::string message =
Expand Down Expand Up @@ -1326,7 +1335,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
}
}

void ClientChannel::CreateOrUpdateLbPolicyLocked(
absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) {
Expand All @@ -1353,7 +1362,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
lb_policy_.get());
}
lb_policy_->UpdateLocked(std::move(update_args));
return lb_policy_->UpdateLocked(std::move(update_args));
}

// Creates a new LB policy.
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class ClientChannel {
void OnResolverErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);

void CreateOrUpdateLbPolicyLocked(
absl::Status CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void ChildPolicyHandler::ShutdownLocked() {
}
}

void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
Expand Down Expand Up @@ -253,7 +253,7 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(args));
return policy_to_update->UpdateLocked(std::move(args));
}

void ChildPolicyHandler::ExitIdleLocked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <utility>

#include "absl/status/status.h"
#include "absl/strings/string_view.h"

#include "src/core/lib/channel/channel_args.h"
Expand All @@ -43,7 +44,7 @@ class ChildPolicyHandler : public LoadBalancingPolicy {

absl::string_view name() const override { return "child_policy_handler"; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down
24 changes: 17 additions & 7 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class GrpcLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kGrpclb; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;

private:
Expand Down Expand Up @@ -473,7 +473,7 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override;

// Helper functions used in UpdateLocked().
void UpdateBalancerChannelLocked(const ChannelArgs& args);
absl::Status UpdateBalancerChannelLocked(const ChannelArgs& args);

void CancelBalancerChannelConnectivityWatchLocked();

Expand Down Expand Up @@ -1524,7 +1524,7 @@ void GrpcLb::ResetBackoffLocked() {
}
}

void GrpcLb::UpdateLocked(UpdateArgs args) {
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
Expand All @@ -1540,7 +1540,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
UpdateBalancerChannelLocked(args.args);
absl::Status status = UpdateBalancerChannelLocked(args.args);
// Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks
Expand All @@ -1565,18 +1565,24 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start balancer call.
StartBalancerCallLocked();
}
return status;
}

//
// helpers for UpdateLocked()
//

void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb");
// Construct args for balancer channel.
// Get balancer addresses.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
}
// Construct args for balancer channel.
ChannelArgs lb_channel_args =
BuildBalancerChannelArgs(response_generator_.get(), args);
// Create balancer channel if needed.
Expand Down Expand Up @@ -1604,6 +1610,8 @@ void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
result.addresses = std::move(balancer_addresses);
result.args = lb_channel_args;
response_generator_->SetResponse(std::move(result));
// Return status.
return status;
}

void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
Expand Down Expand Up @@ -1794,7 +1802,9 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
// TODO(roth): If we're in fallback mode and the child policy rejects the
// update, we should propagate that failure back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kOutlierDetection; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down Expand Up @@ -595,7 +595,7 @@ void OutlierDetectionLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}

void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
}
Expand Down Expand Up @@ -692,7 +692,7 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
"[outlier_detection_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

void OutlierDetectionLb::MaybeUpdatePickerLocked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PickFirst : public LoadBalancingPolicy {

absl::string_view name() const override { return kPickFirst; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down Expand Up @@ -232,7 +232,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE.
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
absl::Status status =
latest_update_args_.addresses.ok()
Expand All @@ -242,6 +242,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
}
// Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
Expand All @@ -263,7 +264,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
}
}

void PickFirst::UpdateLocked(UpdateArgs args) {
absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
Expand All @@ -276,6 +277,13 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
}
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
// Set return status based on the address list.
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
Expand All @@ -288,6 +296,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
if (!idle_) {
AttemptToConnectUsingLatestUpdateArgsLocked();
}
return status;
}

void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
Expand Down
37 changes: 26 additions & 11 deletions src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PriorityLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kPriority; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand All @@ -126,8 +126,8 @@ class PriorityLb : public LoadBalancingPolicy {

const std::string& name() const { return name_; }

void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests);
absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests);
void ExitIdleLocked();
void ResetBackoffLocked();
void MaybeDeactivateLocked();
Expand Down Expand Up @@ -344,7 +344,7 @@ void PriorityLb::ResetBackoffLocked() {
for (const auto& p : children_) p.second->ResetBackoffLocked();
}

void PriorityLb::UpdateLocked(UpdateArgs args) {
absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
}
Expand All @@ -357,6 +357,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
resolution_note_ = std::move(args.resolution_note);
// Check all existing children against the new config.
update_in_progress_ = true;
std::vector<std::string> errors;
for (const auto& p : children_) {
const std::string& child_name = p.first;
auto& child = p.second;
Expand All @@ -366,13 +367,24 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
child->MaybeDeactivateLocked();
} else {
// Existing child found in new config. Update it.
child->UpdateLocked(config_it->second.config,
config_it->second.ignore_reresolution_requests);
absl::Status status =
child->UpdateLocked(config_it->second.config,
config_it->second.ignore_reresolution_requests);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", child_name, ": ", status.ToString()));
}
}
}
update_in_progress_ = false;
// Try to get connected.
ChoosePriorityLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

uint32_t PriorityLb::GetChildPriorityLocked(
Expand Down Expand Up @@ -416,8 +428,11 @@ void PriorityLb::ChoosePriorityLocked() {
Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
auto child_config = config_->children().find(child_name);
GPR_DEBUG_ASSERT(child_config != config_->children().end());
child->UpdateLocked(child_config->second.config,
child_config->second.ignore_reresolution_requests);
// TODO(roth): If the child reports a non-OK status with the
// update, we need to propagate that back to the resolver somehow.
(void)child->UpdateLocked(
child_config->second.config,
child_config->second.ignore_reresolution_requests);
} else {
// The child already exists. Reactivate if needed.
child->MaybeReactivateLocked();
Expand Down Expand Up @@ -668,10 +683,10 @@ PriorityLb::ChildPriority::GetPicker() {
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
}

void PriorityLb::ChildPriority::UpdateLocked(
absl::Status PriorityLb::ChildPriority::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests) {
if (priority_policy_->shutting_down_) return;
if (priority_policy_->shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
priority_policy_.get(), name_.c_str(), this);
Expand All @@ -697,7 +712,7 @@ void PriorityLb::ChildPriority::UpdateLocked(
"[priority_lb %p] child %s (%p): updating child policy handler %p",
priority_policy_.get(), name_.c_str(), this, child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

OrphanablePtr<LoadBalancingPolicy>
Expand Down