Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, grpclb,xds: let leaf LB policies explicitly refresh name resolution when subchannel connection is broken #8048

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Expand Up @@ -1059,6 +1059,26 @@ public void refreshNameResolution() {
throw new UnsupportedOperationException();
}

/**
* Opts the load balancer out of the channel's transitional check that verifies
* {@link #refreshNameResolution} gets called when a subchannel connection is broken.
*
* <p>Historically the channel automatically refreshed name resolution if any subchannel
* connection was broken. It is transitioning to let load balancers make the decision. To
* avoid silent breakages, the channel checks whether {@link #refreshNameResolution} has been
* called by the load balancer. If not, the channel refreshes on its behalf and logs a warning.
* This fallback will be removed in the future, at which point load balancers become completely
* responsible for triggering the refresh.
* See <a href="https://github.com/grpc/grpc-java/issues/8088">#8088</a> for the background.
*
* <p>This should rarely be used, but sometimes the address for the subchannel wasn't
* provided by the name resolver and a refresh needs to be directed somewhere else instead.
* In that case, call this method to disable the short-term check for detecting LoadBalancers
* that need to be updated for the new expected behavior.
*
* @since 1.38.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8088")
public void ignoreRefreshNameResolutionCheck() {
// no-op by default; Helper implementations that perform the check override this.
}

/**
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
* as that the callback methods on the {@link LoadBalancer} interface are run in.
Expand Down
22 changes: 21 additions & 1 deletion core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -117,6 +117,7 @@
@ThreadSafe
final class ManagedChannelImpl extends ManagedChannel implements
InternalInstrumented<ChannelStats> {
@VisibleForTesting
static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());

// Matching this pattern means the target string is a URI target or at least intended to be one.
Expand Down Expand Up @@ -1415,6 +1416,8 @@ void remove(RetriableStream<?> retriableStream) {

private final class LbHelperImpl extends LoadBalancer.Helper {
AutoConfiguredLoadBalancer lb;
boolean nsRefreshedByLb;
boolean ignoreRefreshNsCheck;

@Override
public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
Expand Down Expand Up @@ -1453,6 +1456,7 @@ public void run() {
@Override
public void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
nsRefreshedByLb = true;
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
Expand All @@ -1463,6 +1467,11 @@ public void run() {
syncContext.execute(new LoadBalancerRefreshNameResolution());
}

@Override
public void ignoreRefreshNameResolutionCheck() {
// Records the opt-out. The flag is consulted when a subchannel reports TRANSIENT_FAILURE or
// IDLE, and suppresses the channel's fallback refresh and its warning.
// NOTE(review): unlike refreshNameResolution(), this does not assert that it runs in the
// synchronization context, and the flag is a plain (non-volatile) field - confirm callers
// only invoke it from the sync context.
ignoreRefreshNsCheck = true;
}

@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
return createOobChannel(Collections.singletonList(addressGroup), authority);
Expand Down Expand Up @@ -1505,6 +1514,8 @@ void onTerminated(InternalSubchannel is) {

@Override
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
// OOB channels keep the channel-driven handling: the state is passed to the channel-level
// handler first, then propagated to the OOB channel itself.
// TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
// state and refresh name resolution if necessary.
handleInternalSubchannelState(newState);
oobChannel.handleSubchannelStateChange(newState);
}
Expand Down Expand Up @@ -1926,9 +1937,18 @@ void onTerminated(InternalSubchannel is) {

@Override
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
handleInternalSubchannelState(newState);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
checkState(listener != null, "listener is null");
// Notify the LB's listener first: an LB that calls Helper.refreshNameResolution() from this
// callback sets nsRefreshedByLb before the fallback check below runs.
listener.onSubchannelState(newState);
// Transitional fallback: if the connection broke (TRANSIENT_FAILURE or IDLE) and the LB has
// neither opted out nor ever requested a refresh, refresh on its behalf and warn.
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
if (!helper.ignoreRefreshNsCheck && !helper.nsRefreshedByLb) {
logger.log(Level.WARNING,
"LoadBalancer should call Helper.refreshNameResolution() to refresh name "
+ "resolution if subchannel state becomes TRANSIENT_FAILURE or IDLE. "
+ "This will no longer happen automatically in the future releases");
refreshAndResetNameResolution();
// Setting the flag here means this branch (and its warning) is not taken again for
// this helper.
helper.nsRefreshedByLb = true;
}
}
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

Expand Down Expand Up @@ -84,6 +85,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (currentState == SHUTDOWN) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}

SubchannelPicker picker;
switch (currentState) {
Expand Down
Expand Up @@ -94,6 +94,11 @@ public void refreshNameResolution() {
delegate().refreshNameResolution();
}

@Override
public void ignoreRefreshNameResolutionCheck() {
// Pure pass-through to the wrapped helper, like the neighbouring forwarding methods.
delegate().ignoreRefreshNameResolutionCheck();
}

@Override
public String getAuthority() {
return delegate().getAuthority();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -139,6 +139,9 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Expand Down
146 changes: 123 additions & 23 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -139,6 +139,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -277,6 +280,7 @@ public String getPolicyName() {
private boolean requestConnection = true;
private BlockingQueue<MockClientTransportInfo> transports;
private boolean panicExpected;
private final List<LogRecord> logs = new ArrayList<>();
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor;

Expand Down Expand Up @@ -328,6 +332,22 @@ public void setUp() throws Exception {
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(balancerRpcExecutorPool.getObject())
.thenReturn(balancerRpcExecutor.getScheduledExecutorService());
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {
}

@Override
public void close() throws SecurityException {
}
};
ManagedChannelImpl.logger.addHandler(handler);
ManagedChannelImpl.logger.setLevel(Level.ALL);

channelBuilder = new ManagedChannelImplBuilder(TARGET,
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
Expand Down Expand Up @@ -1539,6 +1559,103 @@ public void run() {
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
}

@Test
public void subchannelConnectionBroken_noLbRefreshingResolver_logWarningAndTriggeRefresh() {
  // Exact warning the channel is expected to emit when it has to refresh on the LB's behalf.
  final String expectedWarning =
      "LoadBalancer should call Helper.refreshNameResolution() to refresh name resolution if "
          + "subchannel state becomes TRANSIENT_FAILURE or IDLE. This will no longer happen "
          + "automatically in the future releases";

  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Create a subchannel and start a transport on it so there is a connection to break.
  Subchannel brokenSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  ((InternalSubchannel) brokenSubchannel.getInternalSubchannel()).obtainActiveTransport();
  MockClientTransportInfo activeTransport = transports.poll();

  // Break subchannel connection
  Status unreachable = Status.UNAVAILABLE.withDescription("unreachable");
  activeTransport.listener.transportShutdown(unreachable);

  // Exactly one record must have been logged: the warning, accompanied by one refresh.
  LogRecord warning = Iterables.getOnlyElement(logs);
  assertThat(warning.getLevel()).isEqualTo(Level.WARNING);
  assertThat(warning.getMessage()).isEqualTo(expectedWarning);
  assertThat(fakeResolver.refreshCalled).isEqualTo(1);
}

@Test
public void subchannelConnectionBroken_ResolverRefreshedByLb() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Use the helper instance that was handed to the load balancer provider.
  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(capturedHelper.capture());
  helper = capturedHelper.getValue();

  SubchannelStateListener refreshingListener = new SubchannelStateListener() {
    @Override
    public void onSubchannelState(ConnectivityStateInfo newState) {
      // Normal LoadBalancer should refresh name resolution when some subchannel enters
      // TRANSIENT_FAILURE or IDLE; every other state is ignored.
      if (newState.getState() != TRANSIENT_FAILURE && newState.getState() != IDLE) {
        return;
      }
      helper.refreshNameResolution();
    }
  };
  Subchannel brokenSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, refreshingListener);
  ((InternalSubchannel) brokenSubchannel.getInternalSubchannel()).obtainActiveTransport();
  MockClientTransportInfo activeTransport = transports.poll();

  // Break subchannel connection and simulate load balancer refreshing name resolution
  Status unreachable = Status.UNAVAILABLE.withDescription("unreachable");
  activeTransport.listener.transportShutdown(unreachable);

  // The LB did the refresh itself, so no warning is logged and the refresh happens once.
  assertThat(logs).isEmpty();
  assertThat(fakeResolver.refreshCalled).isEqualTo(1);
}

@Test
public void subchannelConnectionBroken_ignoreRefreshNameResolutionCheck_noRefresh() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Opt out of the channel's check on the helper that was handed to the load balancer provider.
  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(capturedHelper.capture());
  helper = capturedHelper.getValue();
  helper.ignoreRefreshNameResolutionCheck();

  Subchannel brokenSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  ((InternalSubchannel) brokenSubchannel.getInternalSubchannel()).obtainActiveTransport();
  MockClientTransportInfo activeTransport = transports.poll();

  // Break subchannel connection
  Status unreachable = Status.UNAVAILABLE.withDescription("unreachable");
  activeTransport.listener.transportShutdown(unreachable);

  // With the check disabled the channel neither warns nor refreshes on the LB's behalf.
  assertThat(logs).isEmpty();
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);
}

@Test
public void subchannelStringableBeforeStart() {
createChannel();
Expand Down Expand Up @@ -2095,43 +2212,26 @@ public void lbHelper_getNameResolverRegistry() {
.isSameInstanceAs(NameResolverRegistry.getDefaultRegistry());
}

@Test
public void refreshNameResolution_whenSubchannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, false);
}

@Test
public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, false);
}

@Test
public void notRefreshNameResolution_whenSubchannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(false, true);
subtestNameResolutionRefreshWhenConnectionFailed(false);
}

@Test
public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() {
subtestNameResolutionRefreshWhenConnectionFailed(true, true);
subtestNameResolutionRefreshWhenConnectionFailed(true);
}

private void subtestNameResolutionRefreshWhenConnectionFailed(
boolean isOobChannel, boolean isIdle) {
private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
if (isOobChannel) {
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();
} else {
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
}
OobChannel oobChannel = (OobChannel) helper.createOobChannel(
Collections.singletonList(addressGroup), "oobAuthority");
oobChannel.getSubchannel().requestConnection();

MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
Expand Down
37 changes: 37 additions & 0 deletions core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
Expand Up @@ -22,6 +22,8 @@
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -160,6 +162,38 @@ public void requestConnectionPicker() throws Exception {
verify(mockSubchannel, times(2)).requestConnection();
}

@Test
public void refreshNameResolutionAfterSubchannelConnectionBroken() {
// Verifies that the balancer asks the helper to refresh name resolution each time its
// subchannel reports TRANSIENT_FAILURE or IDLE, and does so before publishing the new picker.
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class));

// Initial setup: the subchannel is started, a CONNECTING picker is published, and a
// connection is requested - in that order.
InOrder inOrder = inOrder(mockHelper, mockSubchannel);
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
inOrder.verify(mockSubchannel).requestConnection();

// CONNECTING is not a broken connection: a picker update is expected, and no refresh is
// verified for this step.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
// TRANSIENT_FAILURE: refresh first, then the error picker.
Status error = Status.UNAUTHENTICATED.withDescription("permission denied");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
// READY: picker update only.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
// Simulate receiving go-away so the subchannel transit to IDLE.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

// Any refresh or picker update beyond the ones verified above would fail here.
verifyNoMoreInteractions(mockHelper, mockSubchannel);
}

@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
Expand Down Expand Up @@ -225,10 +259,12 @@ public void pickAfterStateChangeAfterResolution() throws Exception {

Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -294,6 +330,7 @@ public void nameResolutionErrorWithStateChanges() throws Exception {
SubchannelStateListener stateListener = stateListenerCaptor.getValue();

stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

Expand Down