Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, grpclb,xds: let leaf LB policies explicitly refresh name resolution when subchannel connection is broken #8048

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1926,7 +1926,6 @@ void onTerminated(InternalSubchannel is) {

@Override
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
handleInternalSubchannelState(newState);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
checkState(listener != null, "listener is null");
listener.onSubchannelState(newState);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

Expand Down Expand Up @@ -84,6 +85,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (currentState == SHUTDOWN) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}

SubchannelPicker picker;
switch (currentState) {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -139,6 +139,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Expand Down
29 changes: 6 additions & 23 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -2095,43 +2095,26 @@ public void lbHelper_getNameResolverRegistry() {
.isSameInstanceAs(NameResolverRegistry.getDefaultRegistry());
}

@Test
public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, false);
}

@Test
public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, false);
}

@Test
public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, true);
subtestNameResolutionRefreshWhenConnectionFailed(false);
}

@Test
public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, true);
subtestNameResolutionRefreshWhenConnectionFailed(true);
}

private void subtestNameResolutionRefreshWhenConnectionFailed(
boolean isOobChannel, boolean isIdle) {
private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
if (isOobChannel) {
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
}
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();

MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
Expand Down
37 changes: 37 additions & 0 deletions core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
Expand Up @@ -22,6 +22,8 @@
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -160,6 +162,38 @@ public void requestConnectionPicker() throws Exception {
verify(mockSubchannel, times(2)).requestConnection();
}

@Test
public void refreshNameResolutionAfterSubchannelConnectionBroken() {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class));

InOrder inOrder = inOrder(mockHelper, mockSubchannel);
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
inOrder.verify(mockSubchannel).requestConnection();

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
Status error = Status.UNAUTHENTICATED.withDescription("permission denied");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
// Simulate receiving go-away so the subchannel transit to IDLE.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

verifyNoMoreInteractions(mockHelper, mockSubchannel);
}

@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
Expand Down Expand Up @@ -225,10 +259,12 @@ public void pickAfterStateChangeAfterResolution() throws Exception {

Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -294,6 +330,7 @@ public void nameResolutionErrorWithStateChanges() throws Exception {
SubchannelStateListener stateListener = stateListenerCaptor.getValue();

stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

Expand Down
35 changes: 32 additions & 3 deletions core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
Expand Up @@ -277,11 +277,13 @@ public void pickAfterStateChange() throws Exception {
ConnectivityStateInfo.forTransientFailure(error));
assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE);
assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error);
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class);

deliverSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
assertThat(subchannelStateInfo.value.getState()).isEqualTo(TRANSIENT_FAILURE);
assertThat(subchannelStateInfo.value.getStatus()).isEqualTo(error);

Expand All @@ -305,9 +307,7 @@ public void stayTransientFailureUntilReady() {
deliverSubchannelState(
sc,
ConnectivityStateInfo.forTransientFailure(error));
deliverSubchannelState(
sc,
ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
deliverSubchannelState(
sc,
ConnectivityStateInfo.forNonError(CONNECTING));
Expand All @@ -330,6 +330,35 @@ public void stayTransientFailureUntilReady() {
verifyNoMoreInteractions(mockHelper);
}

@Test
public void refreshNameResolutionWhenSubchannelConnectionBroken() {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());

verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));

// Simulate state transitions for each subchannel individually.
for (Subchannel sc : loadBalancer.getSubchannels()) {
verify(sc).requestConnection();
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING));
Status error = Status.UNKNOWN.withDescription("connection broken");
deliverSubchannelState(sc, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), isA(ReadyPicker.class));
// Simulate receiving go-away so READY subchannels transit to IDLE.
deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
verify(sc, times(2)).requestConnection();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
}

verifyNoMoreInteractions(mockHelper);
}

@Test
public void pickerRoundRobin() throws Exception {
Subchannel subchannel = mock(Subchannel.class);
Expand Down
4 changes: 4 additions & 0 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Expand Up @@ -222,6 +222,10 @@ void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState
if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
subchannel.requestConnection();
}
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
helper.refreshNameResolution();
}

AtomicReference<ConnectivityStateInfo> stateInfoRef =
subchannel.getAttributes().get(STATE_INFO);
// If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at
Expand Down
Expand Up @@ -1003,6 +1003,7 @@ public void grpclbWorking() {
verify(subchannel1).requestConnection();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel1, times(2)).requestConnection();
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(logs).containsExactly(
"INFO: [grpclb-<api.google.com>] Update balancing state to READY: picks="
Expand All @@ -1022,6 +1023,7 @@ public void grpclbWorking() {
ConnectivityStateInfo errorState1 =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1"));
deliverSubchannelState(subchannel1, errorState1);
inOrder.verify(helper).refreshNameResolution();
inOrder.verifyNoMoreInteractions();

// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
Expand Down Expand Up @@ -1183,14 +1185,17 @@ public void roundRobinMode_subchannelStayTransientFailureUntilReady() {
// Switch all subchannels to TRANSIENT_FAILURE, making the general state TRANSIENT_FAILURE too.
Status error = Status.UNAVAILABLE.withDescription("error");
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(helper).refreshNameResolution();
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList)
.containsExactly(new ErrorEntry(error));

// Switch subchannel1 to IDLE, then to CONNECTING, which are ignored since the previous
// subchannel state is TRANSIENT_FAILURE. General state is unchanged.
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(helper).refreshNameResolution();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verifyNoMoreInteractions();

Expand Down Expand Up @@ -1549,11 +1554,13 @@ private void subtestGrpclbFallbackConnectionLost(
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
inOrder.verify(helper).refreshNameResolution();
}
if (allSubchannelsBroken) {
for (Subchannel subchannel : subchannels) {
// A READY subchannel transits to IDLE when receiving a go-away
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(helper).refreshNameResolution();
}
}

Expand All @@ -1566,6 +1573,7 @@ private void subtestGrpclbFallbackConnectionLost(
// connections are lost
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(helper).refreshNameResolution();
}

// Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
Expand Down
6 changes: 6 additions & 0 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Expand Up @@ -211,6 +211,7 @@ public void lb_working_withDefaultTarget() throws Exception {

// search subchannel is down, rescue subchannel is connecting
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));

inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());

Expand Down Expand Up @@ -474,6 +475,11 @@ public void updateBalancingState(
// no-op
}

@Override
public void refreshNameResolution() {
// no-op
}

@Override
public String getAuthority() {
return "fake-bigtable.googleapis.com";
Expand Down
3 changes: 3 additions & 0 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Expand Up @@ -203,6 +203,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);

// Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected.
Expand Down
6 changes: 6 additions & 0 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Expand Up @@ -233,12 +233,14 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() {
subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forTransientFailure(
Status.UNKNOWN.withDescription("unknown failure")));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class));

// one in TRANSIENT_FAILURE, one in IDLE
deliverSubchannelState(
subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

verifyNoMoreInteractions(helper);
Expand All @@ -260,13 +262,15 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() {
subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("not found")));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

// two in TRANSIENT_FAILURE, two in IDLE
deliverSubchannelState(
subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("also not found")));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper)
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

Expand All @@ -283,6 +287,7 @@ public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() {
subchannels.get(Collections.singletonList(servers.get(3))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("connection lost")));
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper)
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

Expand Down Expand Up @@ -311,6 +316,7 @@ public void subchannelStayInTransientFailureUntilBecomeReady() {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(
Status.UNAUTHENTICATED.withDescription("Permission denied")));
}
verify(helper, times(3)).refreshNameResolution();

// Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE.
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
Expand Down