Skip to content

Commit

Permalink
xds: weighted target to delay picker updates while updating children
Browse files Browse the repository at this point in the history
  • Loading branch information
temawi committed Jun 22, 2022
1 parent 8e77936 commit df50721
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 13 deletions.
35 changes: 22 additions & 13 deletions xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java
Expand Up @@ -49,20 +49,29 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
private final Map<String, ChildHelper> childHelpers = new HashMap<>();
private final Helper helper;
private final SynchronizationContext syncContext;

private Map<String, WeightedPolicySelection> targets = ImmutableMap.of();
// Set to true if currently in the process of handling resolved addresses.
private boolean resolvingAddresses;

WeightedTargetLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
logger = XdsLogger.withLogId(
InternalLogId.allocate("weighted-target-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
try {
resolvingAddresses = true;
handleResolvedAddressesInternal(resolvedAddresses);
} finally {
resolvingAddresses = false;
}
}

public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbConfig, "missing weighted_target lb config");
Expand Down Expand Up @@ -191,17 +200,17 @@ private ChildHelper(String name) {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!childBalancers.containsKey(name)) {
return;
}
currentState = newState;
currentPicker = newPicker;
updateOverallBalancingState();
}
});
if (!childBalancers.containsKey(name)) {
return;
}
currentState = newState;
currentPicker = newPicker;

// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (!resolvingAddresses) {
updateOverallBalancingState();
}
}

@Override
Expand Down
76 changes: 76 additions & 0 deletions xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java
Expand Up @@ -23,9 +23,11 @@
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -37,6 +39,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
Expand All @@ -56,6 +59,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.After;
Expand All @@ -66,6 +70,9 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/** Tests for {@link WeightedTargetLoadBalancer}. */
@RunWith(JUnit4.class)
Expand Down Expand Up @@ -402,4 +409,73 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
verifyNoMoreInteractions(helper);
}

// When the ChildHelper is asked to update the overall balancing state, it should not do that if
// the update was triggered by the parent LB that will handle triggering the overall state update.
@Test
public void noDuplicateOverallBalancingStateUpdate() {
FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider();

Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
"target0", new WeightedPolicySelection(
weights[0], new PolicySelection(fakeLbProvider, configs[0])),
"target3", new WeightedPolicySelection(
weights[3], new PolicySelection(fakeLbProvider, configs[3])));
weightedTargetLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());

// Both of the two child LB policies will call the helper to update the balancing state.
// But since those calls happen during the handling of teh resolved addresses of the parent
// WeightedTargetLLoadBalancer, the overall balancing state should only be updated once.
verify(helper, times(1)).updateBalancingState(any(), any());

}

private static class FakeLoadBalancerProvider extends LoadBalancerProvider {

@Override
public boolean isAvailable() {
return true;
}

@Override
public int getPriority() {
return 5;
}

@Override
public String getPolicyName() {
return "foo";
}

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new FakeLoadBalancer(helper);
}
}

static class FakeLoadBalancer extends LoadBalancer {

private Helper helper;

FakeLoadBalancer(Helper helper) {
this.helper = helper;
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
}

@Override
public void handleNameResolutionError(Status error) {
}

@Override
public void shutdown() {
}
}
}

0 comments on commit df50721

Please sign in to comment.