headersCapture = new AtomicReference<>();
@@ -118,6 +121,8 @@ public void setUp() throws Exception {
TestUtils.recordRequestHeadersInterceptor(headersCapture));
AndroidComponentAddress serverAddress = HostServices.allocateService(appContext);
+ fakeNameResolverProvider = new FakeNameResolverProvider(SERVER_TARGET_URI, serverAddress);
+ NameResolverRegistry.getDefaultRegistry().register(fakeNameResolverProvider);
HostServices.configureService(serverAddress,
HostServices.serviceParamsBuilder()
.setServerFactory((service, receiver) ->
@@ -132,6 +137,7 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
channel.shutdownNow();
+ NameResolverRegistry.getDefaultRegistry().deregister(fakeNameResolverProvider);
HostServices.awaitServiceShutdown();
}
@@ -192,6 +198,12 @@ public void testStreamingCallOptionHeaders() throws Exception {
assertThat(headersCapture.get().get(GrpcUtil.TIMEOUT_KEY)).isGreaterThan(0);
}
+ @Test
+ public void testConnectViaTargetUri() throws Exception {
+ channel = BinderChannelBuilder.forTarget(SERVER_TARGET_URI, appContext).build();
+ assertThat(doCall("Hello").get()).isEqualTo("Hello");
+ }
+
private static String createLargeString(int size) {
StringBuilder sb = new StringBuilder();
while (sb.length() < size) {
diff --git a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java
index 99191cfad3c6..91e4e8f1c76a 100644
--- a/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java
+++ b/binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java
@@ -67,13 +67,35 @@ public final class BinderChannelBuilder
* You the caller are responsible for managing the lifecycle of any channels built by the
* resulting builder. They will not be shut down automatically.
*
- * @param targetAddress the {@link AndroidComponentAddress} referencing the service to bind to.
+ * @param directAddress the {@link AndroidComponentAddress} referencing the service to bind to.
* @param sourceContext the context to bind from (e.g. The current Activity or Application).
* @return a new builder
*/
public static BinderChannelBuilder forAddress(
- AndroidComponentAddress targetAddress, Context sourceContext) {
- return new BinderChannelBuilder(targetAddress, sourceContext);
+ AndroidComponentAddress directAddress, Context sourceContext) {
+ return new BinderChannelBuilder(
+ checkNotNull(directAddress, "directAddress"), null, sourceContext);
+ }
+
+ /**
+ * Creates a channel builder that will bind to a remote Android service, via a string
+ * target name which will be resolved.
+ *
+ *
The underlying Android binding will be torn down when the channel becomes idle. This happens
+ * after 30 minutes without use by default but can be configured via {@link
+ * ManagedChannelBuilder#idleTimeout(long, TimeUnit)} or triggered manually with {@link
+ * ManagedChannel#enterIdle()}.
+ *
+ *
You the caller are responsible for managing the lifecycle of any channels built by the
+ * resulting builder. They will not be shut down automatically.
+ *
+ * @param target A target uri which should resolve into an {@link AndroidComponentAddress}
+ * referencing the service to bind to.
+ * @param sourceContext the context to bind from (e.g. The current Activity or Application).
+ * @return a new builder
+ */
+ public static BinderChannelBuilder forTarget(String target, Context sourceContext) {
+ return new BinderChannelBuilder(null, checkNotNull(target, "target"), sourceContext);
}
/**
@@ -88,7 +110,7 @@ public static BinderChannelBuilder forAddress(String name, int port) {
/**
* Always fails. Call {@link #forAddress(AndroidComponentAddress, Context)} instead.
*/
- @DoNotCall("Unsupported. Use forAddress(AndroidComponentAddress, Context) instead")
+ @DoNotCall("Unsupported. Use forTarget(String, Context) instead")
public static BinderChannelBuilder forTarget(String target) {
throw new UnsupportedOperationException(
"call forAddress(AndroidComponentAddress, Context) instead");
@@ -104,9 +126,11 @@ public static BinderChannelBuilder forTarget(String target) {
private BindServiceFlags bindServiceFlags;
private BinderChannelBuilder(
- AndroidComponentAddress targetAddress,
+ @Nullable AndroidComponentAddress directAddress,
+ @Nullable String target,
Context sourceContext) {
- mainThreadExecutor = ContextCompat.getMainExecutor(sourceContext);
+ mainThreadExecutor =
+ ContextCompat.getMainExecutor(checkNotNull(sourceContext, "sourceContext"));
securityPolicy = SecurityPolicies.internalOnly();
inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
bindServiceFlags = BindServiceFlags.DEFAULTS;
@@ -126,12 +150,20 @@ public ClientTransportFactory buildClientTransportFactory() {
}
}
- managedChannelImplBuilder =
- new ManagedChannelImplBuilder(
- targetAddress,
- targetAddress.getAuthority(),
- new BinderChannelTransportFactoryBuilder(),
- null);
+ if (directAddress != null) {
+ managedChannelImplBuilder =
+ new ManagedChannelImplBuilder(
+ directAddress,
+ directAddress.getAuthority(),
+ new BinderChannelTransportFactoryBuilder(),
+ null);
+ } else {
+ managedChannelImplBuilder =
+ new ManagedChannelImplBuilder(
+ target,
+ new BinderChannelTransportFactoryBuilder(),
+ null);
+ }
}
@Override
diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java
index be46b9e3e54c..dcf36be00ca6 100644
--- a/binder/src/main/java/io/grpc/binder/SecurityPolicies.java
+++ b/binder/src/main/java/io/grpc/binder/SecurityPolicies.java
@@ -16,9 +16,20 @@
package io.grpc.binder;
+import android.annotation.SuppressLint;
+import android.content.pm.PackageInfo;
+import android.content.pm.PackageManager;
+import android.content.pm.PackageManager.NameNotFoundException;
+import android.content.pm.Signature;
+import android.os.Build;
import android.os.Process;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import io.grpc.ExperimentalApi;
import io.grpc.Status;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import javax.annotation.CheckReturnValue;
/** Static factory methods for creating standard security policies. */
@@ -55,4 +66,125 @@ public Status checkAuthorization(int uid) {
}
};
}
+
+ /**
+ * Creates a {@link SecurityPolicy} which checks if the package signature
+ * matches {@code requiredSignature}.
+ *
+ * @param packageName the package name of the allowed package.
+ * @param requiredSignature the allowed signature of the allowed package.
+ * @throws NullPointerException if any of the inputs are {@code null}.
+ */
+ public static SecurityPolicy hasSignature(
+ PackageManager packageManager, String packageName, Signature requiredSignature) {
+ return oneOfSignatures(
+ packageManager, packageName, ImmutableList.of(requiredSignature));
+ }
+
+ /**
+ * Creates a {@link SecurityPolicy} which checks if the package signature
+ * matches any of {@code requiredSignatures}.
+ *
+ * @param packageName the package name of the allowed package.
+ * @param requiredSignatures the allowed signatures of the allowed package.
+ * @throws NullPointerException if any of the inputs are {@code null}.
+ * @throws IllegalArgumentException if {@code requiredSignatures} is empty.
+ */
+ public static SecurityPolicy oneOfSignatures(
+ PackageManager packageManager,
+ String packageName,
+ Collection requiredSignatures) {
+ Preconditions.checkNotNull(packageManager, "packageManager");
+ Preconditions.checkNotNull(packageName, "packageName");
+ Preconditions.checkNotNull(requiredSignatures, "requiredSignatures");
+ Preconditions.checkArgument(!requiredSignatures.isEmpty(),
+ "requiredSignatures");
+ ImmutableList requiredSignaturesImmutable = ImmutableList.copyOf(requiredSignatures);
+
+ for (Signature requiredSignature : requiredSignaturesImmutable) {
+ Preconditions.checkNotNull(requiredSignature);
+ }
+
+ return new SecurityPolicy() {
+ @Override
+ public Status checkAuthorization(int uid) {
+ return checkUidSignature(
+ packageManager, uid, packageName, requiredSignaturesImmutable);
+ }
+ };
+ }
+
+ private static Status checkUidSignature(
+ PackageManager packageManager,
+ int uid,
+ String packageName,
+ ImmutableList requiredSignatures) {
+ String[] packages = packageManager.getPackagesForUid(uid);
+ if (packages == null) {
+ return Status.UNAUTHENTICATED.withDescription(
+ "Rejected by signature check security policy");
+ }
+ boolean packageNameMatched = false;
+ for (String pkg : packages) {
+ if (!packageName.equals(pkg)) {
+ continue;
+ }
+ packageNameMatched = true;
+ if (checkPackageSignature(packageManager, pkg, requiredSignatures)) {
+ return Status.OK;
+ }
+ }
+ return Status.PERMISSION_DENIED.withDescription(
+ "Rejected by signature check security policy. Package name matched: "
+ + packageNameMatched);
+ }
+
+ /**
+ * Checks if the signature of {@code packageName} matches one of the given signatures.
+ *
+ * @param packageName the package to be checked
+ * @param requiredSignatures list of signatures.
+ * @return {@code true} if {@code packageName} has a matching signature.
+ */
+ @SuppressWarnings("deprecation") // For PackageInfo.signatures
+ @SuppressLint("PackageManagerGetSignatures") // We only allow 1 signature.
+ private static boolean checkPackageSignature(
+ PackageManager packageManager,
+ String packageName,
+ ImmutableList requiredSignatures) {
+ PackageInfo packageInfo;
+ try {
+ if (Build.VERSION.SDK_INT >= 28) {
+ packageInfo =
+ packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNING_CERTIFICATES);
+ if (packageInfo.signingInfo == null) {
+ return false;
+ }
+ Signature[] signatures =
+ packageInfo.signingInfo.hasMultipleSigners()
+ ? packageInfo.signingInfo.getApkContentsSigners()
+ : packageInfo.signingInfo.getSigningCertificateHistory();
+
+ for (Signature signature : signatures) {
+ if (requiredSignatures.contains(signature)) {
+ return true;
+ }
+ }
+ } else {
+ packageInfo = packageManager.getPackageInfo(packageName, PackageManager.GET_SIGNATURES);
+ if (packageInfo.signatures == null || packageInfo.signatures.length != 1) {
+ // Reject multiply-signed apks because of b/13678484
+ // (See PackageManagerGetSignatures supression above).
+ return false;
+ }
+
+ if (requiredSignatures.contains(packageInfo.signatures[0])) {
+ return true;
+ }
+ }
+ } catch (NameNotFoundException nnfe) {
+ return false;
+ }
+ return false;
+ }
}
diff --git a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java
index d7dad53fdc83..d13f3a863fd2 100644
--- a/binder/src/main/java/io/grpc/binder/SecurityPolicy.java
+++ b/binder/src/main/java/io/grpc/binder/SecurityPolicy.java
@@ -23,6 +23,11 @@
/**
* Decides whether a given Android UID is authorized to access some resource.
*
+ * While it's possible to extend this class to define your own policy, it's strongly
+ * recommended that you only use the policies provided by the {@link SecurityPolicies} or
+ * {@link UntrustedSecurityPolicies} classes. Implementing your own security policy requires
+ * significant care, and an understanding of the details and pitfalls of Android security.
+ *
* IMPORTANT For any concrete extensions of this class, it's assumed that the
* authorization status of a given UID will not change as long as a process with that UID is
* alive.
diff --git a/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java
new file mode 100644
index 000000000000..7c842b025acb
--- /dev/null
+++ b/binder/src/main/java/io/grpc/binder/UntrustedSecurityPolicies.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.binder;
+
+import io.grpc.ExperimentalApi;
+import io.grpc.Status;
+import javax.annotation.CheckReturnValue;
+
+/**
+ * Static factory methods for creating untrusted security policies.
+ */
+@CheckReturnValue
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022")
+public final class UntrustedSecurityPolicies {
+
+ private UntrustedSecurityPolicies() {}
+
+ /**
+ * Return a security policy which allows any peer on device.
+ * Servers should only use this policy if they intend to expose
+ * a service to all applications on device.
+ * Clients should only use this policy if they don't need to trust the
+ * application they're connecting to.
+ */
+ public static SecurityPolicy untrustedPublic() {
+ return new SecurityPolicy() {
+ @Override
+ public Status checkAuthorization(int uid) {
+ return Status.OK;
+ }
+ };
+ }
+}
diff --git a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java
index 6fd9e22ebaac..86edb5ad7df2 100644
--- a/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java
+++ b/binder/src/test/java/io/grpc/binder/SecurityPoliciesTest.java
@@ -17,22 +17,64 @@
package io.grpc.binder;
import static com.google.common.truth.Truth.assertThat;
+import static org.robolectric.Shadows.shadowOf;
+import android.content.Context;
+import android.content.pm.PackageInfo;
+import android.content.pm.PackageManager;
+import android.content.pm.Signature;
import android.os.Process;
+import androidx.test.core.app.ApplicationProvider;
+import com.google.common.collect.ImmutableList;
import io.grpc.Status;
+import io.grpc.binder.SecurityPolicy;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
@RunWith(RobolectricTestRunner.class)
public final class SecurityPoliciesTest {
+
private static final int MY_UID = Process.myUid();
private static final int OTHER_UID = MY_UID + 1;
+ private static final int OTHER_UID_SAME_SIGNATURE = MY_UID + 2;
+ private static final int OTHER_UID_NO_SIGNATURE = MY_UID + 3;
+ private static final int OTHER_UID_UNKNOWN = MY_UID + 4;
private static final String PERMISSION_DENIED_REASONS = "some reasons";
+ private static final Signature SIG1 = new Signature("1234");
+ private static final Signature SIG2 = new Signature("4321");
+
+ private static final String OTHER_UID_PACKAGE_NAME = "other.package";
+ private static final String OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME = "other.package.samesignature";
+ private static final String OTHER_UID_NO_SIGNATURE_PACKAGE_NAME = "other.package.nosignature";
+
+ private Context appContext;
+ private PackageManager packageManager;
+
private SecurityPolicy policy;
+ @Before
+ public void setUp() {
+ appContext = ApplicationProvider.getApplicationContext();
+ packageManager = appContext.getPackageManager();
+ installPackage(MY_UID, appContext.getPackageName(), SIG1);
+ installPackage(OTHER_UID, OTHER_UID_PACKAGE_NAME, SIG2);
+ installPackage(OTHER_UID_SAME_SIGNATURE, OTHER_UID_SAME_SIGNATURE_PACKAGE_NAME, SIG1);
+ installPackage(OTHER_UID_NO_SIGNATURE, OTHER_UID_NO_SIGNATURE_PACKAGE_NAME);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void installPackage(int uid, String packageName, Signature... signatures) {
+ PackageInfo info = new PackageInfo();
+ info.packageName = packageName;
+ info.signatures = signatures;
+ shadowOf(packageManager).installPackage(info);
+ shadowOf(packageManager).setPackagesForUid(uid, packageName);
+ }
+
@Test
public void testInternalOnly() throws Exception {
policy = SecurityPolicies.internalOnly();
@@ -53,4 +95,80 @@ public void testPermissionDenied() throws Exception {
assertThat(policy.checkAuthorization(OTHER_UID).getDescription())
.isEqualTo(PERMISSION_DENIED_REASONS);
}
+
+ @Test
+ public void testHasSignature_succeedsIfPackageNameAndSignaturesMatch()
+ throws Exception {
+ policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG2);
+
+ // THEN UID for package that has SIG2 will be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode());
+ }
+
+ @Test
+ public void testHasSignature_failsIfPackageNameDoesNotMatch() throws Exception {
+ policy = SecurityPolicies.hasSignature(packageManager, appContext.getPackageName(), SIG1);
+
+ // THEN UID for package that has SIG1 but different package name will not be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode())
+ .isEqualTo(Status.PERMISSION_DENIED.getCode());
+ }
+
+ @Test
+ public void testHasSignature_failsIfSignatureDoesNotMatch() throws Exception {
+ policy = SecurityPolicies.hasSignature(packageManager, OTHER_UID_PACKAGE_NAME, SIG1);
+
+ // THEN UID for package that doesn't have SIG1 will not be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID).getCode())
+ .isEqualTo(Status.PERMISSION_DENIED.getCode());
+ }
+
+ @Test
+ public void testOneOfSignatures_succeedsIfPackageNameAndSignaturesMatch()
+ throws Exception {
+ policy =
+ SecurityPolicies.oneOfSignatures(
+ packageManager, OTHER_UID_PACKAGE_NAME, ImmutableList.of(SIG2));
+
+ // THEN UID for package that has SIG2 will be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode());
+ }
+
+ @Test
+ public void testOneOfSignature_failsIfAllSignaturesDoNotMatch() throws Exception {
+ policy =
+ SecurityPolicies.oneOfSignatures(
+ packageManager,
+ appContext.getPackageName(),
+ ImmutableList.of(SIG1, new Signature("1314")));
+
+ // THEN UID for package that has SIG1 but different package name will not be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID_SAME_SIGNATURE).getCode())
+ .isEqualTo(Status.PERMISSION_DENIED.getCode());
+ }
+
+ @Test
+ public void testOneOfSignature_succeedsIfPackageNameAndOneOfSignaturesMatch()
+ throws Exception {
+ policy =
+ SecurityPolicies.oneOfSignatures(
+ packageManager,
+ OTHER_UID_PACKAGE_NAME,
+ ImmutableList.of(SIG1, SIG2));
+
+ // THEN UID for package that has SIG2 will be authorized
+ assertThat(policy.checkAuthorization(OTHER_UID).getCode()).isEqualTo(Status.OK.getCode());
+ }
+
+ @Test
+ public void testHasSignature_failsIfUidUnknown() throws Exception {
+ policy =
+ SecurityPolicies.hasSignature(
+ packageManager,
+ appContext.getPackageName(),
+ SIG1);
+
+ assertThat(policy.checkAuthorization(OTHER_UID_UNKNOWN).getCode())
+ .isEqualTo(Status.UNAUTHENTICATED.getCode());
+ }
}
diff --git a/buildscripts/kokoro/xds-k8s.cfg b/buildscripts/kokoro/psm-security.cfg
similarity index 75%
rename from buildscripts/kokoro/xds-k8s.cfg
rename to buildscripts/kokoro/psm-security.cfg
index 09a8e705a4d2..f2cfd7babff3 100644
--- a/buildscripts/kokoro/xds-k8s.cfg
+++ b/buildscripts/kokoro/psm-security.cfg
@@ -1,8 +1,8 @@
# Config file for internal CI
# Location of the continuous shell script in repository.
-build_file: "grpc-java/buildscripts/kokoro/xds-k8s.sh"
-timeout_mins: 120
+build_file: "grpc-java/buildscripts/kokoro/psm-security.sh"
+timeout_mins: 180
action {
define_artifacts {
diff --git a/buildscripts/kokoro/xds-k8s.sh b/buildscripts/kokoro/psm-security.sh
similarity index 99%
rename from buildscripts/kokoro/xds-k8s.sh
rename to buildscripts/kokoro/psm-security.sh
index d0275d459e30..105e67b2d0fd 100755
--- a/buildscripts/kokoro/xds-k8s.sh
+++ b/buildscripts/kokoro/psm-security.sh
@@ -168,6 +168,7 @@ main() {
cd "${TEST_DRIVER_FULL_DIR}"
run_test baseline_test
run_test security_test
+ run_test authz_test
}
main "$@"
diff --git a/buildscripts/kokoro/upload_artifacts.sh b/buildscripts/kokoro/upload_artifacts.sh
index 8d7f2f5b3656..20e16c38a2ff 100644
--- a/buildscripts/kokoro/upload_artifacts.sh
+++ b/buildscripts/kokoro/upload_artifacts.sh
@@ -33,6 +33,8 @@ LOCAL_OTHER_ARTIFACTS="$KOKORO_GFILE_DIR"/github/grpc-java/artifacts/
# from macos job:
[[ "$(find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-osx-x86_64.exe' | wc -l)" != '0' ]]
+# copy all x86 artifacts to aarch until native artifacts are built
+find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-osx-x86_64.exe*' -exec bash -c 'cp "${0}" "${0/x86/aarch}"' {} \;
# from windows job:
[[ "$(find "$LOCAL_MVN_ARTIFACTS" -type f -iname 'protoc-gen-grpc-java-*-windows-x86_64.exe' | wc -l)" != '0' ]]
diff --git a/cronet/README.md b/cronet/README.md
index 8b220bd606de..a682c5bcee8f 100644
--- a/cronet/README.md
+++ b/cronet/README.md
@@ -26,7 +26,7 @@ In your app module's `build.gradle` file, include a dependency on both `grpc-cro
Google Play Services Client Library for Cronet
```
-implementation 'io.grpc:grpc-cronet:1.41.0'
+implementation 'io.grpc:grpc-cronet:1.42.1'
implementation 'com.google.android.gms:play-services-cronet:16.0.0'
```
diff --git a/documentation/android-channel-builder.md b/documentation/android-channel-builder.md
index 60e3bb35a85d..d9541c566231 100644
--- a/documentation/android-channel-builder.md
+++ b/documentation/android-channel-builder.md
@@ -36,8 +36,8 @@ In your `build.gradle` file, include a dependency on both `grpc-android` and
`grpc-okhttp`:
```
-implementation 'io.grpc:grpc-android:1.41.0'
-implementation 'io.grpc:grpc-okhttp:1.41.0'
+implementation 'io.grpc:grpc-android:1.42.1'
+implementation 'io.grpc:grpc-okhttp:1.42.1'
```
You also need permission to access the device's network state in your
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 8607d3996a5d..1eebaa63a8e8 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -287,8 +287,9 @@ void handleAddresses(
cancelLbRpcRetryTimer();
startLbRpc();
}
- // Start the fallback timer if it's never started
- if (fallbackTimer == null) {
+ // Start the fallback timer if it's never started and we are not already using fallback
+ // backends.
+ if (fallbackTimer == null && !usingFallbackBackends) {
fallbackTimer = syncContext.schedule(
new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS,
TimeUnit.MILLISECONDS, timerService);
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index cb231c6c055f..293c0aa0b82a 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -1462,6 +1462,33 @@ public void grpclbFallback_noBalancerAddress() {
.updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}
+ /**
+ * A test for a situation where we first only get backend addresses resolved and then in a
+ * later name resolution get both backend and load balancer addresses. The first instance
+ * will switch us to using fallback backends and it is important that in the second instance
+ * we do not start a fallback timer as it will fail when it triggers if the fallback backends
+ * are already in use.
+ */
+ @Test
+ public void grpclbFallback_noTimerWhenAlreadyInFallback() {
+ // Initially we only get backend addresses without any LB ones. This should get us to use
+ // fallback backends from the start as we won't be able to even talk to the load balancer.
+ // No fallback timer would be started as we already started to use fallback backends.
+ deliverResolvedAddresses(createResolvedBalancerAddresses(1),
+ Collections.emptyList());
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ // Later a new name resolution call happens and we get both backend and LB addresses. Since we
+ // are already operating with fallback backends a fallback timer should not be started to move
+ // us to fallback mode.
+ deliverResolvedAddresses(Collections.emptyList(),
+ createResolvedBalancerAddresses(1));
+
+ // If a fallback timer is started it will eventually throw an exception when it tries to switch
+ // us to using fallback backends when we already are using them.
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ }
+
@Test
public void grpclbFallback_balancerLost() {
subtestGrpclbFallbackConnectionLost(true, false);
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java
index b2af083bd248..6b3e7213cfb7 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java
@@ -180,16 +180,16 @@ private void start() throws Exception {
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
- .build()
- .start();
+ .build();
+ server.start();
maintenanceServer =
NettyServerBuilder.forPort(maintenancePort)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addServices(AdminInterface.getStandardServices())
- .build()
- .start();
+ .build();
+ maintenanceServer.start();
} else {
server =
NettyServerBuilder.forPort(port)
@@ -200,8 +200,8 @@ private void start() throws Exception {
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addServices(AdminInterface.getStandardServices())
- .build()
- .start();
+ .build();
+ server.start();
maintenanceServer = null;
}
health.setStatus("", ServingStatus.SERVING);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index c286c17f6409..f552b937a05c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -109,6 +109,9 @@ class NettyServerHandler extends AbstractNettyHandler {
@VisibleForTesting
static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
+ /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
+ private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
+ System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
@@ -380,7 +383,7 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
try {
// Connection-specific header fields makes a request malformed. Ideally this would be handled
// by Netty. RFC 7540 section 8.1.2.2
- if (headers.contains(CONNECTION)) {
+ if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
return;
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index af5ebe2886cc..b3e90d158fa8 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -105,15 +105,22 @@ private enum NegotiationType {
new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS)
.cipherSuites(
// The following items should be sync with Netty's Http2SecurityUtil.CIPHERS.
- CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,
- CipherSuite.TLS_DHE_DSS_WITH_AES_256_GCM_SHA384)
- .tlsVersions(TlsVersion.TLS_1_2)
+ CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
+ CipherSuite.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
+
+ // TLS 1.3 does not work so far. See issues:
+ // https://github.com/grpc/grpc-java/issues/7765
+ //
+ // TLS 1.3
+ //CipherSuite.TLS_AES_128_GCM_SHA256,
+ //CipherSuite.TLS_AES_256_GCM_SHA384,
+ //CipherSuite.TLS_CHACHA20_POLY1305_SHA256
+ )
+ .tlsVersions(/*TlsVersion.TLS_1_3,*/ TlsVersion.TLS_1_2)
.supportsTlsExtensions(true)
.build();
@@ -396,6 +403,32 @@ public OkHttpChannelBuilder connectionSpec(
return this;
}
+ /**
+ * Sets the connection specification used for secure connections.
+ *
+ * By default a modern, HTTP/2-compatible spec will be used.
+ *
+ *
This method is only used when building a secure connection. For plaintext
+ * connection, use {@link #usePlaintext()} instead.
+ *
+ * @param tlsVersions List of tls versions.
+ * @param cipherSuites List of cipher suites.
+ */
+ public OkHttpChannelBuilder tlsConnectionSpec(
+ String[] tlsVersions, String[] cipherSuites) {
+ Preconditions.checkState(!freezeSecurityConfiguration,
+ "Cannot change security when using ChannelCredentials");
+ Preconditions.checkNotNull(tlsVersions, "tls versions must not null");
+ Preconditions.checkNotNull(cipherSuites, "ciphers must not null");
+
+ this.connectionSpec = new ConnectionSpec.Builder(true)
+ .supportsTlsExtensions(true)
+ .tlsVersions(tlsVersions)
+ .cipherSuites(cipherSuites)
+ .build();
+ return this;
+ }
+
/** Sets the negotiation type for the HTTP/2 connection to plaintext. */
@Override
public OkHttpChannelBuilder usePlaintext() {
diff --git a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/CipherSuite.java b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/CipherSuite.java
index 70fa796f2488..1a9aab284bbd 100644
--- a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/CipherSuite.java
+++ b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/CipherSuite.java
@@ -354,6 +354,22 @@ public enum CipherSuite {
// TLS_ECDHE_ECDSA_WITH_AES_256_CCM("TLS_ECDHE_ECDSA_WITH_AES_256_CCM", 0xc0ad, 7251, MAX_VALUE, MAX_VALUE),
// TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8("TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8", 0xc0ae, 7251, MAX_VALUE, MAX_VALUE),
// TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8("TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8", 0xc0af, 7251, MAX_VALUE, MAX_VALUE),
+
+ TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256("TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256", 0xcca8),
+ TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256("TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256", 0xcca9),
+ TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256("TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256", 0xccaa),
+ // TLS_PSK_WITH_CHACHA20_POLY1305_SHA256("TLS_PSK_WITH_CHACHA20_POLY1305_SHA256", 0xccab),
+ TLS_ECDHE_PSK_WITH_CHACHA20_POLY1305_SHA256("TLS_ECDHE_PSK_WITH_CHACHA20_POLY1305_SHA256", 0xccac),
+ // TLS_DHE_PSK_WITH_CHACHA20_POLY1305_SHA256("TLS_DHE_PSK_WITH_CHACHA20_POLY1305_SHA256", 0xccad),
+ // TLS_RSA_PSK_WITH_CHACHA20_POLY1305_SHA256("TLS_RSA_PSK_WITH_CHACHA20_POLY1305_SHA256", 0xccae),
+
+ // TLS 1.3 https://tools.ietf.org/html/rfc8446
+ TLS_AES_128_GCM_SHA256("TLS_AES_128_GCM_SHA256", 0x1301),
+ TLS_AES_256_GCM_SHA384("TLS_AES_256_GCM_SHA384", 0x1302),
+ TLS_CHACHA20_POLY1305_SHA256("TLS_CHACHA20_POLY1305_SHA256", 0x1303),
+ TLS_AES_128_CCM_SHA256("TLS_AES_128_CCM_SHA256", 0x1304),
+ TLS_AES_128_CCM_8_SHA256("TLS_AES_128_CCM_8_SHA256", 0x1305),
+
;
final String javaName;
@@ -372,6 +388,12 @@ private CipherSuite(
this.javaName = javaName;
}
+ @SuppressWarnings("UnusedVariable")
+ private CipherSuite(
+ String javaName, int value) {
+ this.javaName = javaName;
+ }
+
public static CipherSuite forJavaName(String javaName) {
return javaName.startsWith("SSL_")
? valueOf("TLS_" + javaName.substring(4))
diff --git a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/ConnectionSpec.java b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/ConnectionSpec.java
index 457e9c301f37..b84a1ff94eec 100644
--- a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/ConnectionSpec.java
+++ b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/ConnectionSpec.java
@@ -30,40 +30,43 @@
*/
public final class ConnectionSpec {
- // This is a subset of the cipher suites supported in Chrome 37, current as of 2014-10-5.
- // All of these suites are available on Android 5.0; earlier releases support a subset of
- // these suites. https://github.com/square/okhttp/issues/330
+ // This is nearly equal to the cipher suites supported in Chrome 72, current as of 2019-02-24.
+ // See https://tinyurl.com/okhttp-cipher-suites for availability.
private static final CipherSuite[] APPROVED_CIPHER_SUITES = new CipherSuite[] {
- CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,
-
- // Note that the following cipher suites are all on HTTP/2's bad cipher suites list. We'll
- // continue to include them until better suites are commonly available. For example, none
- // of the better cipher suites listed above shipped with Android 4.4 or Java 7.
- CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
- CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
- CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
- CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
- CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
- CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA,
- CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
- CipherSuite.TLS_RSA_WITH_AES_128_GCM_SHA256,
- CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
- CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
- CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
+ // TLSv1.3.
+ CipherSuite.TLS_AES_128_GCM_SHA256,
+ CipherSuite.TLS_AES_256_GCM_SHA384,
+ CipherSuite.TLS_CHACHA20_POLY1305_SHA256,
+
+ // TLSv1.0, TLSv1.1, TLSv1.2.
+ CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
+ CipherSuite.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
+
+ // Note that the following cipher suites are all on HTTP/2's bad cipher suites list. We'll
+ // continue to include them until better suites are commonly available.
+ CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
+ CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
+ CipherSuite.TLS_RSA_WITH_AES_128_GCM_SHA256,
+ CipherSuite.TLS_RSA_WITH_AES_256_GCM_SHA384,
+ CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
+ CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
+ CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA
};
/** A modern TLS connection with extensions like SNI and ALPN available. */
public static final ConnectionSpec MODERN_TLS = new Builder(true)
.cipherSuites(APPROVED_CIPHER_SUITES)
- .tlsVersions(TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
+ .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2)
.supportsTlsExtensions(true)
.build();
/** A backwards-compatible fallback connection for interop with obsolete servers. */
public static final ConnectionSpec COMPATIBLE_TLS = new Builder(MODERN_TLS)
- .tlsVersions(TlsVersion.TLS_1_0)
+ .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
.supportsTlsExtensions(true)
.build();
diff --git a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/TlsVersion.java b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/TlsVersion.java
index 548f4acbc5b3..6692c80fad4d 100644
--- a/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/TlsVersion.java
+++ b/okhttp/third_party/okhttp/main/java/io/grpc/okhttp/internal/TlsVersion.java
@@ -26,6 +26,7 @@
* {@link SSLSocket#setEnabledProtocols}.
*/
public enum TlsVersion {
+ TLS_1_3("TLSv1.3"), // 2016.
TLS_1_2("TLSv1.2"), // 2008.
TLS_1_1("TLSv1.1"), // 2006.
TLS_1_0("TLSv1"), // 1999.
@@ -39,7 +40,9 @@ private TlsVersion(String javaName) {
}
public static TlsVersion forJavaName(String javaName) {
- if ("TLSv1.2".equals(javaName)) {
+ if ("TLSv1.3".equals(javaName)) {
+ return TLS_1_3;
+ } else if ("TLSv1.2".equals(javaName)) {
return TLS_1_2;
} else if ("TLSv1.1".equals(javaName)) {
return TLS_1_1;
diff --git a/rls/src/main/java/io/grpc/rls/RlsProtoData.java b/rls/src/main/java/io/grpc/rls/RlsProtoData.java
index 9e62f228a1f8..2ddf00db207b 100644
--- a/rls/src/main/java/io/grpc/rls/RlsProtoData.java
+++ b/rls/src/main/java/io/grpc/rls/RlsProtoData.java
@@ -24,15 +24,13 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import io.grpc.Internal;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/** RlsProtoData is a collection of internal representation of RouteLookupService proto messages. */
-@Internal
-public final class RlsProtoData {
+final class RlsProtoData {
private RlsProtoData() {}
@@ -137,7 +135,7 @@ public String toString() {
/** A config object for gRPC RouteLookupService. */
@Immutable
- public static final class RouteLookupConfig {
+ static final class RouteLookupConfig {
private final ImmutableList grpcKeyBuilders;
@@ -154,8 +152,7 @@ public static final class RouteLookupConfig {
@Nullable
private final String defaultTarget;
- /** Constructor. */
- public RouteLookupConfig(
+ RouteLookupConfig(
List grpcKeyBuilders,
String lookupService,
long lookupServiceTimeoutInNanos,
@@ -284,14 +281,13 @@ public String toString() {
* is true, one of the specified names must be present for the keybuilder to match.
*/
@Immutable
- public static final class NameMatcher {
+ static final class NameMatcher {
private final String key;
private final ImmutableList names;
- /** Constructor. */
- public NameMatcher(String key, List names) {
+ NameMatcher(String key, List names) {
this.key = checkNotNull(key, "key");
this.names = ImmutableList.copyOf(checkNotNull(names, "names"));
}
@@ -334,7 +330,7 @@ public String toString() {
}
/** GrpcKeyBuilder is a configuration to construct headers consumed by route lookup service. */
- public static final class GrpcKeyBuilder {
+ static final class GrpcKeyBuilder {
private final ImmutableList names;
@@ -343,7 +339,7 @@ public static final class GrpcKeyBuilder {
private final ImmutableMap constantKeys;
/** Constructor. All args should be nonnull. Headers should head unique keys. */
- public GrpcKeyBuilder(
+ GrpcKeyBuilder(
List names, List headers, ExtraKeys extraKeys,
Map constantKeys) {
checkState(names != null && !names.isEmpty(), "names cannot be empty");
@@ -414,7 +410,7 @@ public String toString() {
* required and includes the proto package name. The method name may be omitted, in which case
* any method on the given service is matched.
*/
- public static final class Name {
+ static final class Name {
private final String service;
@@ -425,7 +421,7 @@ public Name(String service) {
}
/** The primary constructor. */
- public Name(String service, String method) {
+ Name(String service, String method) {
this.service = service;
this.method = method;
}
@@ -467,7 +463,7 @@ public String toString() {
}
@AutoValue
- public abstract static class ExtraKeys {
+ abstract static class ExtraKeys {
static final ExtraKeys DEFAULT = create(null, null, null);
@Nullable abstract String host();
@@ -476,7 +472,7 @@ public abstract static class ExtraKeys {
@Nullable abstract String method();
- public static ExtraKeys create(
+ static ExtraKeys create(
@Nullable String host, @Nullable String service, @Nullable String method) {
return new AutoValue_RlsProtoData_ExtraKeys(host, service, method);
}
diff --git a/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java
new file mode 100644
index 000000000000..d056707b7196
--- /dev/null
+++ b/testing/src/main/java/io/grpc/internal/testing/FakeNameResolverProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal.testing;
+
+import com.google.common.collect.ImmutableList;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import io.grpc.Status;
+import java.net.SocketAddress;
+import java.net.URI;
+
+/** A name resolver to always resolve the given URI into the given address. */
+public final class FakeNameResolverProvider extends NameResolverProvider {
+
+ private final URI targetUri;
+ private final SocketAddress address;
+
+ public FakeNameResolverProvider(String targetUri, SocketAddress address) {
+ this.targetUri = URI.create(targetUri);
+ this.address = address;
+ }
+
+ @Override
+ public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
+ if (targetUri.equals(this.targetUri)) {
+ return new FakeNameResolver(address);
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ protected int priority() {
+ return 5; // Default
+ }
+
+ @Override
+ public String getDefaultScheme() {
+ return targetUri.getScheme();
+ }
+
+ /** A single name resolver. */
+ private static final class FakeNameResolver extends NameResolver {
+ private static final String AUTHORITY = "fake-authority";
+
+ private final SocketAddress address;
+ private volatile boolean shutdown;
+
+ private FakeNameResolver(SocketAddress address) {
+ this.address = address;
+ }
+
+ @Override
+ public void start(Listener2 listener) {
+ if (shutdown) {
+ listener.onError(Status.FAILED_PRECONDITION.withDescription("Resolver is shutdown"));
+ } else {
+ listener.onResult(
+ ResolutionResult.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(address)))
+ .build());
+ }
+ }
+
+ @Override
+ public String getServiceAuthority() {
+ return AUTHORITY;
+ }
+
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ }
+ }
+}
diff --git a/xds/build.gradle b/xds/build.gradle
index a8dbde3e0a11..16b1c8779062 100644
--- a/xds/build.gradle
+++ b/xds/build.gradle
@@ -39,9 +39,9 @@ dependencies {
libraries.autovalue_annotation,
libraries.opencensus_proto,
libraries.protobuf_util
- implementation project(path: ':grpc-rls')
def nettyDependency = implementation project(':grpc-netty')
+ testImplementation project(':grpc-rls')
testImplementation project(':grpc-core').sourceSets.test.output
annotationProcessor libraries.autovalue
@@ -71,7 +71,7 @@ sourceSets {
proto {
srcDir 'third_party/envoy/src/main/proto'
srcDir 'third_party/protoc-gen-validate/src/main/proto'
- srcDir 'third_party/udpa/src/main/proto'
+ srcDir 'third_party/xds/src/main/proto'
srcDir 'third_party/googleapis/src/main/proto'
srcDir 'third_party/istio/src/main/proto'
}
@@ -92,6 +92,7 @@ jar {
javadoc {
// Exclusions here should generally also be relocated
exclude 'com/github/udpa/**'
+ exclude 'com/github/xds/**'
exclude 'com/google/security/**'
exclude 'io/envoyproxy/**'
// Need to clean up the class structure to reduce how much is exposed
@@ -115,6 +116,7 @@ shadowJar {
}
// Relocated packages commonly need exclusions in jacocoTestReport and javadoc
relocate 'com.github.udpa', "${prefixName}.shaded.com.github.udpa"
+ relocate 'com.github.xds', "${prefixName}.shaded.com.github.xds"
relocate 'com.google.api.expr', "${prefixName}.shaded.com.google.api.expr"
relocate 'com.google.security', "${prefixName}.shaded.com.google.security"
// TODO: missing java_package option in .proto
@@ -124,6 +126,7 @@ shadowJar {
relocate 'io.netty', 'io.grpc.netty.shaded.io.netty'
// TODO: missing java_package option in .proto
relocate 'udpa.annotations', "${prefixName}.shaded.udpa.annotations"
+ relocate 'xds.annotations', "${prefixName}.shaded.xds.annotations"
exclude "**/*.proto"
}
@@ -159,11 +162,13 @@ jacocoTestReport {
fileTree(dir: it,
exclude: [ // Exclusions here should generally also be relocated
'**/com/github/udpa/**',
+ '**/com/github/xds/**',
'**/com/google/api/expr/**',
'**/com/google/security/**',
'**/envoy/annotations/**',
'**/io/envoyproxy/**',
'**/udpa/annotations/**',
+ '**/xds/annotations/**',
])
}
}
diff --git a/xds/src/generated/main/grpc/com/github/udpa/udpa/service/orca/v1/OpenRcaServiceGrpc.java b/xds/src/generated/main/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java
similarity index 83%
rename from xds/src/generated/main/grpc/com/github/udpa/udpa/service/orca/v1/OpenRcaServiceGrpc.java
rename to xds/src/generated/main/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java
index 78383dba2d35..52ec68988085 100644
--- a/xds/src/generated/main/grpc/com/github/udpa/udpa/service/orca/v1/OpenRcaServiceGrpc.java
+++ b/xds/src/generated/main/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java
@@ -1,4 +1,4 @@
-package com.github.udpa.udpa.service.orca.v1;
+package com.github.xds.service.orca.v3;
import static io.grpc.MethodDescriptor.generateFullMethodName;
@@ -16,38 +16,38 @@
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler",
- comments = "Source: udpa/service/orca/v1/orca.proto")
+ comments = "Source: xds/service/orca/v3/orca.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class OpenRcaServiceGrpc {
private OpenRcaServiceGrpc() {}
- public static final String SERVICE_NAME = "udpa.service.orca.v1.OpenRcaService";
+ public static final String SERVICE_NAME = "xds.service.orca.v3.OpenRcaService";
// Static method descriptors that strictly reflect the proto.
- private static volatile io.grpc.MethodDescriptor getStreamCoreMetricsMethod;
+ private static volatile io.grpc.MethodDescriptor getStreamCoreMetricsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "StreamCoreMetrics",
- requestType = com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest.class,
- responseType = com.github.udpa.udpa.data.orca.v1.OrcaLoadReport.class,
+ requestType = com.github.xds.service.orca.v3.OrcaLoadReportRequest.class,
+ responseType = com.github.xds.data.orca.v3.OrcaLoadReport.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
- public static io.grpc.MethodDescriptor getStreamCoreMetricsMethod() {
- io.grpc.MethodDescriptor getStreamCoreMetricsMethod;
+ public static io.grpc.MethodDescriptor getStreamCoreMetricsMethod() {
+ io.grpc.MethodDescriptor getStreamCoreMetricsMethod;
if ((getStreamCoreMetricsMethod = OpenRcaServiceGrpc.getStreamCoreMetricsMethod) == null) {
synchronized (OpenRcaServiceGrpc.class) {
if ((getStreamCoreMetricsMethod = OpenRcaServiceGrpc.getStreamCoreMetricsMethod) == null) {
OpenRcaServiceGrpc.getStreamCoreMetricsMethod = getStreamCoreMetricsMethod =
- io.grpc.MethodDescriptor.newBuilder()
+ io.grpc.MethodDescriptor.newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "StreamCoreMetrics"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
- com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest.getDefaultInstance()))
+ com.github.xds.service.orca.v3.OrcaLoadReportRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
- com.github.udpa.udpa.data.orca.v1.OrcaLoadReport.getDefaultInstance()))
+ com.github.xds.data.orca.v3.OrcaLoadReport.getDefaultInstance()))
.setSchemaDescriptor(new OpenRcaServiceMethodDescriptorSupplier("StreamCoreMetrics"))
.build();
}
@@ -116,8 +116,8 @@ public static abstract class OpenRcaServiceImplBase implements io.grpc.BindableS
/**
*/
- public void streamCoreMetrics(com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest request,
- io.grpc.stub.StreamObserver responseObserver) {
+ public void streamCoreMetrics(com.github.xds.service.orca.v3.OrcaLoadReportRequest request,
+ io.grpc.stub.StreamObserver responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getStreamCoreMetricsMethod(), responseObserver);
}
@@ -127,8 +127,8 @@ public void streamCoreMetrics(com.github.udpa.udpa.service.orca.v1.OrcaLoadRepor
getStreamCoreMetricsMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
- com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest,
- com.github.udpa.udpa.data.orca.v1.OrcaLoadReport>(
+ com.github.xds.service.orca.v3.OrcaLoadReportRequest,
+ com.github.xds.data.orca.v3.OrcaLoadReport>(
this, METHODID_STREAM_CORE_METRICS)))
.build();
}
@@ -160,8 +160,8 @@ protected OpenRcaServiceStub build(
/**
*/
- public void streamCoreMetrics(com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest request,
- io.grpc.stub.StreamObserver responseObserver) {
+ public void streamCoreMetrics(com.github.xds.service.orca.v3.OrcaLoadReportRequest request,
+ io.grpc.stub.StreamObserver responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getStreamCoreMetricsMethod(), getCallOptions()), request, responseObserver);
}
@@ -193,8 +193,8 @@ protected OpenRcaServiceBlockingStub build(
/**
*/
- public java.util.Iterator streamCoreMetrics(
- com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest request) {
+ public java.util.Iterator streamCoreMetrics(
+ com.github.xds.service.orca.v3.OrcaLoadReportRequest request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getStreamCoreMetricsMethod(), getCallOptions(), request);
}
@@ -245,8 +245,8 @@ private static final class MethodHandlers implements
public void invoke(Req request, io.grpc.stub.StreamObserver responseObserver) {
switch (methodId) {
case METHODID_STREAM_CORE_METRICS:
- serviceImpl.streamCoreMetrics((com.github.udpa.udpa.service.orca.v1.OrcaLoadReportRequest) request,
- (io.grpc.stub.StreamObserver) responseObserver);
+ serviceImpl.streamCoreMetrics((com.github.xds.service.orca.v3.OrcaLoadReportRequest) request,
+ (io.grpc.stub.StreamObserver) responseObserver);
break;
default:
throw new AssertionError();
@@ -270,7 +270,7 @@ private static abstract class OpenRcaServiceBaseDescriptorSupplier
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
- return com.github.udpa.udpa.service.orca.v1.OrcaProto.getDescriptor();
+ return com.github.xds.service.orca.v3.OrcaProto.getDescriptor();
}
@java.lang.Override
diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
index 0e609ff74587..61780c60a554 100644
--- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
@@ -28,6 +28,7 @@
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
@@ -36,6 +37,11 @@
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
+import io.grpc.xds.Bootstrapper.ServerInfo;
+import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
+import io.grpc.xds.EnvoyProtoData.Node;
+import io.grpc.xds.XdsClient.ResourceStore;
+import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.Collection;
import java.util.Collections;
@@ -48,7 +54,7 @@
* Common base type for XdsClient implementations, which encapsulates the layer abstraction of
* the xDS RPC stream.
*/
-abstract class AbstractXdsClient extends XdsClient {
+final class AbstractXdsClient {
private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
private static final String ADS_TYPE_URL_LDS =
@@ -66,26 +72,18 @@ abstract class AbstractXdsClient extends XdsClient {
private static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
- private final SynchronizationContext syncContext = new SynchronizationContext(
- new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- getLogger().log(
- XdsLogLevel.ERROR,
- "Uncaught exception in XdsClient SynchronizationContext. Panic!",
- e);
- // TODO(chengyuanzhang): better error handling.
- throw new AssertionError(e);
- }
- });
+ private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
+ private final ServerInfo serverInfo;
private final ManagedChannel channel;
+ private final XdsResponseHandler xdsResponseHandler;
+ private final ResourceStore resourceStore;
private final Context context;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
- private final Bootstrapper.BootstrapInfo bootstrapInfo;
+ private final Node bootstrapNode;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
@@ -103,71 +101,42 @@ public void uncaughtException(Thread t, Throwable e) {
@Nullable
private ScheduledHandle rpcRetryTimer;
- AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo,
- Context context, ScheduledExecutorService timeService,
- BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) {
- this.channel = checkNotNull(channel, "channel");
- this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo");
+ /** An entity that manages ADS RPCs over a single channel. */
+ // TODO: rename to XdsChannel
+ AbstractXdsClient(
+ XdsChannelFactory xdsChannelFactory,
+ ServerInfo serverInfo,
+ Node bootstrapNode,
+ XdsResponseHandler xdsResponseHandler,
+ ResourceStore resourceStore,
+ Context context,
+ ScheduledExecutorService
+ timeService,
+ SynchronizationContext syncContext,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ Supplier stopwatchSupplier) {
+ this.serverInfo = checkNotNull(serverInfo, "serverInfo");
+ this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
+ this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
+ this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
+ this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
this.context = checkNotNull(context, "context");
this.timeService = checkNotNull(timeService, "timeService");
+ this.syncContext = checkNotNull(syncContext, "syncContext");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
- logId = InternalLogId.allocate("xds-client", null);
+ logId = InternalLogId.allocate("xds-client", serverInfo.target());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
- /**
- * Called when an LDS response is received.
- */
- // Must be synchronized.
- protected void handleLdsResponse(String versionInfo, List resources, String nonce) {
- }
-
- /**
- * Called when a RDS response is received.
- */
- // Must be synchronized.
- protected void handleRdsResponse(String versionInfo, List resources, String nonce) {
- }
-
- /**
- * Called when a CDS response is received.
- */
- // Must be synchronized.
- protected void handleCdsResponse(String versionInfo, List resources, String nonce) {
- }
-
- /**
- * Called when an EDS response is received.
- */
- // Must be synchronized.
- protected void handleEdsResponse(String versionInfo, List resources, String nonce) {
- }
-
- /**
- * Called when the ADS stream is closed passively.
- */
- // Must be synchronized.
- protected void handleStreamClosed(Status error) {
- }
-
- /**
- * Called when the ADS stream has been recreated.
- */
- // Must be synchronized.
- protected void handleStreamRestarted() {
- }
-
- /**
- * Called when being shut down.
- */
- // Must be synchronized.
- protected void handleShutdown() {
+ /** The underlying channel. */
+ // Currently, only externally used for LrsClient.
+ Channel channel() {
+ return channel;
}
- @Override
- final void shutdown() {
+ void shutdown() {
syncContext.execute(new Runnable() {
@Override
public void run() {
@@ -179,49 +148,28 @@ public void run() {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
rpcRetryTimer.cancel();
}
- handleShutdown();
+ channel.shutdown();
}
});
}
- @Override
- boolean isShutDown() {
- return shutdown;
- }
-
- @Override
- Bootstrapper.BootstrapInfo getBootstrapInfo() {
- return bootstrapInfo;
- }
-
@Override
public String toString() {
return logId.toString();
}
- /**
- * Returns the collection of resources currently subscribing to or {@code null} if not
- * subscribing to any resources for the given type.
- *
- * Note an empty collection indicates subscribing to resources of the given type with
- * wildcard mode.
- */
- // Must be synchronized.
- @Nullable
- abstract Collection getSubscribedResources(ResourceType type);
-
/**
* Updates the resource subscription for the given resource type.
*/
// Must be synchronized.
- protected final void adjustResourceSubscription(ResourceType type) {
+ void adjustResourceSubscription(ResourceType type) {
if (isInBackoff()) {
return;
}
if (adsStream == null) {
startRpcStream();
}
- Collection resources = getSubscribedResources(type);
+ Collection resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
@@ -232,7 +180,7 @@ protected final void adjustResourceSubscription(ResourceType type) {
* and sends an ACK request to the management server.
*/
// Must be synchronized.
- protected final void ackResponse(ResourceType type, String versionInfo, String nonce) {
+ void ackResponse(ResourceType type, String versionInfo, String nonce) {
switch (type) {
case LDS:
ldsVersion = versionInfo;
@@ -252,7 +200,7 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n
}
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
- Collection resources = getSubscribedResources(type);
+ Collection resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) {
resources = Collections.emptyList();
}
@@ -264,34 +212,22 @@ protected final void ackResponse(ResourceType type, String versionInfo, String n
* accepted version) to the management server.
*/
// Must be synchronized.
- protected final void nackResponse(ResourceType type, String nonce, String errorDetail) {
+ void nackResponse(ResourceType type, String nonce, String errorDetail) {
String versionInfo = getCurrentVersion(type);
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
- Collection resources = getSubscribedResources(type);
+ Collection resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
}
- protected final SynchronizationContext getSyncContext() {
- return syncContext;
- }
-
- protected final ScheduledExecutorService getTimeService() {
- return timeService;
- }
-
- protected final XdsLogger getLogger() {
- return logger;
- }
-
/**
* Returns {@code true} if the resource discovery is currently in backoff.
*/
// Must be synchronized.
- protected final boolean isInBackoff() {
+ boolean isInBackoff() {
return rpcRetryTimer != null && rpcRetryTimer.isPending();
}
@@ -302,7 +238,7 @@ protected final boolean isInBackoff() {
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
- if (bootstrapInfo.servers().get(0).useProtocolV3()) {
+ if (serverInfo.useProtocolV3()) {
adsStream = new AdsStreamV3();
} else {
adsStream = new AdsStreamV2();
@@ -317,8 +253,8 @@ private void startRpcStream() {
stopwatch.reset().start();
}
+ /** Returns the latest accepted version of the given resource type. */
// Must be synchronized.
- @Override
String getCurrentVersion(ResourceType type) {
String version;
switch (type) {
@@ -353,16 +289,16 @@ public void run() {
if (type == ResourceType.UNKNOWN) {
continue;
}
- Collection resources = getSubscribedResources(type);
+ Collection resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
}
}
- handleStreamRestarted();
+ xdsResponseHandler.handleStreamRestarted(serverInfo);
}
}
- protected enum ResourceType {
+ enum ResourceType {
UNKNOWN, LDS, RDS, CDS, EDS;
String typeUrl() {
@@ -397,7 +333,8 @@ String typeUrlV2() {
}
}
- private static ResourceType fromTypeUrl(String typeUrl) {
+ @VisibleForTesting
+ static ResourceType fromTypeUrl(String typeUrl) {
switch (typeUrl) {
case ADS_TYPE_URL_LDS:
// fall trough
@@ -488,19 +425,19 @@ final void handleRpcResponse(
switch (type) {
case LDS:
ldsRespNonce = nonce;
- handleLdsResponse(versionInfo, resources, nonce);
+ xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case RDS:
rdsRespNonce = nonce;
- handleRdsResponse(versionInfo, resources, nonce);
+ xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case CDS:
cdsRespNonce = nonce;
- handleCdsResponse(versionInfo, resources, nonce);
+ xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case EDS:
edsRespNonce = nonce;
- handleEdsResponse(versionInfo, resources, nonce);
+ xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case UNKNOWN:
default:
@@ -526,7 +463,7 @@ private void handleRpcStreamClosed(Status error) {
"ADS stream closed with status {0}: {1}. Cause: {2}",
error.getCode(), error.getDescription(), error.getCause());
closed = true;
- handleStreamClosed(error);
+ xdsResponseHandler.handleStreamClosed(error);
cleanUp();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
@@ -619,7 +556,7 @@ void sendDiscoveryRequest(ResourceType type, String versionInfo, CollectionA filesystem path defined by environment variable "GRPC_XDS_BOOTSTRAP"
* A filesystem path defined by Java System Property "io.grpc.xds.bootstrap"
* Environment variable value of "GRPC_XDS_BOOTSTRAP_CONFIG"
- * Java System Property value of "io.grpc.xds.bootstrap_value"
+ * Java System Property value of "io.grpc.xds.bootstrapConfig"
*
*/
@SuppressWarnings("unchecked")
diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
index 4af187bf1dd4..396292a23927 100644
--- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
+++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
@@ -157,12 +157,12 @@ private void handleClusterDiscovered() {
if (clusterState.result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
clusterState.name, clusterState.result.edsServiceName(),
- clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(),
+ clusterState.result.lrsServerInfo(), clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext());
} else { // logical DNS
instance = DiscoveryMechanism.forLogicalDns(
clusterState.name, clusterState.result.dnsHostName(),
- clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(),
+ clusterState.result.lrsServerInfo(), clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext());
}
instances.add(instance);
diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
index 2a4405f45fca..0a11ad472880 100644
--- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
@@ -27,6 +27,7 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -56,14 +57,20 @@
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
+import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
+import io.grpc.Grpc;
+import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
+import io.grpc.xds.AbstractXdsClient.ResourceType;
+import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
@@ -85,6 +92,8 @@
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
+import io.grpc.xds.XdsClient.ResourceStore;
+import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.internal.Matchers.FractionMatcher;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
@@ -103,7 +112,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -111,7 +119,7 @@
/**
* XdsClient implementation for client side usages.
*/
-final class ClientXdsClient extends AbstractXdsClient {
+final class ClientXdsClient extends XdsClient implements XdsResponseHandler, ResourceStore {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
@@ -154,8 +162,10 @@ final class ClientXdsClient extends AbstractXdsClient {
"type.googleapis.com/envoy.config.cluster.aggregate.v2alpha.ClusterConfig";
private static final String TYPE_URL_CLUSTER_CONFIG =
"type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig";
- private static final String TYPE_URL_TYPED_STRUCT =
+ private static final String TYPE_URL_TYPED_STRUCT_UDPA =
"type.googleapis.com/udpa.type.v1.TypedStruct";
+ private static final String TYPE_URL_TYPED_STRUCT =
+ "type.googleapis.com/xds.type.v3.TypedStruct";
private static final String TYPE_URL_FILTER_CONFIG =
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
// TODO(zdapeng): need to discuss how to handle unsupported values.
@@ -164,33 +174,90 @@ final class ClientXdsClient extends AbstractXdsClient {
Code.CANCELLED, Code.DEADLINE_EXCEEDED, Code.INTERNAL, Code.RESOURCE_EXHAUSTED,
Code.UNAVAILABLE));
+ private final SynchronizationContext syncContext = new SynchronizationContext(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.log(
+ XdsLogLevel.ERROR,
+ "Uncaught exception in XdsClient SynchronizationContext. Panic!",
+ e);
+ // TODO(chengyuanzhang): better error handling.
+ throw new AssertionError(e);
+ }
+ });
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
+ private final Map serverChannelMap = new HashMap<>();
private final Map ldsResourceSubscribers = new HashMap<>();
private final Map rdsResourceSubscribers = new HashMap<>();
private final Map cdsResourceSubscribers = new HashMap<>();
private final Map edsResourceSubscribers = new HashMap<>();
private final LoadStatsManager2 loadStatsManager;
- private final LoadReportClient lrsClient;
+ private final Map serverLrsClientMap = new HashMap<>();
+ private final XdsChannelFactory xdsChannelFactory;
+ private final Bootstrapper.BootstrapInfo bootstrapInfo;
+ private final Context context;
+ private final ScheduledExecutorService timeService;
+ private final BackoffPolicy.Provider backoffPolicyProvider;
+ private final Supplier stopwatchSupplier;
private final TimeProvider timeProvider;
private boolean reportingLoad;
private final TlsContextManager tlsContextManager;
+ private final InternalLogId logId;
+ private final XdsLogger logger;
+ private volatile boolean isShutdown;
+ // TODO(zdapeng): rename to XdsClientImpl
ClientXdsClient(
- ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context,
- ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
- Supplier stopwatchSupplier, TimeProvider timeProvider,
+ XdsChannelFactory xdsChannelFactory,
+ Bootstrapper.BootstrapInfo bootstrapInfo,
+ Context context,
+ ScheduledExecutorService timeService,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ Supplier stopwatchSupplier,
+ TimeProvider timeProvider,
TlsContextManager tlsContextManager) {
- super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier);
+ this.xdsChannelFactory = xdsChannelFactory;
+ this.bootstrapInfo = bootstrapInfo;
+ this.context = context;
+ this.timeService = timeService;
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
+ this.backoffPolicyProvider = backoffPolicyProvider;
+ this.stopwatchSupplier = stopwatchSupplier;
this.timeProvider = timeProvider;
this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");
- lrsClient = new LoadReportClient(loadStatsManager, channel, context,
- bootstrapInfo.servers().get(0).useProtocolV3(), bootstrapInfo.node(),
- getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier);
+ logId = InternalLogId.allocate("xds-client", null);
+ logger = XdsLogger.withLogId(logId);
+ logger.log(XdsLogLevel.INFO, "Created");
+ }
+
+ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
+ syncContext.throwIfNotInThisSynchronizationContext();
+ if (serverChannelMap.containsKey(serverInfo)) {
+ return;
+ }
+ AbstractXdsClient xdsChannel = new AbstractXdsClient(
+ xdsChannelFactory,
+ serverInfo,
+ bootstrapInfo.node(),
+ this,
+ this,
+ context,
+ timeService,
+ syncContext,
+ backoffPolicyProvider,
+ stopwatchSupplier);
+ LoadReportClient lrsClient = new LoadReportClient(
+ loadStatsManager, xdsChannel.channel(), context, serverInfo.useProtocolV3(),
+ bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
+ serverChannelMap.put(serverInfo, xdsChannel);
+ serverLrsClientMap.put(serverInfo, lrsClient);
}
@Override
- protected void handleLdsResponse(String versionInfo, List resources, String nonce) {
+ public void handleLdsResponse(
+ ServerInfo serverInfo, String versionInfo, List resources, String nonce) {
+ syncContext.throwIfNotInThisSynchronizationContext();
Map parsedResources = new HashMap<>(resources.size());
Set unpackedResources = new HashSet<>(resources.size());
Set invalidResources = new HashSet<>();
@@ -233,12 +300,12 @@ protected void handleLdsResponse(String versionInfo, List resources, String
// LdsUpdate parsed successfully.
parsedResources.put(listenerName, new ParsedResource(ldsUpdate, resource));
}
- getLogger().log(XdsLogLevel.INFO,
+ logger.log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
- ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
- nonce, errors);
+ serverInfo, ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources,
+ versionInfo, nonce, errors);
}
private LdsUpdate processClientSideListener(
@@ -375,14 +442,10 @@ static FilterChain parseFilterChain(
validateDownstreamTlsContext(downstreamTlsContextProto, certProviderInstances));
}
- String name = proto.getName();
- if (name.isEmpty()) {
- name = UUID.randomUUID().toString();
- }
FilterChainMatch filterChainMatch = parseFilterChainMatch(proto.getFilterChainMatch());
checkForUniqueness(uniqueSet, filterChainMatch);
return new FilterChain(
- name,
+ proto.getName(),
filterChainMatch,
httpConnectionManager,
downstreamTlsContext,
@@ -847,16 +910,21 @@ static StructOrError parseHttpFilter(
}
Message rawConfig = httpFilter.getTypedConfig();
String typeUrl = httpFilter.getTypedConfig().getTypeUrl();
- if (typeUrl.equals(TYPE_URL_TYPED_STRUCT)) {
- TypedStruct typedStruct;
- try {
- typedStruct = httpFilter.getTypedConfig().unpack(TypedStruct.class);
- } catch (InvalidProtocolBufferException e) {
- return StructOrError.fromError(
- "HttpFilter [" + filterName + "] contains invalid proto: " + e);
+
+ try {
+ if (typeUrl.equals(TYPE_URL_TYPED_STRUCT_UDPA)) {
+ TypedStruct typedStruct = httpFilter.getTypedConfig().unpack(TypedStruct.class);
+ typeUrl = typedStruct.getTypeUrl();
+ rawConfig = typedStruct.getValue();
+ } else if (typeUrl.equals(TYPE_URL_TYPED_STRUCT)) {
+ com.github.xds.type.v3.TypedStruct newTypedStruct =
+ httpFilter.getTypedConfig().unpack(com.github.xds.type.v3.TypedStruct.class);
+ typeUrl = newTypedStruct.getTypeUrl();
+ rawConfig = newTypedStruct.getValue();
}
- typeUrl = typedStruct.getTypeUrl();
- rawConfig = typedStruct.getValue();
+ } catch (InvalidProtocolBufferException e) {
+ return StructOrError.fromError(
+ "HttpFilter [" + filterName + "] contains invalid proto: " + e);
}
Filter filter = filterRegistry.get(typeUrl);
if ((isForClient && !(filter instanceof ClientInterceptorBuilder))
@@ -930,16 +998,20 @@ static StructOrError