Skip to content

Commit

Permalink
rls: fix RLS policy to not propagate status from control plane RPC to…
Browse files Browse the repository at this point in the history
… data plane RPC (#9413)

rls: Avoid library returning the status codes which the status spec document says that the library will never return when talking to RLS server.  Instead, always return UNAVAILABLE on errors.

* Provide context around error message from RLS server
  • Loading branch information
larry-safran committed Aug 15, 2022
1 parent 01d5bd4 commit 778098b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
4 changes: 4 additions & 0 deletions api/src/main/java/io/grpc/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
*
* <p>Utility functions are provided to convert a status to an exception and to extract them
* back out.
*
* <p>Extended descriptions, including a list of codes that should not be generated by the library,
* can be found at
* <a href="https://github.com/grpc/grpc/blob/master/doc/statuscodes.md">doc/statuscodes.md</a>
*/
@Immutable
@CheckReturnValue
Expand Down
19 changes: 18 additions & 1 deletion rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ private CachingRlsLbClient(Builder builder) {
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

/**
* Convert the status to UNAVAILBLE and enhance the error message.
* @param status status as provided by server
* @param serverName Used for error description
* @return Transformed status
*/
static Status convertRlsServerStatus(Status status, String serverName) {
return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
String.format("Unable to retrieve RLS targets from RLS server %s. "
+ "RLS server returned: %s: %s",
serverName, status.getCode(), status.getDescription()));
}

@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
Expand Down Expand Up @@ -931,12 +944,15 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (picker == null) {
return PickResult.withNoResult();
}
// Happy path
return picker.pickSubchannel(args);
} else if (response.hasError()) {
if (hasFallback) {
return useFallback(args);
}
return PickResult.withError(response.getStatus());
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
return PickResult.withNoResult();
}
Expand Down Expand Up @@ -980,4 +996,5 @@ public String toString() {
.toString();
}
}

}
6 changes: 4 additions & 2 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -386,7 +387,7 @@ public void get_updatesLbState() throws Exception {
headers,
CallOptions.DEFAULT));
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(pickResult.getStatus().getDescription()).isEqualTo("fallback not available");
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
}

@Test
Expand All @@ -412,6 +413,7 @@ public void get_childPolicyWrapper_reusedForSameTarget() throws Exception {
assertThat(resp.getHeaderData()).isEqualTo("header");

ChildPolicyWrapper childPolicyWrapper = resp.getChildPolicyWrapper();
assertNotNull(childPolicyWrapper);
assertThat(childPolicyWrapper.getTarget()).isEqualTo("target");
assertThat(childPolicyWrapper.getPicker()).isNotInstanceOf(RlsPicker.class);

Expand Down Expand Up @@ -575,7 +577,7 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(
Status.UNAVAILABLE.withDescription("fallback not available"));
Status.UNAVAILABLE.withDescription("fallback not available"));
}
});
} else {
Expand Down
45 changes: 39 additions & 6 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,36 @@ public void tearDown() {
rlsLb.shutdown();
}

@Test
public void lb_serverStatusCodeConversion() throws Exception {
deliverResolvedAddresses();
InOrder inOrder = inOrder(helper);
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
PickSubchannelArgsImpl fakeSearchMethodArgs =
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
FakeSubchannel subchannel = (FakeSubchannel) res.getSubchannel();
assertThat(subchannel).isNotNull();

// Ensure happy path is unaffected
subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
res = picker.pickSubchannel(fakeSearchMethodArgs);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);

// Check on conversion
Throwable cause = new Throwable("cause");
Status aborted = Status.ABORTED.withCause(cause).withDescription("base desc");
Status serverStatus = CachingRlsLbClient.convertRlsServerStatus(aborted, "conv.test");
assertThat(serverStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(serverStatus.getCause()).isEqualTo(cause);
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
}

@Test
public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
deliverResolvedAddresses();
Expand Down Expand Up @@ -210,7 +240,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
FakeSubchannel rescueSubchannel = subchannels.getLast();

// search subchannel is down, rescue subchannel is connecting
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));

inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
Expand All @@ -223,7 +253,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
// subchannel is in failure mode
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
}

Expand All @@ -243,6 +273,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
FakeSubchannel fallbackSubchannel = (FakeSubchannel) res.getSubchannel();
assertThat(fallbackSubchannel).isNotNull();

assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
Expand Down Expand Up @@ -270,6 +301,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
FakeSubchannel searchSubchannel = (FakeSubchannel) res.getSubchannel();
assertThat(searchSubchannel).isNotNull();
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));

// create rescue subchannel
Expand All @@ -278,16 +310,17 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
assertThat(res.getSubchannel()).isNotSameInstanceAs(searchSubchannel);
FakeSubchannel rescueSubchannel = (FakeSubchannel) res.getSubchannel();
assertThat(rescueSubchannel).isNotNull();
rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));

// all channels are failed
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));

res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(res.getSubchannel()).isNull();
}

Expand Down

0 comments on commit 778098b

Please sign in to comment.