From 5962c9f18754c83a29626d8d8503db89ea30b147 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Mon, 27 Jun 2022 14:21:29 -0700 Subject: [PATCH] [core] Use SyncContext for InProcessTransport listener callbacks to avoid deadlocks Fixes #3084 Also support unary calls returning null values --- .../io/grpc/inprocess/InProcessTransport.java | 277 +++++++++++------- .../helloworld/HelloWorldClientTest.java | 2 - .../helloworld/HelloWorldServerTest.java | 2 - .../routeguide/RouteGuideClientTest.java | 2 - .../routeguide/RouteGuideServerTest.java | 2 - .../main/java/io/grpc/stub/ClientCalls.java | 6 +- 6 files changed, 170 insertions(+), 121 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index e40658d08ff..f1aa5149091 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -40,6 +40,7 @@ import io.grpc.SecurityLevel; import io.grpc.ServerStreamTracer; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -106,6 +107,18 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private List serverStreamTracerFactories; private final Attributes attributes; + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler = + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + if (e instanceof Error) { + throw new Error(e); + } + throw new RuntimeException(e); + } + }; + + @GuardedBy("this") private final InUseStateAggregator inUseState = new InUseStateAggregator() { @@ -407,8 +420,10 @@ private void streamClosed() { private class InProcessServerStream implements ServerStream { final StatsTraceContext statsTraceCtx; - @GuardedBy("this") + // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors private ClientStreamListener clientStreamListener; + private final SynchronizationContext syncContext = + new SynchronizationContext(uncaughtExceptionHandler); @GuardedBy("this") private int clientRequested; @GuardedBy("this") @@ -444,10 +459,11 @@ public void request(int numMessages) { if (onReady) { synchronized (this) { if (!closed) { - clientStreamListener.onReady(); + syncContext.executeLater(() -> clientStreamListener.onReady()); } } } + syncContext.drain(); } // This method is the only reason we have to synchronize field accesses. @@ -456,28 +472,36 @@ public void request(int numMessages) { * * @return whether onReady should be called on the server */ - private synchronized boolean clientRequested(int numMessages) { - if (closed) { - return false; - } - boolean previouslyReady = clientRequested > 0; - clientRequested += numMessages; - while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) { - clientRequested--; - clientStreamListener.messagesAvailable(clientReceiveQueue.poll()); - } - // Attempt being reentrant-safe - if (closed) { - return false; - } - if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) { - closed = true; - clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers); - clientStream.statsTraceCtx.streamClosed(clientNotifyStatus); - clientStreamListener.closed( - clientNotifyStatus, RpcProgress.PROCESSED, clientNotifyTrailers); + private boolean clientRequested(int numMessages) { + boolean previouslyReady; + boolean nowReady; + synchronized (this) { + if (closed) { + return false; + } + + previouslyReady = clientRequested > 0; + clientRequested += numMessages; + while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) { + clientRequested--; + StreamListener.MessageProducer producer = clientReceiveQueue.poll(); + syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer)); + } + + if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) { + closed = true; + clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers); + clientStream.statsTraceCtx.streamClosed(clientNotifyStatus); + Status notifyStatus = this.clientNotifyStatus; + Metadata notifyTrailers = this.clientNotifyTrailers; + syncContext.executeLater(() -> + clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers)); + } + + nowReady = clientRequested > 0; } - boolean nowReady = clientRequested > 0; + + syncContext.drain(); return !previouslyReady && nowReady; } @@ -486,22 +510,26 @@ private void clientCancelled(Status status) { } @Override - public synchronized void writeMessage(InputStream message) { - if (closed) { - return; - } - statsTraceCtx.outboundMessage(outboundSeqNo); - statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); - clientStream.statsTraceCtx.inboundMessage(outboundSeqNo); - clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); - outboundSeqNo++; - StreamListener.MessageProducer producer = new SingleMessageProducer(message); - if (clientRequested > 0) { - clientRequested--; - clientStreamListener.messagesAvailable(producer); - } else { - clientReceiveQueue.add(producer); + public void writeMessage(InputStream message) { + synchronized (this) { + if (closed) { + return; + } + statsTraceCtx.outboundMessage(outboundSeqNo); + statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); + clientStream.statsTraceCtx.inboundMessage(outboundSeqNo); + clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); + outboundSeqNo++; + StreamListener.MessageProducer producer = new SingleMessageProducer(message); + if (clientRequested > 0) { + clientRequested--; + syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer)); + } else { + clientReceiveQueue.add(producer); + } } + + syncContext.drain(); } @Override @@ -540,8 +568,9 @@ public void writeHeaders(Metadata headers) { } clientStream.statsTraceCtx.clientInboundHeaders(); - clientStreamListener.headersRead(headers); + syncContext.executeLater(() -> clientStreamListener.headersRead(headers)); } + syncContext.drain(); } @Override @@ -585,13 +614,14 @@ private void notifyClientClose(Status status, Metadata trailers) { closed = true; clientStream.statsTraceCtx.clientInboundTrailers(trailers); clientStream.statsTraceCtx.streamClosed(clientStatus); - clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers); + syncContext.executeLater( + () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers)); } else { clientNotifyStatus = clientStatus; clientNotifyTrailers = trailers; } } - + syncContext.drain(); streamClosed(); } @@ -604,24 +634,29 @@ public void cancel(Status status) { streamClosed(); } - private synchronized boolean internalCancel(Status clientStatus) { - if (closed) { - return false; - } - closed = true; - StreamListener.MessageProducer producer; - while ((producer = clientReceiveQueue.poll()) != null) { - InputStream message; - while ((message = producer.next()) != null) { - try { - message.close(); - } catch (Throwable t) { - log.log(Level.WARNING, "Exception closing stream", t); + private boolean internalCancel(Status clientStatus) { + synchronized (this) { + if (closed) { + return false; + } + closed = true; + StreamListener.MessageProducer producer; + while ((producer = clientReceiveQueue.poll()) != null) { + InputStream message; + while ((message = producer.next()) != null) { + try { + message.close(); + } catch (Throwable t) { + log.log(Level.WARNING, "Exception closing stream", t); + } } } + clientStream.statsTraceCtx.streamClosed(clientStatus); + syncContext.executeLater( + () -> + clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata())); } - clientStream.statsTraceCtx.streamClosed(clientStatus); - clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()); + syncContext.drain(); return true; } @@ -662,8 +697,10 @@ public int streamId() { private class InProcessClientStream implements ClientStream { final StatsTraceContext statsTraceCtx; final CallOptions callOptions; - @GuardedBy("this") + // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors private ServerStreamListener serverStreamListener; + private final SynchronizationContext syncContext = + new SynchronizationContext(uncaughtExceptionHandler); @GuardedBy("this") private int serverRequested; @GuardedBy("this") @@ -693,9 +730,10 @@ public void request(int numMessages) { if (onReady) { synchronized (this) { if (!closed) { - serverStreamListener.onReady(); + syncContext.executeLater(() -> serverStreamListener.onReady()); } } + syncContext.drain(); } } @@ -705,21 +743,29 @@ public void request(int numMessages) { * * @return whether onReady should be called on the server */ - private synchronized boolean serverRequested(int numMessages) { - if (closed) { - return false; - } - boolean previouslyReady = serverRequested > 0; - serverRequested += numMessages; - while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) { - serverRequested--; - serverStreamListener.messagesAvailable(serverReceiveQueue.poll()); - } - if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) { - serverNotifyHalfClose = false; - serverStreamListener.halfClosed(); + private boolean serverRequested(int numMessages) { + boolean previouslyReady; + boolean nowReady; + synchronized (this) { + if (closed) { + return false; + } + previouslyReady = serverRequested > 0; + serverRequested += numMessages; + + while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) { + serverRequested--; + StreamListener.MessageProducer producer = serverReceiveQueue.poll(); + syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer)); + } + + if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) { + serverNotifyHalfClose = false; + syncContext.executeLater(() -> serverStreamListener.halfClosed()); + } + nowReady = serverRequested > 0; } - boolean nowReady = serverRequested > 0; + syncContext.drain(); return !previouslyReady && nowReady; } @@ -728,22 +774,25 @@ private void serverClosed(Status serverListenerStatus, Status serverTracerStatus } @Override - public synchronized void writeMessage(InputStream message) { - if (closed) { - return; - } - statsTraceCtx.outboundMessage(outboundSeqNo); - statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); - serverStream.statsTraceCtx.inboundMessage(outboundSeqNo); - serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); - outboundSeqNo++; - StreamListener.MessageProducer producer = new SingleMessageProducer(message); - if (serverRequested > 0) { - serverRequested--; - serverStreamListener.messagesAvailable(producer); - } else { - serverReceiveQueue.add(producer); + public void writeMessage(InputStream message) { + synchronized (this) { + if (closed) { + return; + } + statsTraceCtx.outboundMessage(outboundSeqNo); + statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); + serverStream.statsTraceCtx.inboundMessage(outboundSeqNo); + serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); + outboundSeqNo++; + StreamListener.MessageProducer producer = new SingleMessageProducer(message); + if (serverRequested > 0) { + serverRequested--; + syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer)); + } else { + serverReceiveQueue.add(producer); + } } + syncContext.drain(); } @Override @@ -768,39 +817,45 @@ public void cancel(Status reason) { streamClosed(); } - private synchronized boolean internalCancel( + private boolean internalCancel( Status serverListenerStatus, Status serverTracerStatus) { - if (closed) { - return false; - } - closed = true; - - StreamListener.MessageProducer producer; - while ((producer = serverReceiveQueue.poll()) != null) { - InputStream message; - while ((message = producer.next()) != null) { - try { - message.close(); - } catch (Throwable t) { - log.log(Level.WARNING, "Exception closing stream", t); + synchronized (this) { + if (closed) { + return false; + } + closed = true; + + StreamListener.MessageProducer producer; + while ((producer = serverReceiveQueue.poll()) != null) { + InputStream message; + while ((message = producer.next()) != null) { + try { + message.close(); + } catch (Throwable t) { + log.log(Level.WARNING, "Exception closing stream", t); + } } } + serverStream.statsTraceCtx.streamClosed(serverTracerStatus); + syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus)); } - serverStream.statsTraceCtx.streamClosed(serverTracerStatus); - serverStreamListener.closed(serverListenerStatus); + syncContext.drain(); return true; } @Override - public synchronized void halfClose() { - if (closed) { - return; - } - if (serverReceiveQueue.isEmpty()) { - serverStreamListener.halfClosed(); - } else { - serverNotifyHalfClose = true; + public void halfClose() { + synchronized (this) { + if (closed) { + return; + } + if (serverReceiveQueue.isEmpty()) { + syncContext.executeLater(() -> serverStreamListener.halfClosed()); + } else { + serverNotifyHalfClose = true; + } } + syncContext.drain(); } @Override diff --git a/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldClientTest.java b/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldClientTest.java index d0262d687ee..8c6cf60279a 100644 --- a/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldClientTest.java +++ b/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldClientTest.java @@ -40,8 +40,6 @@ * Not intended to provide a high code coverage or to test every major usecase. * * directExecutor() makes it easier to have deterministic tests. - * However, if your implementation uses another thread and uses streaming it is better to use - * the default executor, to avoid hitting bug #3084. * *

