Skip to content

Commit

Permalink
Revert "Revert "client_channel: allow LB policy to communicate update…
Browse files Browse the repository at this point in the history
… errors to resolver (grpc#30809)" (grpc#30970)"

This reverts commit 1648bc0.
  • Loading branch information
markdroth committed Sep 14, 2022
1 parent 26707b3 commit 24e1883
Show file tree
Hide file tree
Showing 25 changed files with 383 additions and 147 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5457,6 +5457,7 @@ grpc_cc_library(
"gpr",
"grpc_base",
"grpc_resolver",
"grpc_service_config",
"grpc_trace",
"iomgr_fwd",
"iomgr_timer",
Expand Down
15 changes: 12 additions & 3 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,9 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
}
// Grab resolver result health callback.
auto resolver_callback = std::move(result.result_health_callback);
absl::Status resolver_result_status;
// We only want to trace the address resolution in the following cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
Expand Down Expand Up @@ -1222,6 +1225,8 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
// TRANSIENT_FAILURE.
OnResolverErrorLocked(result.service_config.status());
trace_strings.push_back("no valid service config");
resolver_result_status =
absl::UnavailableError("no valid service config");
}
} else if (*result.service_config == nullptr) {
// Resolver did not return any service config.
Expand Down Expand Up @@ -1266,7 +1271,7 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
}
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(
resolver_result_status = CreateOrUpdateLbPolicyLocked(
std::move(lb_policy_config),
parsed_service_config->health_check_service_name(), std::move(result));
if (service_config_changed || config_selector_changed) {
Expand All @@ -1280,6 +1285,10 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
trace_strings.push_back("Service config changed");
}
}
// Invoke resolver callback if needed.
if (resolver_callback != nullptr) {
resolver_callback(std::move(resolver_result_status));
}
// Add channel trace event.
if (!trace_strings.empty()) {
std::string message =
Expand Down Expand Up @@ -1326,7 +1335,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
}
}

void ClientChannel::CreateOrUpdateLbPolicyLocked(
absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) {
Expand All @@ -1353,7 +1362,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
lb_policy_.get());
}
lb_policy_->UpdateLocked(std::move(update_args));
return lb_policy_->UpdateLocked(std::move(update_args));
}

// Creates a new LB policy.
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class ClientChannel {
void OnResolverErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);

void CreateOrUpdateLbPolicyLocked(
absl::Status CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void ChildPolicyHandler::ShutdownLocked() {
}
}

void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
Expand Down Expand Up @@ -253,7 +253,7 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(args));
return policy_to_update->UpdateLocked(std::move(args));
}

void ChildPolicyHandler::ExitIdleLocked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <utility>

#include "absl/status/status.h"
#include "absl/strings/string_view.h"

#include "src/core/lib/channel/channel_args.h"
Expand All @@ -43,7 +44,7 @@ class ChildPolicyHandler : public LoadBalancingPolicy {

absl::string_view name() const override { return "child_policy_handler"; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down
24 changes: 17 additions & 7 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class GrpcLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kGrpclb; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;

private:
Expand Down Expand Up @@ -473,7 +473,7 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override;

// Helper functions used in UpdateLocked().
void UpdateBalancerChannelLocked(const ChannelArgs& args);
absl::Status UpdateBalancerChannelLocked(const ChannelArgs& args);

void CancelBalancerChannelConnectivityWatchLocked();

Expand Down Expand Up @@ -1524,7 +1524,7 @@ void GrpcLb::ResetBackoffLocked() {
}
}

void GrpcLb::UpdateLocked(UpdateArgs args) {
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
Expand All @@ -1540,7 +1540,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
UpdateBalancerChannelLocked(args.args);
absl::Status status = UpdateBalancerChannelLocked(args.args);
// Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks
Expand All @@ -1565,18 +1565,24 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start balancer call.
StartBalancerCallLocked();
}
return status;
}

//
// helpers for UpdateLocked()
//

void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb");
// Construct args for balancer channel.
// Get balancer addresses.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
}
// Construct args for balancer channel.
ChannelArgs lb_channel_args =
BuildBalancerChannelArgs(response_generator_.get(), args);
// Create balancer channel if needed.
Expand Down Expand Up @@ -1604,6 +1610,8 @@ void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
result.addresses = std::move(balancer_addresses);
result.args = lb_channel_args;
response_generator_->SetResponse(std::move(result));
// Return status.
return status;
}

