From bcf5cde7dd31de4be3cfc8bf8264681abc8fb292 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Mon, 12 Sep 2022 11:17:51 -0700 Subject: [PATCH] xds: prevent concurrent priority LB picker updates (#9363) If a child policy triggers an update to the parent priority policy it will be ignored if an update is already in process. --- .../io/grpc/xds/PriorityLoadBalancer.java | 65 ++++++++++------- .../io/grpc/xds/PriorityLoadBalancerTest.java | 69 ++++++++++++++++++- 2 files changed, 106 insertions(+), 28 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 5cae9139ae6..7a6bff4d779 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -58,7 +58,7 @@ final class PriorityLoadBalancer extends LoadBalancer { private final XdsLogger logger; // Includes all active and deactivated children. Mutable. New entries are only added from priority - // 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation. + // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation. private final Map children = new HashMap<>(); // Following fields are only null initially. @@ -70,6 +70,8 @@ final class PriorityLoadBalancer extends LoadBalancer { @Nullable private String currentPriority; private ConnectivityState currentConnectivityState; private SubchannelPicker currentPicker; + // Set to true if currently in the process of handling resolved addresses. + private boolean handlingResolvedAddresses; PriorityLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -82,6 +84,15 @@ final class PriorityLoadBalancer extends LoadBalancer { @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + try { + handlingResolvedAddresses = true; + handleResolvedAddressesInternal(resolvedAddresses); + } finally { + handlingResolvedAddresses = false; + } + } + + public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); this.resolvedAddresses = resolvedAddresses; PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); @@ -297,32 +308,34 @@ public void refreshNameResolution() { @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (!children.containsKey(priority)) { - return; - } - connectivityState = newState; - picker = newPicker; - if (deletionTimer != null && deletionTimer.isPending()) { - return; - } - if (newState.equals(CONNECTING) ) { - if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) { - failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, - executor); - } - } else if (newState.equals(READY) || newState.equals(IDLE)) { - seenReadyOrIdleSinceTransientFailure = true; - failOverTimer.cancel(); - } else if (newState.equals(TRANSIENT_FAILURE)) { - seenReadyOrIdleSinceTransientFailure = false; - failOverTimer.cancel(); - } - tryNextPriority(); + if (!children.containsKey(priority)) { + return; + } + connectivityState = newState; + picker = newPicker; + + if (deletionTimer != null && deletionTimer.isPending()) { + return; + } + if (newState.equals(CONNECTING)) { + if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) { + failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, + executor); } - }); + } else if (newState.equals(READY) || newState.equals(IDLE)) { + seenReadyOrIdleSinceTransientFailure = true; + failOverTimer.cancel(); + } else if (newState.equals(TRANSIENT_FAILURE)) { + seenReadyOrIdleSinceTransientFailure = false; + failOverTimer.cancel(); + } + + // If we are currently handling newly resolved addresses, let's not try to reconfigure as + // the address handling process will take care of that to provide an atomic config update. + if (handlingResolvedAddresses) { + return; + } + tryNextPriority(); } @Override diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 6b1fae48f1d..7227dcb3b2f 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -22,8 +22,8 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doReturn; @@ -676,7 +676,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -686,6 +686,26 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { verifyNoMoreInteractions(helper); } + @Test + public void noDuplicateOverallBalancingStateUpdate() { + FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider(); + + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), true); + PriorityChildConfig priorityChildConfig1 = + new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), false); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig( + ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1), + ImmutableList.of("p0", "p1")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + verify(helper, times(1)).updateBalancingState(any(), any()); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); @@ -714,4 +734,49 @@ private void assertCurrentPickerIsBufferPicker() { PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(pickResult).isEqualTo(PickResult.withNoResult()); } + + private static class FakeLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "foo"; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new FakeLoadBalancer(helper); + } + } + + static class FakeLoadBalancer extends LoadBalancer { + + private Helper helper; + + FakeLoadBalancer(Helper helper) { + this.helper = helper; + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL)); + } + + @Override + public void handleNameResolutionError(Status error) { + } + + @Override + public void shutdown() { + } + } }