Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, grpclb,xds: let leaf LB policies explicitly refresh name resolution when subchannel connection is broken #8048

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Expand Up @@ -1059,6 +1059,22 @@ public void refreshNameResolution() {
throw new UnsupportedOperationException();
}

/**
* Historically the channel automatically refreshes name resolution if any subchannel
* connection is broken. It's transitioning to let load balancers make the decision. To
* avoid silent breakages, the channel checks if {@link #refreshNameResolution} is called
* by the load balancer. If not, it will do it and log a warning. This will be removed in
* the future and load balancers are completely responsible for triggering the refresh.
* See <a href="https://github.com/grpc/grpc-java/issues/8088">#8088</a> for the background.
*
* @deprecated Do NOT use.
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
*/
@Deprecated
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8088")
public void ignoreRefreshNameResolutionCheck() {
throw new UnsupportedOperationException();
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
* as that the callback methods on the {@link LoadBalancer} interface are run in.
Expand Down
20 changes: 19 additions & 1 deletion core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -1415,6 +1415,8 @@ void remove(RetriableStream<?> retriableStream) {

private final class LbHelperImpl extends LoadBalancer.Helper {
AutoConfiguredLoadBalancer lb;
boolean nsRefreshedByLb;
boolean ignoreRefreshNsCheck;

@Override
public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
Expand Down Expand Up @@ -1453,6 +1455,7 @@ public void run() {
@Override
public void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
nsRefreshedByLb = true;
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
Expand All @@ -1463,6 +1466,12 @@ public void run() {
syncContext.execute(new LoadBalancerRefreshNameResolution());
}

// Opt-out switch for the transitional refresh check: this only records the request. The flag
// is read when a subchannel reports TRANSIENT_FAILURE or IDLE, to decide whether the channel
// should refresh name resolution (and log a warning) on the load balancer's behalf.
@Deprecated
@Override
public void ignoreRefreshNameResolutionCheck() {
ignoreRefreshNsCheck = true;
}

@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
return createOobChannel(Collections.singletonList(addressGroup), authority);
Expand Down Expand Up @@ -1926,9 +1935,18 @@ void onTerminated(InternalSubchannel is) {

@Override
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
handleInternalSubchannelState(newState);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
checkState(listener != null, "listener is null");
listener.onSubchannelState(newState);
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
if (!helper.ignoreRefreshNsCheck && !helper.nsRefreshedByLb) {
logger.log(Level.WARNING,
"LoadBalancer should call Helper.refreshNameResolution() to refresh name "
+ "resolution if subchannel connection is broken. This will no longer happen"
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
+ " automatically in the future releases");
refreshAndResetNameResolution();
helper.nsRefreshedByLb = true;
}
}
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

Expand Down Expand Up @@ -84,6 +85,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (currentState == SHUTDOWN) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}

SubchannelPicker picker;
switch (currentState) {
Expand Down
Expand Up @@ -94,6 +94,12 @@ public void refreshNameResolution() {
delegate().refreshNameResolution();
}

// Passes the opt-out through to the wrapped helper unchanged, in the same way as the
// neighbouring delegating methods.
@Deprecated
@Override
public void ignoreRefreshNameResolutionCheck() {
delegate().ignoreRefreshNameResolutionCheck();
}

@Override
public String getAuthority() {
return delegate().getAuthority();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -139,6 +139,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Expand Down
146 changes: 123 additions & 23 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -139,6 +139,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -277,6 +280,7 @@ public String getPolicyName() {
private boolean requestConnection = true;
private BlockingQueue<MockClientTransportInfo> transports;
private boolean panicExpected;
private final List<LogRecord> logs = new ArrayList<>();
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor;

Expand Down Expand Up @@ -328,6 +332,22 @@ public void setUp() throws Exception {
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(balancerRpcExecutorPool.getObject())
.thenReturn(balancerRpcExecutor.getScheduledExecutorService());
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {
}

@Override
public void close() throws SecurityException {
}
};
ManagedChannelImpl.logger.addHandler(handler);
ManagedChannelImpl.logger.setLevel(Level.ALL);

channelBuilder = new ManagedChannelImplBuilder(TARGET,
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
Expand Down Expand Up @@ -1539,6 +1559,103 @@ public void run() {
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
}

/**
 * When the load balancer's listener never calls {@code Helper.refreshNameResolution()}, a
 * broken subchannel connection must make the channel log exactly one warning and refresh the
 * name resolver itself.
 */
@Test
public void subchannelConnectionBroken_noLbRefreshingResolver_logWarningAndTriggeRefresh() {
  FakeNameResolverFactory nameResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(nameResolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver resolver =
      Iterables.getOnlyElement(nameResolverFactory.resolvers);
  assertThat(resolver.refreshCalled).isEqualTo(0);

  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  InternalSubchannel internalSubchannel =
      (InternalSubchannel) subchannel.getInternalSubchannel();
  internalSubchannel.obtainActiveTransport();
  MockClientTransportInfo transportInfo = transports.poll();
  // Fail with a clear assertion rather than an NPE below if no transport was created.
  assertNotNull(transportInfo);

  // Break subchannel connection
  transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
  LogRecord log = Iterables.getOnlyElement(logs);
  assertThat(log.getLevel()).isEqualTo(Level.WARNING);
  assertThat(log.getMessage()).isEqualTo(
      "LoadBalancer should call Helper.refreshNameResolution() to refresh name resolution if "
          + "subchannel connection is broken. This will no longer happen automatically in the "
          + "future releases");
  assertThat(resolver.refreshCalled).isEqualTo(1);
}

/**
 * When the load balancer's listener refreshes name resolution itself on TRANSIENT_FAILURE or
 * IDLE, the channel must not log a warning and the resolver must be refreshed exactly once.
 */
@Test
public void subchannelConnectionBroken_ResolverRefreshedByLb() {
  FakeNameResolverFactory nameResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(nameResolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver resolver =
      Iterables.getOnlyElement(nameResolverFactory.resolvers);
  assertThat(resolver.refreshCalled).isEqualTo(0);
  // Use the helper instance the channel actually handed to the load balancer.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
  helper = helperCaptor.getValue();

  SubchannelStateListener listener = new SubchannelStateListener() {
    @Override
    public void onSubchannelState(ConnectivityStateInfo newState) {
      // Normal LoadBalancer should refresh name resolution when some subchannel enters
      // TRANSIENT_FAILURE or IDLE
      if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
        helper.refreshNameResolution();
      }
    }
  };
  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, listener);
  InternalSubchannel internalSubchannel =
      (InternalSubchannel) subchannel.getInternalSubchannel();
  internalSubchannel.obtainActiveTransport();
  MockClientTransportInfo transportInfo = transports.poll();
  // Fail with a clear assertion rather than an NPE below if no transport was created.
  assertNotNull(transportInfo);

  // Break subchannel connection and simulate load balancer refreshing name resolution
  transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
  assertThat(logs).isEmpty();
  assertThat(resolver.refreshCalled).isEqualTo(1);
}

/**
 * After the load balancer opts out through {@code Helper.ignoreRefreshNameResolutionCheck()},
 * a broken subchannel connection must produce neither a warning nor a resolver refresh.
 */
@Test
public void subchannelConnectionBroken_ignoreRefreshNameResolutionCheck_noRefresh() {
  FakeNameResolverFactory nameResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(nameResolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver resolver =
      Iterables.getOnlyElement(nameResolverFactory.resolvers);
  assertThat(resolver.refreshCalled).isEqualTo(0);
  // Use the helper instance the channel actually handed to the load balancer.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
  helper = helperCaptor.getValue();
  helper.ignoreRefreshNameResolutionCheck();

  Subchannel subchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  InternalSubchannel internalSubchannel =
      (InternalSubchannel) subchannel.getInternalSubchannel();
  internalSubchannel.obtainActiveTransport();
  MockClientTransportInfo transportInfo = transports.poll();
  // Fail with a clear assertion rather than an NPE below if no transport was created.
  assertNotNull(transportInfo);

  // Break subchannel connection
  transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));
  assertThat(logs).isEmpty();
  assertThat(resolver.refreshCalled).isEqualTo(0);
}

@Test
public void subchannelStringableBeforeStart() {
createChannel();
Expand Down Expand Up @@ -2095,43 +2212,26 @@ public void lbHelper_getNameResolverRegistry() {
.isSameInstanceAs(NameResolverRegistry.getDefaultRegistry());
}

// NOTE(review): this uses the two-argument (isOobChannel, isIdle) form of the subtest, while a
// one-argument (isIdle) form is also declared further down. This looks like a pre-change line
// of the diff -- confirm whether this subchannel variant is still meant to exist.
@Test
public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, false);
}

// OOB-channel case with the channel not idle (arguments: isOobChannel = true, isIdle = false).
// NOTE(review): a one-argument (isIdle) form of the subtest is also declared further down;
// confirm which signature this call should target.
@Test
public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, false);
}

// NOTE(review): the body holds both a two-argument and a one-argument call to the subtest.
// Only one can match the helper's final signature, so this looks like removed and added diff
// lines merged together -- confirm which call is current before relying on this test.
@Test
public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, true);
subtestNameResolutionRefreshWhenConnectionFailed(false);
}

// NOTE(review): the body holds both a two-argument and a one-argument call to the subtest.
// Only one can match the helper's final signature, so this looks like removed and added diff
// lines merged together -- confirm which call is current before relying on this test.
@Test
public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, true);
subtestNameResolutionRefreshWhenConnectionFailed(true);
}

