From fcaf9a95835086fb7637b065313d38b24d1c92e1 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 4 May 2021 15:56:56 -0700 Subject: [PATCH] xds: ignore balancing state update from downstream after LB shutdown (#8134) LoadBalancers should not propagate balancing state updates after itself being shutdown. For LB policies that maintain a group of child LB policies with each having its independent lifetime, balancing state update propagations from each child LB policy can go out of the lifetime of its parent easily, especially for cases that balancing state update is put to the back of the queue and not propagated up inline. For LBs that are simple pass-through in the middle of the LB tree structure, it isn't a big issue as its lifecycle would be the same as its child. Transitively, It would behave correctly as long as its downstream is doing in the right way. This change is a sanity cleanup for LB policies that maintain multiple child LB policies to preserve the invariant that further balancing state updates from their child policies will not get propagated. --- .../grpc/xds/ClusterManagerLoadBalancer.java | 9 ++++-- .../io/grpc/xds/PriorityLoadBalancer.java | 4 +++ .../grpc/xds/WeightedTargetLoadBalancer.java | 11 ++++++- .../xds/ClusterManagerLoadBalancerTest.java | 20 ++++++++++++- .../io/grpc/xds/PriorityLoadBalancerTest.java | 30 +++++++++++++++++++ .../xds/WeightedTargetLoadBalancerTest.java | 23 ++++++++++++++ 6 files changed, 92 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index 8bd7aca1729..e91fbe2fa1b 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -126,6 +126,7 @@ public void shutdown() { for (ChildLbState state : childLbStates.values()) { state.shutdown(); } + childLbStates.clear(); } private void updateOverallBalancingState() { @@ -237,7 +238,6 @@ void reactivate(LoadBalancerProvider policyProvider) { } void shutdown() { - deactivated = true; if (deletionTimer != null && deletionTimer.isPending()) { deletionTimer.cancel(); } @@ -253,10 +253,13 @@ public void updateBalancingState(final ConnectivityState newState, syncContext.execute(new Runnable() { @Override public void run() { - currentState = newState; - currentPicker = newPicker; + if (!childLbStates.containsKey(name)) { + return; + } // Subchannel picker and state are saved, but will only be propagated to the channel // when the child instance exits deactivated state. + currentState = newState; + currentPicker = newPicker; if (!deactivated) { updateOverallBalancingState(); } diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index c8f63c4ad22..80ddfd8a865 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -123,6 +123,7 @@ public void shutdown() { for (ChildLbState child : children.values()) { child.tearDown(); } + children.clear(); } private void tryNextPriority(boolean reportConnecting) { @@ -292,6 +293,9 @@ public void updateBalancingState(final ConnectivityState newState, syncContext.execute(new Runnable() { @Override public void run() { + if (!children.containsKey(priority)) { + return; + } connectivityState = newState; picker = newPicker; if (deletionTimer != null && deletionTimer.isPending()) { diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 0a11da61bea..ee8c0308fce 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -71,7 +71,7 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { for (String targetName : newTargets.keySet()) { WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName); if (!targets.containsKey(targetName)) { - ChildHelper childHelper = new ChildHelper(); + ChildHelper childHelper = new ChildHelper(targetName); GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper); childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider()); childHelpers.put(targetName, childHelper); @@ -125,6 +125,7 @@ public void shutdown() { for (LoadBalancer childBalancer : childBalancers.values()) { childBalancer.shutdown(); } + childBalancers.clear(); } private void updateOverallBalancingState() { @@ -179,15 +180,23 @@ private static ConnectivityState aggregateState( } private final class ChildHelper extends ForwardingLoadBalancerHelper { + String name; ConnectivityState currentState = CONNECTING; SubchannelPicker currentPicker = BUFFER_PICKER; + private ChildHelper(String name) { + this.name = name; + } + @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { syncContext.execute(new Runnable() { @Override public void run() { + if (!childBalancers.containsKey(name)) { + return; + } currentState = newState; currentPicker = newPicker; updateOverallBalancingState(); diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 7fb9ddf0e04..38aed01a234 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -21,10 +21,12 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; @@ -98,6 +100,7 @@ public void setUp() { lbConfigInventory.put("childB", new Object()); lbConfigInventory.put("childC", null); clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper); + clearInvocations(helper); } @After @@ -185,13 +188,28 @@ public void ignoreBalancingStateUpdateForDeactivatedChildLbs() { verify(helper, never()).updateBalancingState( eq(ConnectivityState.READY), any(SubchannelPicker.class)); - // reactivate policy_a + // Reactivate policy_a, balancing state update reflects the latest connectivity state and + // picker. deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel()) .isEqualTo(subchannel); } + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + verify(helper).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + FakeLoadBalancer childBalancer = childBalancers.iterator().next(); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + clusterManagerLoadBalancer.shutdown(); + childBalancer.deliverSubchannelState(mock(Subchannel.class), ConnectivityState.READY); + verifyNoMoreInteractions(helper); + } + @Test public void handleNameResolutionError_beforeChildLbsInstantiated_returnErrorPicker() { clusterManagerLoadBalancer.handleNameResolutionError( diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 2a030e3d30f..a7e2e916b3e 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -20,12 +20,16 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -115,6 +119,7 @@ public void setUp() { doReturn(syncContext).when(helper).getSynchronizationContext(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); priorityLb = new PriorityLoadBalancer(helper); + clearInvocations(helper); } @After @@ -420,6 +425,31 @@ public void bypassReresolutionRequestsIfConfiged() { verify(helper).refreshNameResolution(); } + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true); + PriorityChildConfig priorityChildConfig1 = + new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), false); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig( + ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1), + ImmutableList.of("p0", "p1")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + priorityLb.shutdown(); + Helper priorityHelper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + priorityHelper0.updateBalancingState(READY, mock(SubchannelPicker.class)); + verifyNoMoreInteractions(helper); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); diff --git a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java index efee8a1a4bb..7d9d30385e4 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java @@ -25,10 +25,12 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; @@ -164,6 +166,7 @@ public void setUp() { lbRegistry.register(barLbProvider); weightedTargetLb = new WeightedTargetLoadBalancer(helper); + clearInvocations(helper); } @After @@ -379,4 +382,24 @@ public void balancingStateUpdatedFromChildBalancers() { new WeightedChildPicker(weights[2], failurePickers[2]), new WeightedChildPicker(weights[3], failurePickers[3])); } + + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + Map targets = ImmutableMap.of( + "target0", weightedLbConfig0, + "target1", weightedLbConfig1); + weightedTargetLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) + .build()); + verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + weightedTargetLb.shutdown(); + Helper weightedChildHelper0 = childHelpers.iterator().next(); + weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class)); + verifyNoMoreInteractions(helper); + } }