For more unit test examples see {@link io.grpc.examples.routeguide.RouteGuideClientTest} and * {@link io.grpc.examples.routeguide.RouteGuideServerTest}. diff --git a/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java b/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java index 9a20476772c..63281eeba1a 100644 --- a/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java +++ b/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java @@ -33,8 +33,6 @@ * Not intended to provide a high code coverage or to test every major usecase. * * directExecutor() makes it easier to have deterministic tests. - * However, if your implementation uses another thread and uses streaming it is better to use - * the default executor, to avoid hitting bug #3084. * *

For more unit test examples see {@link io.grpc.examples.routeguide.RouteGuideClientTest} and * {@link io.grpc.examples.routeguide.RouteGuideServerTest}. diff --git a/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideClientTest.java b/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideClientTest.java index be2337cc4ce..4c184fb82ee 100644 --- a/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideClientTest.java +++ b/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideClientTest.java @@ -53,8 +53,6 @@ * Not intended to provide a high code coverage or to test every major usecase. * * directExecutor() makes it easier to have deterministic tests. - * However, if your implementation uses another thread and uses streaming it is better to use - * the default executor, to avoid hitting bug #3084. * *

For basic unit test examples see {@link io.grpc.examples.helloworld.HelloWorldClientTest} and * {@link io.grpc.examples.helloworld.HelloWorldServerTest}. diff --git a/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideServerTest.java b/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideServerTest.java index 19322c2d72c..a5a84824af6 100644 --- a/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideServerTest.java +++ b/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideServerTest.java @@ -50,8 +50,6 @@ * Not intended to provide a high code coverage or to test every major usecase. * * directExecutor() makes it easier to have deterministic tests. - * However, if your implementation uses another thread and uses streaming it is better to use - * the default executor, to avoid hitting bug #3084. * *

For basic unit test examples see {@link io.grpc.examples.helloworld.HelloWorldClientTest} and * {@link io.grpc.examples.helloworld.HelloWorldServerTest}. diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 4fac94bfaef..6986a285ae2 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -509,6 +509,7 @@ void onStart() { private static final class UnaryStreamToFuture extends StartableListener { private final GrpcFuture responseFuture; private RespT value; + private boolean isValueReceived = false; // Non private to avoid synthetic class UnaryStreamToFuture(GrpcFuture responseFuture) { @@ -521,17 +522,18 @@ public void onHeaders(Metadata headers) { @Override public void onMessage(RespT value) { - if (this.value != null) { + if (this.isValueReceived) { throw Status.INTERNAL.withDescription("More than one value received for unary call") .asRuntimeException(); } this.value = value; + this.isValueReceived = true; } @Override public void onClose(Status status, Metadata trailers) { if (status.isOk()) { - if (value == null) { + if (!isValueReceived) { // No value received so mark the future as an error responseFuture.setException( Status.INTERNAL.withDescription("No value received for unary call")