void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
Expand Down Expand Up @@ -1794,7 +1802,9 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
// TODO(roth): If we're in fallback mode and the child policy rejects the
// update, we should propagate that failure back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kOutlierDetection; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down Expand Up @@ -595,7 +595,7 @@ void OutlierDetectionLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}

void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
}
Expand Down Expand Up @@ -692,7 +692,7 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
"[outlier_detection_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

void OutlierDetectionLb::MaybeUpdatePickerLocked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PickFirst : public LoadBalancingPolicy {

absl::string_view name() const override { return kPickFirst; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand Down Expand Up @@ -232,7 +232,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked();
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE.
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
absl::Status status =
latest_update_args_.addresses.ok()
Expand All @@ -242,6 +242,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
}
// Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
Expand All @@ -263,7 +264,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
}
}

void PickFirst::UpdateLocked(UpdateArgs args) {
absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
Expand All @@ -276,6 +277,13 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
}
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
// Set return status based on the address list.
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
Expand All @@ -288,6 +296,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
if (!idle_) {
AttemptToConnectUsingLatestUpdateArgsLocked();
}
return status;
}

void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
Expand Down
37 changes: 26 additions & 11 deletions src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PriorityLb : public LoadBalancingPolicy {

absl::string_view name() const override { return kPriority; }

void UpdateLocked(UpdateArgs args) override;
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;

Expand All @@ -126,8 +126,8 @@ class PriorityLb : public LoadBalancingPolicy {

const std::string& name() const { return name_; }

void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests);
absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests);
void ExitIdleLocked();
void ResetBackoffLocked();
void MaybeDeactivateLocked();
Expand Down Expand Up @@ -344,7 +344,7 @@ void PriorityLb::ResetBackoffLocked() {
for (const auto& p : children_) p.second->ResetBackoffLocked();
}

void PriorityLb::UpdateLocked(UpdateArgs args) {
absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
}
Expand All @@ -357,6 +357,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
resolution_note_ = std::move(args.resolution_note);
// Check all existing children against the new config.
update_in_progress_ = true;
std::vector<std::string> errors;
for (const auto& p : children_) {
const std::string& child_name = p.first;
auto& child = p.second;
Expand All @@ -366,13 +367,24 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
child->MaybeDeactivateLocked();
} else {
// Existing child found in new config. Update it.
child->UpdateLocked(config_it->second.config,
config_it->second.ignore_reresolution_requests);
absl::Status status =
child->UpdateLocked(config_it->second.config,
config_it->second.ignore_reresolution_requests);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", child_name, ": ", status.ToString()));
}
}
}
update_in_progress_ = false;
// Try to get connected.
ChoosePriorityLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

uint32_t PriorityLb::GetChildPriorityLocked(
Expand Down Expand Up @@ -416,8 +428,11 @@ void PriorityLb::ChoosePriorityLocked() {
Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
auto child_config = config_->children().find(child_name);
GPR_DEBUG_ASSERT(child_config != config_->children().end());
child->UpdateLocked(child_config->second.config,
child_config->second.ignore_reresolution_requests);
// TODO(roth): If the child reports a non-OK status with the
// update, we need to propagate that back to the resolver somehow.
(void)child->UpdateLocked(
child_config->second.config,
child_config->second.ignore_reresolution_requests);
} else {
// The child already exists. Reactivate if needed.
child->MaybeReactivateLocked();
Expand Down Expand Up @@ -668,10 +683,10 @@ PriorityLb::ChildPriority::GetPicker() {
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
}

void PriorityLb::ChildPriority::UpdateLocked(
absl::Status PriorityLb::ChildPriority::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests) {
if (priority_policy_->shutting_down_) return;
if (priority_policy_->shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
priority_policy_.get(), name_.c_str(), this);
Expand All @@ -697,7 +712,7 @@ void PriorityLb::ChildPriority::UpdateLocked(
"[priority_lb %p] child %s (%p): updating child policy handler %p",
priority_policy_.get(), name_.c_str(), this, child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

OrphanablePtr<LoadBalancingPolicy>
Expand Down

0 comments on commit 24e1883

Please sign in to comment.