Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, grpclb,xds: let leaf LB policies explicitly refresh name resolution when subchannel connection is broken #8048

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -1415,6 +1415,8 @@ void remove(RetriableStream<?> retriableStream) {

private final class LbHelperImpl extends LoadBalancer.Helper {
AutoConfiguredLoadBalancer lb;
boolean nsRefreshedByLb;
boolean ignoreRefreshNsCheck;

@Override
public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
Expand Down Expand Up @@ -1453,6 +1455,7 @@ public void run() {
@Override
public void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
nsRefreshedByLb = true;
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
Expand All @@ -1463,6 +1466,12 @@ public void run() {
syncContext.execute(new LoadBalancerRefreshNameResolution());
}

/**
 * Opts this helper out of the name-resolution-refresh check.
 *
 * <p>After this call, {@code onStateChange} neither logs the warning nor refreshes name
 * resolution on the load balancer's behalf when a subchannel reports TRANSIENT_FAILURE or IDLE.
 */
@Deprecated
@Override
public void ignoreRefreshNameResolutionCheck() {
  this.ignoreRefreshNsCheck = true;
}

@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
return createOobChannel(Collections.singletonList(addressGroup), authority);
Expand Down Expand Up @@ -1928,6 +1937,16 @@ void onTerminated(InternalSubchannel is) {
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
checkState(listener != null, "listener is null");
listener.onSubchannelState(newState);
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
if (!helper.ignoreRefreshNsCheck && !helper.nsRefreshedByLb) {
logger.log(Level.WARNING,
"LoadBalancer should call Helper.refreshNameResolution() to refresh name "
+ "resolution if subchannel connection is broken. This will no longer happen"
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
+ " automatically in the future releases");
refreshAndResetNameResolution();
helper.nsRefreshedByLb = true;
}
}
}

@Override
Expand Down
117 changes: 117 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Expand Up @@ -139,6 +139,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -277,6 +280,7 @@ public String getPolicyName() {
private boolean requestConnection = true;
private BlockingQueue<MockClientTransportInfo> transports;
private boolean panicExpected;
private final List<LogRecord> logs = new ArrayList<>();
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressCaptor;

Expand Down Expand Up @@ -328,6 +332,22 @@ public void setUp() throws Exception {
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(balancerRpcExecutorPool.getObject())
.thenReturn(balancerRpcExecutor.getScheduledExecutorService());
// Records every LogRecord published through ManagedChannelImpl.logger into the logs list so
// that tests can assert on what the channel logged.
// NOTE(review): no matching removeHandler call is visible in this excerpt - confirm the handler
// is detached during teardown, otherwise one handler per test run stays attached to the logger.
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {
// Records are stored immediately in publish(), so there is nothing to flush.
}

@Override
public void close() throws SecurityException {
// The handler holds no resources that need releasing.
}
};
ManagedChannelImpl.logger.addHandler(handler);
// Let records of every level through the logger so the handler sees them.
ManagedChannelImpl.logger.setLevel(Level.ALL);

channelBuilder = new ManagedChannelImplBuilder(TARGET,
new UnsupportedClientTransportFactoryBuilder(), new FixedPortProvider(DEFAULT_PORT));
Expand Down Expand Up @@ -1539,6 +1559,103 @@ public void run() {
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
}

/**
 * When the load balancer does not refresh name resolution itself, breaking the subchannel's
 * transport is expected to yield exactly one WARNING log record and one resolver refresh.
 */
@Test
public void subchannelConnectionBroken_noLbRefreshingResolver_logWarningAndTriggeRefresh() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  // Nothing has asked for a refresh yet.
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Bring up a transport on a subchannel whose listener never refreshes name resolution.
  Subchannel plainSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  InternalSubchannel internal = (InternalSubchannel) plainSubchannel.getInternalSubchannel();
  internal.obtainActiveTransport();
  MockClientTransportInfo brokenTransport = transports.poll();

  // Break subchannel connection
  brokenTransport.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));

  String expectedWarning =
      "LoadBalancer should call Helper.refreshNameResolution() to refresh name resolution if "
          + "subchannel connection is broken. This will no longer happen automatically in the "
          + "future releases";
  LogRecord warning = Iterables.getOnlyElement(logs);
  assertThat(warning.getLevel()).isEqualTo(Level.WARNING);
  assertThat(warning.getMessage()).isEqualTo(expectedWarning);
  assertThat(fakeResolver.refreshCalled).isEqualTo(1);
}

/**
 * When the subchannel state listener (standing in for a load balancer) refreshes name
 * resolution itself, a broken connection is expected to produce no log record and exactly one
 * resolver refresh.
 */
@Test
public void subchannelConnectionBroken_ResolverRefreshedByLb() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Use the helper instance that was handed to the load balancer provider.
  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(capturedHelper.capture());
  helper = capturedHelper.getValue();

  SubchannelStateListener refreshingListener = new SubchannelStateListener() {
    @Override
    public void onSubchannelState(ConnectivityStateInfo newState) {
      // Normal LoadBalancer should refresh name resolution when some subchannel enters
      // TRANSIENT_FAILURE or IDLE
      if (newState.getState() != TRANSIENT_FAILURE && newState.getState() != IDLE) {
        return;
      }
      helper.refreshNameResolution();
    }
  };
  Subchannel refreshingSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, refreshingListener);
  InternalSubchannel internal =
      (InternalSubchannel) refreshingSubchannel.getInternalSubchannel();
  internal.obtainActiveTransport();
  MockClientTransportInfo brokenTransport = transports.poll();

  // Break subchannel connection and simulate load balancer refreshing name resolution
  brokenTransport.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));

  assertThat(logs).isEmpty();
  assertThat(fakeResolver.refreshCalled).isEqualTo(1);
}

/**
 * After {@code ignoreRefreshNameResolutionCheck()} has been called on the helper, a broken
 * subchannel connection is expected to produce no log record and no resolver refresh.
 */
@Test
public void subchannelConnectionBroken_ignoreRefreshNameResolutionCheck_noRefresh() {
  FakeNameResolverFactory resolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(resolverFactory);
  createChannel();
  FakeNameResolverFactory.FakeNameResolver fakeResolver =
      Iterables.getOnlyElement(resolverFactory.resolvers);
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);

  // Use the helper instance that was handed to the load balancer provider, then opt out.
  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerProvider).newLoadBalancer(capturedHelper.capture());
  helper = capturedHelper.getValue();
  helper.ignoreRefreshNameResolutionCheck();

  Subchannel plainSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  InternalSubchannel internal = (InternalSubchannel) plainSubchannel.getInternalSubchannel();
  internal.obtainActiveTransport();
  MockClientTransportInfo brokenTransport = transports.poll();

  // Break subchannel connection
  brokenTransport.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unreachable"));

  assertThat(logs).isEmpty();
  assertThat(fakeResolver.refreshCalled).isEqualTo(0);
}

@Test
public void subchannelStringableBeforeStart() {
createChannel();
Expand Down