Skip to content

Commit

Permalink
rls: support routeLookupChannelServiceConfig in RLS lb config
Browse files Browse the repository at this point in the history
Implementing the latest change for RLS lb config.

```
The configuration for the LB policy will be of the following form:

{
  "routeLookupConfig": <JSON form of RouteLookupConfig proto>,
  "routeLookupChannelServiceConfig": {...service config JSON...},
  "childPolicy": [
    {"<policy name>": {...child policy config...}}
  ],
  "childPolicyConfigTargetFieldName": "<name of field>"
}
```

>If the routeLookupChannelServiceConfig field is present, we will pass the specified service config to the RLS control plane channel, and we will disable fetching service config via that channel's resolver.
  • Loading branch information
dapengzhang0 committed Feb 1, 2022
1 parent bd156f9 commit 7308d92
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 61 deletions.
36 changes: 6 additions & 30 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.common.base.Converter;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ChannelLogger;
Expand Down Expand Up @@ -83,13 +81,6 @@ final class CachingRlsLbClient {
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();

// System property to use direct path enabled OobChannel, by default direct path is enabled.
private static final String RLS_ENABLE_OOB_CHANNEL_DIRECTPATH_PROPERTY =
"io.grpc.rls.CachingRlsLbClient.enable_oobchannel_directpath";
@VisibleForTesting
static boolean enableOobChannelDirectPath =
Boolean.parseBoolean(System.getProperty(RLS_ENABLE_OOB_CHANNEL_DIRECTPATH_PROPERTY, "false"));

// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
// LRU cache based on access order (BACKOFF and actual data will be here)
Expand Down Expand Up @@ -158,14 +149,14 @@ private CachingRlsLbClient(Builder builder) {
ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
rlsConfig.getLookupService(), helper.getUnsafeChannelCredentials());
rlsChannelBuilder.overrideAuthority(helper.getAuthority());
if (enableOobChannelDirectPath) {
Map<String, ?> directPathServiceConfig =
getDirectPathServiceConfig(rlsConfig.getLookupService());
Map<String, ?> routeLookupChannelServiceConfig =
lbPolicyConfig.getRouteLookupChannelServiceConfig();
if (routeLookupChannelServiceConfig != null) {
logger.log(
ChannelLogLevel.DEBUG,
"RLS channel direct path enabled. RLS channel service config: {0}",
directPathServiceConfig);
rlsChannelBuilder.defaultServiceConfig(directPathServiceConfig);
"RLS channel service config: {0}",
routeLookupChannelServiceConfig);
rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
rlsChannelBuilder.disableServiceConfigLookUp();
}
rlsChannel = rlsChannelBuilder.build();
Expand All @@ -184,21 +175,6 @@ private CachingRlsLbClient(Builder builder) {
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

private static ImmutableMap<String, Object> getDirectPathServiceConfig(String serviceName) {
ImmutableMap<String, Object> pickFirstStrategy =
ImmutableMap.<String, Object>of("pick_first", ImmutableMap.of());

ImmutableMap<String, Object> childPolicy =
ImmutableMap.<String, Object>of(
"childPolicy", ImmutableList.of(pickFirstStrategy),
"serviceName", serviceName);

ImmutableMap<String, Object> grpcLbPolicy =
ImmutableMap.<String, Object>of("grpclb", childPolicy);

return ImmutableMap.<String, Object>of("loadBalancingConfig", ImmutableList.of(grpcLbPolicy));
}

@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
Expand Down
15 changes: 13 additions & 2 deletions rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,27 @@
final class LbPolicyConfiguration {

private final RouteLookupConfig routeLookupConfig;
@Nullable
private final Map<String, ?> routeLookupChannelServiceConfig;
private final ChildLoadBalancingPolicy policy;

LbPolicyConfiguration(
RouteLookupConfig routeLookupConfig, ChildLoadBalancingPolicy policy) {
RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig,
ChildLoadBalancingPolicy policy) {
this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig;
this.policy = checkNotNull(policy, "policy");
}

RouteLookupConfig getRouteLookupConfig() {
return routeLookupConfig;
}

@Nullable
Map<String, ?> getRouteLookupChannelServiceConfig() {
return routeLookupChannelServiceConfig;
}

ChildLoadBalancingPolicy getLoadBalancingPolicy() {
return policy;
}
Expand All @@ -74,18 +83,20 @@ public boolean equals(Object o) {
}
LbPolicyConfiguration that = (LbPolicyConfiguration) o;
return Objects.equals(routeLookupConfig, that.routeLookupConfig)
&& Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig)
&& Objects.equals(policy, that.policy);
}

@Override
public int hashCode() {
return Objects.hash(routeLookupConfig, policy);
return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("routeLookupConfig", routeLookupConfig)
.add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig)
.add("policy", policy)
.toString();
}
Expand Down
5 changes: 4 additions & 1 deletion rls/src/main/java/io/grpc/rls/RlsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalanc
try {
RouteLookupConfig routeLookupConfig = new RouteLookupConfigConverter()
.convert(JsonUtil.getObject(rawLoadBalancingConfigPolicy, "routeLookupConfig"));
Map<String, ?> routeLookupChannelServiceConfig =
JsonUtil.getObject(rawLoadBalancingConfigPolicy, "routeLookupChannelServiceConfig");
ChildLoadBalancingPolicy lbPolicy = ChildLoadBalancingPolicy
.create(
JsonUtil.getString(rawLoadBalancingConfigPolicy, "childPolicyConfigTargetFieldName"),
JsonUtil.checkObjectList(
checkNotNull(JsonUtil.getList(rawLoadBalancingConfigPolicy, "childPolicy"))));
return ConfigOrError.fromConfig(new LbPolicyConfiguration(routeLookupConfig, lbPolicy));
return ConfigOrError.fromConfig(
new LbPolicyConfiguration(routeLookupConfig, routeLookupChannelServiceConfig, lbPolicy));
} catch (Exception e) {
return ConfigOrError.fromError(
Status.INVALID_ARGUMENT
Expand Down
48 changes: 25 additions & 23 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -144,19 +143,12 @@ public void uncaughtException(Thread t, Throwable e) {
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, childLbPolicy);
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);

private CachingRlsLbClient rlsLbClient;
private boolean existingEnableOobChannelDirectPath;
private Map<String, ?> rlsChannelServiceConfig;
private String rlsChannelOverriddenAuthority;

@Before
public void setUp() throws Exception {
existingEnableOobChannelDirectPath = CachingRlsLbClient.enableOobChannelDirectPath;
CachingRlsLbClient.enableOobChannelDirectPath = false;
}

private void setUpRlsLbClient() {
rlsLbClient =
CachingRlsLbClient.newBuilder()
Expand All @@ -173,7 +165,6 @@ private void setUpRlsLbClient() {
@After
public void tearDown() throws Exception {
rlsLbClient.close();
CachingRlsLbClient.enableOobChannelDirectPath = existingEnableOobChannelDirectPath;
assertWithMessage(
"On client shut down, RlsLoadBalancer must shut down with all its child loadbalancers.")
.that(lbProvider.loadBalancers).isEmpty();
Expand Down Expand Up @@ -244,9 +235,29 @@ public void get_noError_lifeCycle() throws Exception {
}

@Test
public void rls_overDirectPath() throws Exception {
CachingRlsLbClient.enableOobChannelDirectPath = true;
setUpRlsLbClient();
public void rls_withCustomRlsChannelServiceConfig() throws Exception {
Map<String, ?> routeLookupChannelServiceConfig =
ImmutableMap.of(
"loadBalancingConfig",
ImmutableList.of(ImmutableMap.of(
"grpclb",
ImmutableMap.of(
"childPolicy",
ImmutableList.of(ImmutableMap.of("pick_first", ImmutableMap.of())),
"serviceName",
"service1"))));
LbPolicyConfiguration lbPolicyConfiguration = new LbPolicyConfiguration(
ROUTE_LOOKUP_CONFIG, routeLookupChannelServiceConfig, childLbPolicy);
rlsLbClient =
CachingRlsLbClient.newBuilder()
.setBackoffProvider(fakeBackoffProvider)
.setResolvedAddressesFactory(resolvedAddressFactory)
.setEvictionListener(evictionListener)
.setHelper(helper)
.setLbPolicyConfig(lbPolicyConfiguration)
.setThrottler(fakeThrottler)
.setTimeProvider(fakeTimeProvider)
.build();
RouteLookupRequest routeLookupRequest = new RouteLookupRequest(ImmutableMap.of(
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
rlsServerImpl.setLookupTable(
Expand All @@ -265,16 +276,7 @@ public void rls_overDirectPath() throws Exception {
assertThat(resp.hasData()).isTrue();

assertThat(rlsChannelOverriddenAuthority).isEqualTo("bigtable.googleapis.com:443");
assertThat(rlsChannelServiceConfig).isEqualTo(
ImmutableMap.of(
"loadBalancingConfig",
ImmutableList.of(ImmutableMap.of(
"grpclb",
ImmutableMap.of(
"childPolicy",
ImmutableList.of(ImmutableMap.of("pick_first", ImmutableMap.of())),
"serviceName",
"service1")))));
assertThat(rlsChannelServiceConfig).isEqualTo(routeLookupChannelServiceConfig);
}

@Test
Expand Down
5 changes: 0 additions & 5 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,12 @@ public void uncaughtException(Thread t, Throwable e) {
private MethodDescriptor<Object, Object> fakeSearchMethod;
private MethodDescriptor<Object, Object> fakeRescueMethod;
private RlsLoadBalancer rlsLb;
private boolean existingEnableOobChannelDirectPath;
private String defaultTarget = "defaultTarget";

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);

existingEnableOobChannelDirectPath = CachingRlsLbClient.enableOobChannelDirectPath;
CachingRlsLbClient.enableOobChannelDirectPath = false;

fakeSearchMethod =
MethodDescriptor.newBuilder()
.setFullMethodName("com.google/Search")
Expand Down Expand Up @@ -168,7 +164,6 @@ public CachingRlsLbClient.Builder get() {
@After
public void tearDown() throws Exception {
rlsLb.shutdown();
CachingRlsLbClient.enableOobChannelDirectPath = existingEnableOobChannelDirectPath;
}

@Test
Expand Down

0 comments on commit 7308d92

Please sign in to comment.