Skip to content

Commit

Permalink
xds: Implement XdsServingStatusListener as per the new xDS server gRFC (
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjaypujare committed Feb 12, 2021
1 parent 7b70161 commit 8030c3a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 90 deletions.
15 changes: 6 additions & 9 deletions xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public void createXdsClientAndStart() throws IOException {
throw new XdsInitializationException("No management server provided by bootstrap");
}
} catch (XdsInitializationException e) {
reportError(Status.fromThrowable(e));
throw new IOException(e);
}
Node node = bootstrapInfo.getNode();
Expand All @@ -116,9 +115,7 @@ public void createXdsClientAndStart() throws IOException {
newServerApi = serverInfo.isUseProtocolV3() && experimentalNewServerApiEnvVar;
String grpcServerResourceId = bootstrapInfo.getGrpcServerResourceId();
if (newServerApi && grpcServerResourceId == null) {
reportError(
Status.INVALID_ARGUMENT.withDescription("missing grpc_server_resource_name_id value"));
throw new IOException("missing grpc_server_resource_name_id value");
throw new IOException("missing grpc_server_resource_name_id value in xds bootstrap");
}
XdsClient xdsClientImpl =
new ServerXdsClient(
Expand Down Expand Up @@ -152,14 +149,14 @@ public void onListenerChanged(XdsClient.ListenerUpdate update) {
public void onResourceDoesNotExist(String resourceName) {
logger.log(Level.WARNING, "Resource {0} is unavailable", resourceName);
curListener = null;
reportError(Status.NOT_FOUND.withDescription(resourceName));
reportError(Status.NOT_FOUND.withDescription(resourceName).asException());
}

@Override
public void onError(Status error) {
logger.log(
Level.WARNING, "ListenerWatcher in XdsClientWrapperForServerSds: {0}", error);
reportError(error);
reportError(error.asException());
}
};
xdsClient.watchListenerData(port, listenerWatcher);
Expand Down Expand Up @@ -225,9 +222,9 @@ private Set<ServerWatcher> getServerWatchers() {
}
}

private void reportError(Status status) {
private void reportError(Throwable throwable) {
for (ServerWatcher watcher : getServerWatchers()) {
watcher.onError(status);
watcher.onError(throwable);
}
}

Expand All @@ -249,7 +246,7 @@ XdsClient.ListenerWatcher getListenerWatcher() {
public interface ServerWatcher {

/** Called to report errors from the control plane including "not found". */
void onError(Status error);
void onError(Throwable throwable);

/** Called to report successful receipt of server config. */
void onSuccess(DownstreamTlsContext downstreamTlsContext);
Expand Down
51 changes: 43 additions & 8 deletions xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -26,7 +27,6 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.Status;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.xds.internal.sds.SdsProtocolNegotiators;
Expand All @@ -42,12 +42,13 @@ public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBui

private final NettyServerBuilder delegate;
private final int port;
private ErrorNotifier errorNotifier;
private XdsServingStatusListener xdsServingStatusListener;
private AtomicBoolean isServerBuilt = new AtomicBoolean(false);

private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
this.port = port;
xdsServingStatusListener = new DefaultListener("port:" + port);
}

@Override
Expand All @@ -56,9 +57,11 @@ protected ServerBuilder<?> delegate() {
return delegate;
}

/** Set the {@link ErrorNotifier}. Pass null to unset a previously set value. */
public XdsServerBuilder errorNotifier(ErrorNotifier errorNotifier) {
this.errorNotifier = errorNotifier;
/** Set the {@link XdsServingStatusListener}. */
public XdsServerBuilder xdsServingStatusListener(
XdsServingStatusListener xdsServingStatusListener) {
this.xdsServingStatusListener =
checkNotNull(xdsServingStatusListener, "xdsServingStatusListener");
return this;
}

Expand Down Expand Up @@ -91,16 +94,48 @@ ServerWrapperForXds buildServer(
InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder()
.set(SdsProtocolNegotiators.SERVER_XDS_CLIENT, xdsClient)
.build());
return new ServerWrapperForXds(delegate.build(), xdsClient, errorNotifier);
return new ServerWrapperForXds(delegate.build(), xdsClient, xdsServingStatusListener);
}

public ServerBuilder<?> transportBuilder() {
return delegate;
}

/** Watcher to receive error notifications from xDS control plane during {@code start()}. */
public interface ErrorNotifier {
public interface XdsServingStatusListener {

void onError(Status error);
/** Callback invoked when server begins serving. */
void onServing();

/** Callback invoked when server is forced to be "not serving" due to an error.
* @param throwable cause of the error
*/
void onNotServing(Throwable throwable);
}

/** Default implementation that logs at WARNING level. */
private static class DefaultListener implements XdsServingStatusListener {
XdsLogger xdsLogger;
boolean notServing;

DefaultListener(String prefix) {
xdsLogger = XdsLogger.withPrefix(prefix);
notServing = true;
}

/** Log calls to onServing() following a call to onNotServing() at WARNING level. */
@Override
public void onServing() {
if (notServing) {
notServing = false;
xdsLogger.log(XdsLogger.XdsLogLevel.WARNING, "Entering serving state.");
}
}

@Override
public void onNotServing(Throwable throwable) {
xdsLogger.log(XdsLogger.XdsLogLevel.WARNING, throwable.getMessage());
notServing = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.xds.EnvoyServerProtoData;
import io.grpc.xds.XdsClientWrapperForServerSds;
import io.grpc.xds.XdsServerBuilder;
Expand All @@ -45,19 +44,20 @@
public final class ServerWrapperForXds extends Server {
private final Server delegate;
private final XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
@Nullable XdsServerBuilder.ErrorNotifier errorNotifier;
private XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener;
@Nullable XdsClientWrapperForServerSds.ServerWatcher serverWatcher;
private AtomicBoolean started = new AtomicBoolean();

/** Creates the wrapper object using the delegate passed. */
public ServerWrapperForXds(
Server delegate,
XdsClientWrapperForServerSds xdsClientWrapperForServerSds,
@Nullable XdsServerBuilder.ErrorNotifier errorNotifier) {
XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener) {
this.delegate = checkNotNull(delegate, "delegate");
this.xdsClientWrapperForServerSds =
checkNotNull(xdsClientWrapperForServerSds, "xdsClientWrapperForServerSds");
this.errorNotifier = errorNotifier;
this.xdsServingStatusListener =
checkNotNull(xdsServingStatusListener, "xdsServingStatusListener");
}

@Override
Expand All @@ -77,6 +77,7 @@ public Server start() throws IOException {
throw new RuntimeException(ex);
}
delegate.start();
xdsServingStatusListener.onServing();
return this;
}

Expand All @@ -86,15 +87,12 @@ private Future<EnvoyServerProtoData.DownstreamTlsContext> addServerWatcher() {
serverWatcher =
new XdsClientWrapperForServerSds.ServerWatcher() {
@Override
public void onError(Status error) {
if (errorNotifier != null) {
errorNotifier.onError(error);
}
public void onError(Throwable throwable) {
xdsServingStatusListener.onNotServing(throwable);
}

@Override
public void onSuccess(EnvoyServerProtoData.DownstreamTlsContext downstreamTlsContext) {
removeServerWatcher();
settableFuture.set(downstreamTlsContext);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.when;

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessSocketAddress;
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
Expand Down Expand Up @@ -171,12 +172,19 @@ public void registerServerWatcher_notifyError() throws UnknownHostException {
mock(XdsClientWrapperForServerSds.ServerWatcher.class);
xdsClientWrapperForServerSds.addServerWatcher(mockServerWatcher);
registeredWatcher.onError(Status.INTERNAL);
verify(mockServerWatcher).onError(eq(Status.INTERNAL));
ArgumentCaptor<Throwable> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Throwable throwable = argCaptor.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
Status captured = ((StatusException)throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.INTERNAL);
reset(mockServerWatcher);
registeredWatcher.onResourceDoesNotExist("not-found Error");
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
ArgumentCaptor<Throwable> argCaptor1 = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor1.capture());
throwable = argCaptor1.getValue();
assertThat(throwable).isInstanceOf(StatusException.class);
captured = ((StatusException)throwable).getStatus();
assertThat(captured.getCode()).isEqualTo(Status.Code.NOT_FOUND);
assertThat(captured.getDescription()).isEqualTo("not-found Error");
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
Expand All @@ -203,14 +211,7 @@ public void startXdsClient_expectException() {
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
}
ArgumentCaptor<Status> argCaptor = ArgumentCaptor.forClass(null);
verify(mockServerWatcher).onError(argCaptor.capture());
Status captured = argCaptor.getValue();
assertThat(captured.getCode()).isEqualTo(Status.Code.UNKNOWN);
assertThat(captured.getCause()).isInstanceOf(XdsInitializationException.class);
assertThat(captured.getCause())
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
verify(mockServerWatcher, never()).onError(any(Throwable.class));
}

private DownstreamTlsContext sendListenerUpdate(
Expand Down

0 comments on commit 8030c3a

Please sign in to comment.