Skip to content

Commit

Permalink
rls: Only use subchannel policy for default target when RLS is not av…
Browse files Browse the repository at this point in the history
…ailable (#9383)

* core: Only use subchannel policy for default target when RLS is not available
Fixes #9237
  • Loading branch information
larry-safran committed Jul 21, 2022
1 parent 03abe8a commit 50cdfa9
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
9 changes: 1 addition & 8 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Expand Up @@ -931,14 +931,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (picker == null) {
return PickResult.withNoResult();
}
PickResult result = picker.pickSubchannel(args);
if (result.getStatus().isOk()) {
return result;
}
if (hasFallback) {
return useFallback(args);
}
return PickResult.withError(result.getStatus());
return picker.pickSubchannel(args);
} else if (response.hasError()) {
if (hasFallback) {
return useFallback(args);
Expand Down
96 changes: 76 additions & 20 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Expand Up @@ -74,6 +74,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -110,6 +111,7 @@ public void uncaughtException(Thread t, Throwable e) {
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
private final FakeThrottler fakeThrottler = new FakeThrottler();
@Mock
private Marshaller<Object> mockMarshaller;
@Captor
Expand All @@ -120,7 +122,7 @@ public void uncaughtException(Thread t, Throwable e) {
private String defaultTarget = "defaultTarget";

@Before
public void setUp() throws Exception {
public void setUp() {
MockitoAnnotations.initMocks(this);

fakeSearchMethod =
Expand Down Expand Up @@ -154,19 +156,21 @@ public void setUp() throws Exception {
rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() {
@Override
public CachingRlsLbClient.Builder get() {
// using default throttler which doesn't throttle
return CachingRlsLbClient.newBuilder();
// using fake throttler to allow enablement of throttler
return CachingRlsLbClient.newBuilder()
.setThrottler(fakeThrottler)
.setTicker(fakeClock.getTicker());
}
};
}

@After
public void tearDown() throws Exception {
public void tearDown() {
rlsLb.shutdown();
}

@Test
public void lb_working_withDefaultTarget() throws Exception {
public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
deliverResolvedAddresses();
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
Expand Down Expand Up @@ -215,40 +219,76 @@ public void lb_working_withDefaultTarget() throws Exception {
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());

// search again, use pending fallback because searchSubchannel is in failure mode
// search again, verify that it doesn't use fallback, since RLS server responded, even though
// subchannel is in failure mode
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
}

@Test
public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
fakeThrottler.nextResult = true;

deliverResolvedAddresses();
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
PickResult res;

// Search that when the RLS server doesn't respond, that fallback is used
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
FakeSubchannel fallbackSubchannel = (FakeSubchannel) res.getSubchannel();

assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
assertThat(subchannels).hasSize(3);
FakeSubchannel fallbackSubchannel = subchannels.getLast();
fallbackSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
inOrder.verify(helper, times(2))
inOrder.verify(helper, times(1))
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
inOrder.verifyNoMoreInteractions();

res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes());
assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel);

res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel);

// Make sure that when RLS starts communicating that default stops being used
fakeThrottler.nextResult = false;
fakeClock.forwardTime(2, TimeUnit.SECONDS); // Expires backoff cache entries
// Create search subchannel
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
FakeSubchannel searchSubchannel = (FakeSubchannel) res.getSubchannel();
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));

// create rescue subchannel
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
assertThat(res.getSubchannel()).isNotSameInstanceAs(searchSubchannel);
FakeSubchannel rescueSubchannel = (FakeSubchannel) res.getSubchannel();
rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));

// all channels are failed
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verifyNoMoreInteractions();

res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(res.getSubchannel()).isNull();
}

@Test
Expand Down Expand Up @@ -546,7 +586,7 @@ private static final class FakeSubchannel extends Subchannel {
private final Attributes attributes;
private List<EquivalentAddressGroup> eags;
private SubchannelStateListener listener;
private boolean isReady;
private volatile boolean isReady;

public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) {
this.eags = Collections.unmodifiableList(eags);
Expand Down Expand Up @@ -590,4 +630,20 @@ public void updateState(ConnectivityStateInfo newState) {
private static boolean subchannelIsReady(Subchannel subchannel) {
return subchannel instanceof FakeSubchannel && ((FakeSubchannel) subchannel).isReady;
}

private static final class FakeThrottler implements Throttler {

private boolean nextResult = false;

@Override
public boolean shouldThrottle() {
return nextResult;
}

@Override
public void registerBackendResponse(boolean throttled) {
// no-op
}
}

}

0 comments on commit 50cdfa9

Please sign in to comment.