diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index 8bd7aca1729..e91fbe2fa1b 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -126,6 +126,7 @@ public void shutdown() { for (ChildLbState state : childLbStates.values()) { state.shutdown(); } + childLbStates.clear(); } private void updateOverallBalancingState() { @@ -237,7 +238,6 @@ void reactivate(LoadBalancerProvider policyProvider) { } void shutdown() { - deactivated = true; if (deletionTimer != null && deletionTimer.isPending()) { deletionTimer.cancel(); } @@ -253,10 +253,13 @@ public void updateBalancingState(final ConnectivityState newState, syncContext.execute(new Runnable() { @Override public void run() { - currentState = newState; - currentPicker = newPicker; + if (!childLbStates.containsKey(name)) { + return; + } // Subchannel picker and state are saved, but will only be propagated to the channel // when the child instance exits deactivated state. + currentState = newState; + currentPicker = newPicker; if (!deactivated) { updateOverallBalancingState(); } diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index c8f63c4ad22..80ddfd8a865 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -123,6 +123,7 @@ public void shutdown() { for (ChildLbState child : children.values()) { child.tearDown(); } + children.clear(); } private void tryNextPriority(boolean reportConnecting) { @@ -292,6 +293,9 @@ public void updateBalancingState(final ConnectivityState newState, syncContext.execute(new Runnable() { @Override public void run() { + if (!children.containsKey(priority)) { + return; + } connectivityState = newState; picker = newPicker; if (deletionTimer != null && deletionTimer.isPending()) { diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 0a11da61bea..ee8c0308fce 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -71,7 +71,7 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { for (String targetName : newTargets.keySet()) { WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName); if (!targets.containsKey(targetName)) { - ChildHelper childHelper = new ChildHelper(); + ChildHelper childHelper = new ChildHelper(targetName); GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper); childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider()); childHelpers.put(targetName, childHelper); @@ -125,6 +125,7 @@ public void shutdown() { for (LoadBalancer childBalancer : childBalancers.values()) { childBalancer.shutdown(); } + childBalancers.clear(); } private void updateOverallBalancingState() { @@ -179,15 +180,23 @@ private static ConnectivityState aggregateState( } private final class ChildHelper extends ForwardingLoadBalancerHelper { + String name; ConnectivityState currentState = CONNECTING; SubchannelPicker currentPicker = BUFFER_PICKER; + private ChildHelper(String name) { + this.name = name; + } + @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { syncContext.execute(new Runnable() { @Override public void run() { + if (!childBalancers.containsKey(name)) { + return; + } currentState = newState; currentPicker = newPicker; updateOverallBalancingState(); diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 7fb9ddf0e04..38aed01a234 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -21,10 +21,12 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; @@ -98,6 +100,7 @@ public void setUp() { lbConfigInventory.put("childB", new Object()); lbConfigInventory.put("childC", null); clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper); + clearInvocations(helper); } @After @@ -185,13 +188,28 @@ public void ignoreBalancingStateUpdateForDeactivatedChildLbs() { verify(helper, never()).updateBalancingState( eq(ConnectivityState.READY), any(SubchannelPicker.class)); - // reactivate policy_a + // Reactivate policy_a, balancing state update reflects the latest connectivity state and + // picker. deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel()) .isEqualTo(subchannel); } + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + verify(helper).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + FakeLoadBalancer childBalancer = childBalancers.iterator().next(); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + clusterManagerLoadBalancer.shutdown(); + childBalancer.deliverSubchannelState(mock(Subchannel.class), ConnectivityState.READY); + verifyNoMoreInteractions(helper); + } + @Test public void handleNameResolutionError_beforeChildLbsInstantiated_returnErrorPicker() { clusterManagerLoadBalancer.handleNameResolutionError( diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 2a030e3d30f..a7e2e916b3e 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -20,12 +20,16 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -115,6 +119,7 @@ public void setUp() { doReturn(syncContext).when(helper).getSynchronizationContext(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); priorityLb = new PriorityLoadBalancer(helper); + clearInvocations(helper); } @After @@ -420,6 +425,31 @@ public void bypassReresolutionRequestsIfConfiged() { verify(helper).refreshNameResolution(); } + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true); + PriorityChildConfig priorityChildConfig1 = + new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), false); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig( + ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1), + ImmutableList.of("p0", "p1")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + priorityLb.shutdown(); + Helper priorityHelper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + priorityHelper0.updateBalancingState(READY, mock(SubchannelPicker.class)); + verifyNoMoreInteractions(helper); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); diff --git a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java index efee8a1a4bb..7d9d30385e4 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java @@ -25,10 +25,12 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; @@ -164,6 +166,7 @@ public void setUp() { lbRegistry.register(barLbProvider); weightedTargetLb = new WeightedTargetLoadBalancer(helper); + clearInvocations(helper); } @After @@ -379,4 +382,24 @@ public void balancingStateUpdatedFromChildBalancers() { new WeightedChildPicker(weights[2], failurePickers[2]), new WeightedChildPicker(weights[3], failurePickers[3])); } + + @Test + public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { + Map targets = ImmutableMap.of( + "target0", weightedLbConfig0, + "target1", weightedLbConfig1); + weightedTargetLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) + .build()); + verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + + // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, + // any further balancing state update should be ignored. + weightedTargetLb.shutdown(); + Helper weightedChildHelper0 = childHelpers.iterator().next(); + weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class)); + verifyNoMoreInteractions(helper); + } }