private void subtestNameResolutionRefreshWhenConnectionFailed(
boolean isOobChannel, boolean isIdle) {
private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
if (isOobChannel) {
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
}
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();

MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
Expand Down
37 changes: 37 additions & 0 deletions core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
Expand Up @@ -22,6 +22,8 @@
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -160,6 +162,38 @@ public void requestConnectionPicker() throws Exception {
verify(mockSubchannel, times(2)).requestConnection();
}

/**
 * The pick-first balancer must ask the helper to refresh name resolution every time its
 * subchannel reports TRANSIENT_FAILURE or IDLE, before it publishes the matching balancing
 * state; CONNECTING and READY must not trigger a refresh.
 */
@Test
public void refreshNameResolutionAfterSubchannelConnectionBroken() {
  ResolvedAddresses resolved =
      ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build();
  loadBalancer.handleResolvedAddresses(resolved);
  verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class));

  // Initial setup: the subchannel is started, reported as CONNECTING and asked to connect.
  InOrder ordered = inOrder(mockHelper, mockSubchannel);
  ordered.verify(mockSubchannel).start(stateListenerCaptor.capture());
  SubchannelStateListener subchannelListener = stateListenerCaptor.getValue();
  ordered.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
  ordered.verify(mockSubchannel).requestConnection();

  // CONNECTING: no refresh, and the picker yields no subchannel.
  ConnectivityStateInfo connecting = ConnectivityStateInfo.forNonError(CONNECTING);
  subchannelListener.onSubchannelState(connecting);
  ordered.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

  // TRANSIENT_FAILURE: refresh first, then the failing picker is published.
  Status failure = Status.UNAUTHENTICATED.withDescription("permission denied");
  ConnectivityStateInfo failed = ConnectivityStateInfo.forTransientFailure(failure);
  subchannelListener.onSubchannelState(failed);
  ordered.verify(mockHelper).refreshNameResolution();
  ordered.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  assertEquals(failure, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

  // READY: no refresh, and the picker hands out the subchannel again.
  ConnectivityStateInfo ready = ConnectivityStateInfo.forNonError(READY);
  subchannelListener.onSubchannelState(ready);
  ordered.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
  assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

  // Simulate receiving go-away so the subchannel transit to IDLE.
  ConnectivityStateInfo idle = ConnectivityStateInfo.forNonError(IDLE);
  subchannelListener.onSubchannelState(idle);
  ordered.verify(mockHelper).refreshNameResolution();
  ordered.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

  verifyNoMoreInteractions(mockHelper, mockSubchannel);
}

@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
Expand Down Expand Up @@ -225,10 +259,12 @@ public void pickAfterStateChangeAfterResolution() throws Exception {

Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -294,6 +330,7 @@ public void nameResolutionErrorWithStateChanges() throws Exception {
SubchannelStateListener stateListener = stateListenerCaptor.getValue();

stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

Expand Down