Skip to content

Commit

Permalink
xds: cluster manager to delay picker updates (#9365)
Browse files Browse the repository at this point in the history
Do not perform picker updates while handling new addresses even if child
LBs request it. Assure that a single picker update is done.
  • Loading branch information
temawi committed Jul 13, 2022
1 parent eb25807 commit 49f5551
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
38 changes: 23 additions & 15 deletions xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final XdsLogger logger;
// Set to true if currently in the process of handling resolved addresses.
private boolean resolvingAddresses;

ClusterManagerLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand All @@ -69,6 +71,15 @@ class ClusterManagerLoadBalancer extends LoadBalancer {

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
try {
resolvingAddresses = true;
handleResolvedAddressesInternal(resolvedAddresses);
} finally {
resolvingAddresses = false;
}
}

public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
ClusterManagerConfig config = (ClusterManagerConfig)
resolvedAddresses.getLoadBalancingPolicyConfig();
Expand Down Expand Up @@ -251,21 +262,18 @@ private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!childLbStates.containsKey(name)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated) {
updateOverallBalancingState();
}
}
});
// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (resolvingAddresses || !childLbStates.containsKey(name)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated) {
updateOverallBalancingState();
}
}

@Override
Expand Down
15 changes: 15 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
Expand Down Expand Up @@ -52,6 +53,7 @@
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -249,6 +251,15 @@ public void handleNameResolutionError_notPropagateToDeactivatedChildLbs() {
assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("unknown error");
}

@Test
public void noDuplicateOverallBalancingStateUpdate() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));

// The test child LBs would have triggered state updates, let's make sure the overall balancing
// state was only updated once.
verify(helper, times(1)).updateBalancingState(any(), any());
}

private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
clusterManagerLoadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
Expand Down Expand Up @@ -329,6 +340,10 @@ private final class FakeLoadBalancer extends LoadBalancer {
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();

// Update balancing state here so that concurrent child state changes can be easily tested.
// Most tests ignore this and trigger separate child LB updates.
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
}

@Override
Expand Down

0 comments on commit 49f5551

Please sign in to comment.