Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: ignore balancing state update from downstream after LB shutdown #8134

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
Expand Up @@ -126,6 +126,7 @@ public void shutdown() {
for (ChildLbState state : childLbStates.values()) {
state.shutdown();
}
childLbStates.clear();
}

private void updateOverallBalancingState() {
Expand Down Expand Up @@ -237,7 +238,6 @@ void reactivate(LoadBalancerProvider policyProvider) {
}

void shutdown() {
deactivated = true;
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
Expand All @@ -253,10 +253,13 @@ public void updateBalancingState(final ConnectivityState newState,
syncContext.execute(new Runnable() {
@Override
public void run() {
currentState = newState;
currentPicker = newPicker;
if (!childLbStates.containsKey(name)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated) {
updateOverallBalancingState();
}
Expand Down
4 changes: 4 additions & 0 deletions xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
Expand Up @@ -123,6 +123,7 @@ public void shutdown() {
for (ChildLbState child : children.values()) {
child.tearDown();
}
children.clear();
}

private void tryNextPriority(boolean reportConnecting) {
Expand Down Expand Up @@ -292,6 +293,9 @@ public void updateBalancingState(final ConnectivityState newState,
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!children.containsKey(priority)) {
return;
}
connectivityState = newState;
picker = newPicker;
if (deletionTimer != null && deletionTimer.isPending()) {
Expand Down
11 changes: 10 additions & 1 deletion xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java
Expand Up @@ -71,7 +71,7 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
for (String targetName : newTargets.keySet()) {
WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName);
if (!targets.containsKey(targetName)) {
ChildHelper childHelper = new ChildHelper();
ChildHelper childHelper = new ChildHelper(targetName);
GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper);
childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider());
childHelpers.put(targetName, childHelper);
Expand Down Expand Up @@ -125,6 +125,7 @@ public void shutdown() {
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.shutdown();
}
childBalancers.clear();
}

private void updateOverallBalancingState() {
Expand Down Expand Up @@ -179,15 +180,23 @@ private static ConnectivityState aggregateState(
}

private final class ChildHelper extends ForwardingLoadBalancerHelper {
String name;
ConnectivityState currentState = CONNECTING;
SubchannelPicker currentPicker = BUFFER_PICKER;

private ChildHelper(String name) {
this.name = name;
}

@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!childBalancers.containsKey(name)) {
return;
}
currentState = newState;
currentPicker = newPicker;
updateOverallBalancingState();
Expand Down
Expand Up @@ -21,10 +21,12 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -98,6 +100,7 @@ public void setUp() {
lbConfigInventory.put("childB", new Object());
lbConfigInventory.put("childC", null);
clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper);
clearInvocations(helper);
}

@After
Expand Down Expand Up @@ -185,13 +188,28 @@ public void ignoreBalancingStateUpdateForDeactivatedChildLbs() {
verify(helper, never()).updateBalancingState(
eq(ConnectivityState.READY), any(SubchannelPicker.class));

// reactivate policy_a
// Reactivate policy_a, balancing state update reflects the latest connectivity state and
// picker.
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel())
.isEqualTo(subchannel);
}

@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
verify(helper).updateBalancingState(
eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
FakeLoadBalancer childBalancer = childBalancers.iterator().next();

// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
clusterManagerLoadBalancer.shutdown();
childBalancer.deliverSubchannelState(mock(Subchannel.class), ConnectivityState.READY);
verifyNoMoreInteractions(helper);
}

@Test
public void handleNameResolutionError_beforeChildLbsInstantiated_returnErrorPicker() {
clusterManagerLoadBalancer.handleNameResolutionError(
Expand Down
30 changes: 30 additions & 0 deletions xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java
Expand Up @@ -20,12 +20,16 @@
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -115,6 +119,7 @@ public void setUp() {
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
priorityLb = new PriorityLoadBalancer(helper);
clearInvocations(helper);
}

@After
Expand Down Expand Up @@ -420,6 +425,31 @@ public void bypassReresolutionRequestsIfConfiged() {
verify(helper).refreshNameResolution();
}

@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), false);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));

// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
priorityLb.shutdown();
Helper priorityHelper0 = Iterables.getOnlyElement(fooHelpers); // priority p0
priorityHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
verifyNoMoreInteractions(helper);
}

private void assertLatestConnectivityState(ConnectivityState expectedState) {
verify(helper, atLeastOnce())
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());
Expand Down
23 changes: 23 additions & 0 deletions xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java
Expand Up @@ -25,10 +25,12 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -164,6 +166,7 @@ public void setUp() {
lbRegistry.register(barLbProvider);

weightedTargetLb = new WeightedTargetLoadBalancer(helper);
clearInvocations(helper);
}

@After
Expand Down Expand Up @@ -379,4 +382,24 @@ public void balancingStateUpdatedFromChildBalancers() {
new WeightedChildPicker(weights[2], failurePickers[2]),
new WeightedChildPicker(weights[3], failurePickers[3]));
}

@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
"target0", weightedLbConfig0,
"target1", weightedLbConfig1);
weightedTargetLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));

// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
weightedTargetLb.shutdown();
Helper weightedChildHelper0 = childHelpers.iterator().next();
weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
verifyNoMoreInteractions(helper);
}
}