From 478ead34e2a6a3531a27008a060d139a46925647 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 13 Apr 2022 15:35:49 -0700 Subject: [PATCH 1/6] ring hash --- .../io/grpc/xds/RingHashLoadBalancer.java | 83 ++++++++++------ .../io/grpc/xds/RingHashLoadBalancerTest.java | 95 ++++++++++++++++++- 2 files changed, 143 insertions(+), 35 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index a8f517a8967..f2f07375b53 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +68,7 @@ final class RingHashLoadBalancer extends LoadBalancer { private List ring; private ConnectivityState currentState; + private Iterator connectionAttemptIterator = subchannels.values().iterator(); RingHashLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -142,6 +144,7 @@ public void onSubchannelState(ConnectivityStateInfo newState) { for (EquivalentAddressGroup addr : removedAddrs) { removedSubchannels.add(subchannels.remove(addr)); } + connectionAttemptIterator = subchannels.values().iterator(); // Update the picker before shutting down the subchannels, to reduce the chance of race // between picking a subchannel and shutting it down. @@ -203,53 +206,67 @@ public void shutdown() { * TRANSIENT_FAILURE *
  • If there is at least one subchannel in CONNECTING state, overall state is * CONNECTING
  • + *
  • If there is one subchannel in TRANSIENT_FAILURE state and there is + * more than one subchannel, report CONNECTING
  • *
  • If there is at least one subchannel in IDLE state, overall state is IDLE
  • *
  • Otherwise, overall state is TRANSIENT_FAILURE
  • * */ private void updateBalancingState() { checkState(!subchannels.isEmpty(), "no subchannel has been created"); - int failureCount = 0; - boolean hasConnecting = false; - Subchannel idleSubchannel = null; ConnectivityState overallState = null; + boolean start_connection_attempt = false; + int num_idle_ = 0; + int num_ready_ = 0; + int num_connecting_ = 0; + int num_transient_failure_ = 0; for (Subchannel subchannel : subchannels.values()) { ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState(); if (state == READY) { - overallState = READY; + num_ready_++; break; - } - if (state == TRANSIENT_FAILURE) { - failureCount++; - } else if (state == CONNECTING) { - hasConnecting = true; + } else if (state == TRANSIENT_FAILURE) { + num_transient_failure_++; + } else if (state == CONNECTING ) { + num_connecting_++; } else if (state == IDLE) { - if (idleSubchannel == null) { - idleSubchannel = subchannel; - } + num_idle_++; } } - if (overallState == null) { - if (failureCount >= 2) { - // This load balancer may not get any pick requests from the upstream if it's reporting - // TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least - // one subchannel that has not failed at any given time. - if (!hasConnecting && idleSubchannel != null) { - idleSubchannel.requestConnection(); - } - overallState = TRANSIENT_FAILURE; - } else if (hasConnecting) { - overallState = CONNECTING; - } else if (idleSubchannel != null) { - overallState = IDLE; - } else { - overallState = TRANSIENT_FAILURE; - } + if (num_ready_ > 0) { + overallState = READY; + } else if (num_transient_failure_ >= 2) { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; + } else if (num_connecting_ > 0) { + overallState = CONNECTING; + } else if (num_transient_failure_ == 1 && subchannels.size() > 1) { + overallState = CONNECTING; + start_connection_attempt = true; + } else if (num_idle_ > 0) { + overallState = IDLE; + } else { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; } RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels); // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates helper.updateBalancingState(overallState, picker); currentState = overallState; + if (start_connection_attempt) { + if (!connectionAttemptIterator.hasNext()) { + connectionAttemptIterator = subchannels.values().iterator(); + } + if (connectionAttemptIterator.hasNext()) { + final Subchannel finalSubchannel = connectionAttemptIterator.next(); + syncContext.execute(new Runnable() { + @Override + public void run() { + finalSubchannel.requestConnection(); + } + }); + } + } } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -259,18 +276,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { helper.refreshNameResolution(); } - Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + updateConnectivityState(subchannel, stateInfo); + updateBalancingState(); + } + private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + ConnectivityState previousConnectivityState = subchannelStateRef.value.getState(); // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in // TRANSIENT_FAILURE until it becomes READY. - if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) { + if (previousConnectivityState == TRANSIENT_FAILURE) { if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { return; } } subchannelStateRef.value = stateInfo; - updateBalancingState(); } private static void shutdownSubchannel(Subchannel subchannel) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 5a9bb7ff4a8..c28b97fe08d 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -56,8 +57,10 @@ import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import java.lang.Thread.UncaughtExceptionHandler; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -95,6 +98,7 @@ public void uncaughtException(Thread t, Throwable e) { private final Map, Subchannel> subchannels = new HashMap<>(); private final Map subchannelStateListeners = new HashMap<>(); + private final Deque connectionRequestedQueue = new ArrayDeque<>(); private final XxHash64 hashFunc = XxHash64.INSTANCE; @Mock private Helper helper; @@ -123,6 +127,13 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(subchannel).start(any(SubchannelStateListener.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + connectionRequestedQueue.offer(subchannel); + return null; + } + }).when(subchannel).requestConnection(); return subchannel; } }); @@ -138,6 +149,7 @@ public void tearDown() { for (Subchannel subchannel : subchannels.values()) { verify(subchannel).shutdown(); } + connectionRequestedQueue.clear(); } @Test @@ -216,18 +228,21 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // two in CONNECTING deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // one in CONNECTING, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in READY deliverSubchannelState( @@ -236,17 +251,28 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { Status.UNKNOWN.withDescription("unknown failure"))); inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in IDLE deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); verifyNoMoreInteractions(helper); } + private void verifyConnection(int times) { + for (int i = 0; i < times; i++) { + Subchannel connectOnce = connectionRequestedQueue.poll(); + assertThat(connectOnce).isNotNull(); + clearInvocations(connectOnce); + } + assertThat(connectionRequestedQueue.poll()).isNull(); + } + @Test public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { RingHashConfig config = new RingHashConfig(10, 100); @@ -264,7 +290,8 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("not found"))); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, two in IDLE deliverSubchannelState( @@ -274,6 +301,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, one in CONNECTING, one in IDLE // The overall state is dominated by the two in TRANSIENT_FAILURE. @@ -282,6 +310,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in CONNECTING deliverSubchannelState( @@ -291,12 +320,14 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); verifyNoMoreInteractions(helper); } @@ -320,15 +351,20 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper, times(3)).refreshNameResolution(); // Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE. - verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verify(helper, times(2)) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); + reset(helper); // Simulate underlying subchannel auto reconnect after backoff. for (Subchannel subchannel : subchannels.values()) { deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); } + verify(helper, times(3)) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); // Simulate one subchannel enters READY. @@ -337,6 +373,51 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); } + @Test + public void updateConnectionIterator() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1); + InOrder inOrder = Mockito.inOrder(helper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + servers = createWeightedServerAddrs(1,1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + } + @Test public void ignoreShutdownSubchannelStateChange() { RingHashConfig config = new RingHashConfig(10, 100); @@ -466,7 +547,12 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); - verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + Subchannel attempted = connectionRequestedQueue.poll(); + assertThat(attempted).isNotNull(); + clearInvocations(attempted); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -476,6 +562,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) .requestConnection(); // no excessive connection + reset(helper); deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(CONNECTING)); From c319150d316642f40b258d6ef779e7d599db37f3 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 13 Apr 2022 15:35:49 -0700 Subject: [PATCH 2/6] ring hash --- .../io/grpc/xds/RingHashLoadBalancer.java | 103 ++++++++++++------ .../io/grpc/xds/RingHashLoadBalancerTest.java | 95 +++++++++++++++- 2 files changed, 163 insertions(+), 35 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index a8f517a8967..e4331d2e6d1 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +68,9 @@ final class RingHashLoadBalancer extends LoadBalancer { private List ring; private ConnectivityState currentState; + // If we need to proactively start connecting, simply iterate through all the subchannels. + // Alternatively, we can do it more fairly and effectively. + private Iterator connectionAttemptIterator = subchannels.values().iterator(); RingHashLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -142,6 +146,7 @@ public void onSubchannelState(ConnectivityStateInfo newState) { for (EquivalentAddressGroup addr : removedAddrs) { removedSubchannels.add(subchannels.remove(addr)); } + connectionAttemptIterator = subchannels.values().iterator(); // Update the picker before shutting down the subchannels, to reduce the chance of race // between picking a subchannel and shutting it down. @@ -203,53 +208,85 @@ public void shutdown() { * TRANSIENT_FAILURE *
  • If there is at least one subchannel in CONNECTING state, overall state is * CONNECTING
  • + *
  • If there is one subchannel in TRANSIENT_FAILURE state and there is + * more than one subchannel, report CONNECTING
  • *
  • If there is at least one subchannel in IDLE state, overall state is IDLE
  • *
  • Otherwise, overall state is TRANSIENT_FAILURE
  • * */ private void updateBalancingState() { checkState(!subchannels.isEmpty(), "no subchannel has been created"); - int failureCount = 0; - boolean hasConnecting = false; - Subchannel idleSubchannel = null; ConnectivityState overallState = null; + boolean start_connection_attempt = false; + int num_idle_ = 0; + int num_ready_ = 0; + int num_connecting_ = 0; + int num_transient_failure_ = 0; for (Subchannel subchannel : subchannels.values()) { ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState(); if (state == READY) { - overallState = READY; + num_ready_++; break; - } - if (state == TRANSIENT_FAILURE) { - failureCount++; - } else if (state == CONNECTING) { - hasConnecting = true; + } else if (state == TRANSIENT_FAILURE) { + num_transient_failure_++; + } else if (state == CONNECTING ) { + num_connecting_++; } else if (state == IDLE) { - if (idleSubchannel == null) { - idleSubchannel = subchannel; - } + num_idle_++; } } - if (overallState == null) { - if (failureCount >= 2) { - // This load balancer may not get any pick requests from the upstream if it's reporting - // TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least - // one subchannel that has not failed at any given time. - if (!hasConnecting && idleSubchannel != null) { - idleSubchannel.requestConnection(); - } - overallState = TRANSIENT_FAILURE; - } else if (hasConnecting) { - overallState = CONNECTING; - } else if (idleSubchannel != null) { - overallState = IDLE; - } else { - overallState = TRANSIENT_FAILURE; - } + if (num_ready_ > 0) { + overallState = READY; + } else if (num_transient_failure_ >= 2) { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; + } else if (num_connecting_ > 0) { + overallState = CONNECTING; + } else if (num_transient_failure_ == 1 && subchannels.size() > 1) { + overallState = CONNECTING; + start_connection_attempt = true; + } else if (num_idle_ > 0) { + overallState = IDLE; + } else { + overallState = TRANSIENT_FAILURE; + start_connection_attempt = true; } RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels); // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates helper.updateBalancingState(overallState, picker); currentState = overallState; + // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will + // not be getting any pick requests from the priority policy. + // However, because the ring_hash policy does not attempt to + // reconnect to subchannels unless it is getting pick requests, + // it will need special handling to ensure that it will eventually + // recover from TRANSIENT_FAILURE state once the problem is resolved. + // Specifically, it will make sure that it is attempting to connect to + // at least one subchannel at any given time. After a given subchannel + // fails a connection attempt, it will move on to the next subchannel + // in the ring. It will keep doing this until one of the subchannels + // successfully connects, at which point it will report READY and stop + // proactively trying to connect. The policy will remain in + // TRANSIENT_FAILURE until at least one subchannel becomes connected, + // even if subchannels are in state CONNECTING during that time. + // + // Note that we do the same thing when the policy is in state + // CONNECTING, just to ensure that we don't remain in CONNECTING state + // indefinitely if there are no new picks coming in. + if (start_connection_attempt) { + if (!connectionAttemptIterator.hasNext()) { + connectionAttemptIterator = subchannels.values().iterator(); + } + if (connectionAttemptIterator.hasNext()) { + final Subchannel finalSubchannel = connectionAttemptIterator.next(); + syncContext.execute(new Runnable() { + @Override + public void run() { + finalSubchannel.requestConnection(); + } + }); + } + } } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -259,18 +296,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { helper.refreshNameResolution(); } - Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + updateConnectivityState(subchannel, stateInfo); + updateBalancingState(); + } + private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + ConnectivityState previousConnectivityState = subchannelStateRef.value.getState(); // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in // TRANSIENT_FAILURE until it becomes READY. - if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) { + if (previousConnectivityState == TRANSIENT_FAILURE) { if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { return; } } subchannelStateRef.value = stateInfo; - updateBalancingState(); } private static void shutdownSubchannel(Subchannel subchannel) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 5a9bb7ff4a8..c28b97fe08d 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -56,8 +57,10 @@ import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import java.lang.Thread.UncaughtExceptionHandler; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -95,6 +98,7 @@ public void uncaughtException(Thread t, Throwable e) { private final Map, Subchannel> subchannels = new HashMap<>(); private final Map subchannelStateListeners = new HashMap<>(); + private final Deque connectionRequestedQueue = new ArrayDeque<>(); private final XxHash64 hashFunc = XxHash64.INSTANCE; @Mock private Helper helper; @@ -123,6 +127,13 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(subchannel).start(any(SubchannelStateListener.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + connectionRequestedQueue.offer(subchannel); + return null; + } + }).when(subchannel).requestConnection(); return subchannel; } }); @@ -138,6 +149,7 @@ public void tearDown() { for (Subchannel subchannel : subchannels.values()) { verify(subchannel).shutdown(); } + connectionRequestedQueue.clear(); } @Test @@ -216,18 +228,21 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // two in CONNECTING deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(0); // one in CONNECTING, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in READY deliverSubchannelState( @@ -236,17 +251,28 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() { Status.UNKNOWN.withDescription("unknown failure"))); inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); // one in TRANSIENT_FAILURE, one in IDLE deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(1))), ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); verifyNoMoreInteractions(helper); } + private void verifyConnection(int times) { + for (int i = 0; i < times; i++) { + Subchannel connectOnce = connectionRequestedQueue.poll(); + assertThat(connectOnce).isNotNull(); + clearInvocations(connectOnce); + } + assertThat(connectionRequestedQueue.poll()).isNull(); + } + @Test public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { RingHashConfig config = new RingHashConfig(10, 100); @@ -264,7 +290,8 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("not found"))); inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, two in IDLE deliverSubchannelState( @@ -274,6 +301,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // two in TRANSIENT_FAILURE, one in CONNECTING, one in IDLE // The overall state is dominated by the two in TRANSIENT_FAILURE. @@ -282,6 +310,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in CONNECTING deliverSubchannelState( @@ -291,12 +320,14 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { inOrder.verify(helper).refreshNameResolution(); inOrder.verify(helper) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); // three in TRANSIENT_FAILURE, one in READY deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + verifyConnection(0); verifyNoMoreInteractions(helper); } @@ -320,15 +351,20 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper, times(3)).refreshNameResolution(); // Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE. - verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verify(helper, times(2)) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); + reset(helper); // Simulate underlying subchannel auto reconnect after backoff. for (Subchannel subchannel : subchannels.values()) { deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); } + verify(helper, times(3)) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(3); verifyNoMoreInteractions(helper); // Simulate one subchannel enters READY. @@ -337,6 +373,51 @@ public void subchannelStayInTransientFailureUntilBecomeReady() { verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); } + @Test + public void updateConnectionIterator() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1); + InOrder inOrder = Mockito.inOrder(helper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + servers = createWeightedServerAddrs(1,1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + verifyConnection(1); + } + @Test public void ignoreShutdownSubchannelStateChange() { RingHashConfig config = new RingHashConfig(10, 100); @@ -466,7 +547,12 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { subchannels.get(Collections.singletonList(servers.get(0))), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); - verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + Subchannel attempted = connectionRequestedQueue.poll(); + assertThat(attempted).isNotNull(); + clearInvocations(attempted); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -476,6 +562,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) .requestConnection(); // no excessive connection + reset(helper); deliverSubchannelState( subchannels.get(Collections.singletonList(servers.get(2))), ConnectivityStateInfo.forNonError(CONNECTING)); From 934c93693b20b739983ccb92f316381e53e7b08e Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 14 Apr 2022 10:06:32 -0700 Subject: [PATCH 3/6] add comment --- .../io/grpc/xds/RingHashLoadBalancer.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index f2f07375b53..e4331d2e6d1 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -68,6 +68,8 @@ final class RingHashLoadBalancer extends LoadBalancer { private List ring; private ConnectivityState currentState; + // If we need to proactively start connecting, simply iterate through all the subchannels. + // Alternatively, we can do it more fairly and effectively. private Iterator connectionAttemptIterator = subchannels.values().iterator(); RingHashLoadBalancer(Helper helper) { @@ -253,6 +255,24 @@ private void updateBalancingState() { // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates helper.updateBalancingState(overallState, picker); currentState = overallState; + // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will + // not be getting any pick requests from the priority policy. + // However, because the ring_hash policy does not attempt to + // reconnect to subchannels unless it is getting pick requests, + // it will need special handling to ensure that it will eventually + // recover from TRANSIENT_FAILURE state once the problem is resolved. + // Specifically, it will make sure that it is attempting to connect to + // at least one subchannel at any given time. After a given subchannel + // fails a connection attempt, it will move on to the next subchannel + // in the ring. It will keep doing this until one of the subchannels + // successfully connects, at which point it will report READY and stop + // proactively trying to connect. The policy will remain in + // TRANSIENT_FAILURE until at least one subchannel becomes connected, + // even if subchannels are in state CONNECTING during that time. + // + // Note that we do the same thing when the policy is in state + // CONNECTING, just to ensure that we don't remain in CONNECTING state + // indefinitely if there are no new picks coming in. if (start_connection_attempt) { if (!connectionAttemptIterator.hasNext()) { connectionAttemptIterator = subchannels.values().iterator(); From 6e185f77520c7f8aa2dfe6692dab589161dc6b62 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Fri, 15 Apr 2022 13:48:18 -0700 Subject: [PATCH 4/6] add sticky tf test --- .../io/grpc/xds/RingHashLoadBalancerTest.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index c28b97fe08d..46000350abc 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -678,6 +678,43 @@ public void allSubchannelsInTransientFailure() { .isEqualTo("[FakeSocketAddress-server0] unreachable"); } + @Test + public void stickyTransientFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring one subchannel to TRANSIENT_FAILURE. + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + + verify(helper).updateBalancingState(eq(CONNECTING), any()); + verifyConnection(1); + deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. RPC hash hits server0. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + // enabled me. there is a bug in picker behavior + // verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + @Test public void hostSelectionProportionalToWeights() { RingHashConfig config = new RingHashConfig(10000, 100000); // large ring From 4c2dccf6e6550d984506f79d41d1468dee678c46 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Fri, 15 Apr 2022 14:21:25 -0700 Subject: [PATCH 5/6] add random --- .../java/io/grpc/xds/RingHashLoadBalancer.java | 16 ++++++++++++++-- .../io/grpc/xds/RingHashLoadBalancerTest.java | 16 +++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index e4331d2e6d1..12b0225bbf4 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -43,6 +43,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; /** @@ -68,9 +69,8 @@ final class RingHashLoadBalancer extends LoadBalancer { private List ring; private ConnectivityState currentState; - // If we need to proactively start connecting, simply iterate through all the subchannels. - // Alternatively, we can do it more fairly and effectively. private Iterator connectionAttemptIterator = subchannels.values().iterator(); + private final Random random = new Random(); RingHashLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -146,7 +146,19 @@ public void onSubchannelState(ConnectivityStateInfo newState) { for (EquivalentAddressGroup addr : removedAddrs) { removedSubchannels.add(subchannels.remove(addr)); } + // If we need to proactively start connecting, iterate through all the subchannels, starting + // at a random position. + // Alternatively, we should better start at the same position. connectionAttemptIterator = subchannels.values().iterator(); + int randomAdvance = random.nextInt(subchannels.size()); + while (randomAdvance-- > 0) { + if (!connectionAttemptIterator.hasNext()) { + connectionAttemptIterator = subchannels.values().iterator(); + connectionAttemptIterator.next(); + } else { + connectionAttemptIterator.next(); + } + } // Update the picker before shutting down the subchannels, to reduce the chance of race // between picking a subchannel and shutting it down. diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 46000350abc..9edbe02f098 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -548,11 +548,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verify(subchannels.get(Collections.singletonList(servers.get(2)))) - .requestConnection(); - Subchannel attempted = connectionRequestedQueue.poll(); - assertThat(attempted).isNotNull(); - clearInvocations(attempted); + verifyConnection(1); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); @@ -613,16 +609,18 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied"))); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - verify(subchannels.get(Collections.singletonList(servers.get(1)))) - .requestConnection(); // LB attempts to recover by itself + verifyConnection(2); // LB attempts to recover by itself PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC assertThat(result.getStatus().getCode()) .isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); - verify(subchannels.get(Collections.singletonList(servers.get(1))), times(2)) - .requestConnection(); // kickoff connection to server3 (next first non-failing) + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); // kickoff connection to server3 (next first non-failing) + // TODO: zivy@ + //verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + //verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); // Now connecting to server1. deliverSubchannelState( From 4b4eb0752a3007367349250d89a3a022a57b73ec Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Fri, 15 Apr 2022 16:50:42 -0700 Subject: [PATCH 6/6] fix --- .../io/grpc/xds/RingHashLoadBalancer.java | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 12b0225bbf4..f4f8ee2e6ee 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -152,12 +152,7 @@ public void onSubchannelState(ConnectivityStateInfo newState) { connectionAttemptIterator = subchannels.values().iterator(); int randomAdvance = random.nextInt(subchannels.size()); while (randomAdvance-- > 0) { - if (!connectionAttemptIterator.hasNext()) { - connectionAttemptIterator = subchannels.values().iterator(); - connectionAttemptIterator.next(); - } else { - connectionAttemptIterator.next(); - } + connectionAttemptIterator.next(); } // Update the picker before shutting down the subchannels, to reduce the chance of race @@ -228,7 +223,6 @@ public void shutdown() { */ private void updateBalancingState() { checkState(!subchannels.isEmpty(), "no subchannel has been created"); - ConnectivityState overallState = null; boolean start_connection_attempt = false; int num_idle_ = 0; int num_ready_ = 0; @@ -247,6 +241,7 @@ private void updateBalancingState() { num_idle_++; } } + ConnectivityState overallState; if (num_ready_ > 0) { overallState = READY; } else if (num_transient_failure_ >= 2) { @@ -289,15 +284,7 @@ private void updateBalancingState() { if (!connectionAttemptIterator.hasNext()) { connectionAttemptIterator = subchannels.values().iterator(); } - if (connectionAttemptIterator.hasNext()) { - final Subchannel finalSubchannel = connectionAttemptIterator.next(); - syncContext.execute(new Runnable() { - @Override - public void run() { - finalSubchannel.requestConnection(); - } - }); - } + connectionAttemptIterator.next().requestConnection(); } }