diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 3922ee5b89e1..e362140c4912 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -294,12 +294,10 @@ final void reprocess(@Nullable SubchannelPicker picker) { if (callOptions.getExecutor() != null) { executor = callOptions.getExecutor(); } - executor.execute(new Runnable() { - @Override - public void run() { - stream.createRealStream(transport); - } - }); + Runnable runnable = stream.createRealStream(transport); + if (runnable != null) { + executor.execute(runnable); + } toRemove.add(stream); } // else: stay pending } @@ -346,7 +344,8 @@ private PendingStream(PickSubchannelArgs args) { this.args = args; } - private void createRealStream(ClientTransport transport) { + /** Runnable may be null. */ + private Runnable createRealStream(ClientTransport transport) { ClientStream realStream; Context origContext = context.attach(); try { @@ -355,7 +354,7 @@ private void createRealStream(ClientTransport transport) { } finally { context.detach(origContext); } - setStream(realStream); + return setStream(realStream); } @Override diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index be21b4991ba6..0eb6b80c3666 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -29,6 +29,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import javax.annotation.CheckReturnValue; import javax.annotation.concurrent.GuardedBy; /** @@ -60,6 +61,12 @@ class DelayedStream implements ClientStream { @GuardedBy("this") private long streamSetTimeNanos; + // start()-related storage. All references should be cleared after start() is called on realStream + // to avoid retaining the objects for the life of the call + /** non-null when setAuthority needs to be called on realStream when it is set. */ + // No need to synchronize; start() synchronization provides a happens-before + private String startAuthority; + @Override public void setMaxInboundMessageSize(final int maxSize) { if (passThrough) { @@ -115,21 +122,35 @@ public void appendTimeoutInsight(InsightBuilder insight) { } /** - * Transfers all pending and future requests and mutations to the given stream. + * Transfers all pending and future requests and mutations to the given stream. Method will return + * quickly, but if the returned Runnable is non-null it must be called to complete the process. + * The Runnable may take a while to execute. * *

No-op if either this method or {@link #cancel} have already been called. */ - // When this method returns, passThrough is guaranteed to be true - final void setStream(ClientStream stream) { + // When this method returns, start() has been called on realStream or passThrough is guaranteed to + // be true + @CheckReturnValue + final Runnable setStream(ClientStream stream) { + ClientStreamListener savedListener; synchronized (this) { // If realStream != null, then either setStream() or cancel() has been called. if (realStream != null) { - return; + return null; } setRealStream(checkNotNull(stream, "stream")); + savedListener = listener; + } + if (savedListener != null) { + internalStart(savedListener); } - drainPendingCalls(); + return new Runnable() { + @Override + public void run() { + drainPendingCalls(); + } + }; } /** @@ -190,28 +211,24 @@ private void delayOrExecute(Runnable runnable) { public void setAuthority(final String authority) { checkState(listener == null, "May only be called before start"); checkNotNull(authority, "authority"); - delayOrExecute(new Runnable() { - @Override - public void run() { - realStream.setAuthority(authority); - } - }); + this.startAuthority = authority; } @Override public void start(ClientStreamListener listener) { + checkNotNull(listener, "listener"); checkState(this.listener == null, "already started"); Status savedError; boolean savedPassThrough; synchronized (this) { - this.listener = checkNotNull(listener, "listener"); // If error != null, then cancel() has been called and was unable to close the listener savedError = error; savedPassThrough = passThrough; if (!savedPassThrough) { listener = delayedListener = new DelayedStreamListener(listener); } + this.listener = listener; startTimeNanos = System.nanoTime(); } if (savedError != null) { @@ -220,16 +237,20 @@ public void start(ClientStreamListener listener) { } if (savedPassThrough) { - realStream.start(listener); - } else { - final ClientStreamListener finalListener = listener; - delayOrExecute(new Runnable() { - @Override - public void run() { - realStream.start(finalListener); - } - }); + internalStart(listener); + } // else internalStart() will be called by setStream + } + + /** + * Starts stream without synchronization. {@code listener} should be same instance as {@link + * #listener}. + */ + private void internalStart(ClientStreamListener listener) { + if (startAuthority != null) { + realStream.setAuthority(startAuthority); + startAuthority = null; } + realStream.start(listener); } @Override @@ -298,10 +319,11 @@ public void run() { } }); } else { + drainPendingCalls(); if (listenerToClose != null) { + // Note that listenerToClose is a DelayedStreamListener listenerToClose.closed(reason, new Metadata()); } - drainPendingCalls(); } } diff --git a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java index c3196ddd1078..4c49a14a06bd 100644 --- a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java +++ b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) { // returnStream() has been called before me, thus delayedStream must have been // created. checkState(delayedStream != null, "delayedStream is null"); - delayedStream.setStream(stream); + Runnable slow = delayedStream.setStream(stream); + if (slow != null) { + // TODO(ejona): run this on a separate thread + slow.run(); + } } /** diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 41a97d62f9a1..8bcd4e529857 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -315,6 +315,8 @@ public void uncaughtException(Thread t, Throwable e) { // Fail-fast streams DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( method, headers, failFastCallOptions); + ff1.start(mock(ClientStreamListener.class)); + ff1.halfClose(); PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers, failFastCallOptions); verify(transportListener).transportInUse(true); @@ -345,6 +347,7 @@ public void uncaughtException(Thread t, Throwable e) { wfr3Executor.getScheduledExecutorService()); DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream( method, headers, wfr3callOptions); + wfr3.halfClose(); PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers, wfr3callOptions); DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream( @@ -380,18 +383,22 @@ public void uncaughtException(Thread t, Throwable e) { inOrder.verify(picker).pickSubchannel(wfr4args); inOrder.verifyNoMoreInteractions(); - // Make sure that real transport creates streams in the executor - verify(mockRealTransport, never()).newStream( - any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); - verify(mockRealTransport2, never()).newStream( - any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); - fakeExecutor.runDueTasks(); - assertEquals(0, fakeExecutor.numPendingTasks()); + // Make sure that streams are created and started immediately, not in any executor. This is + // necessary during shut down to guarantee that when DelayedClientTransport terminates, all + // streams are now owned by a real transport (which should prevent the Channel from + // terminating). // ff1 and wfr1 went through verify(mockRealTransport).newStream(method, headers, failFastCallOptions); verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions); assertSame(mockRealStream, ff1.getRealStream()); assertSame(mockRealStream2, wfr1.getRealStream()); + verify(mockRealStream).start(any(ClientStreamListener.class)); + // But also verify that non-start()-related calls are run within the Executor, since they may be + // slow. + verify(mockRealStream, never()).halfClose(); + fakeExecutor.runDueTasks(); + assertEquals(0, fakeExecutor.numPendingTasks()); + verify(mockRealStream).halfClose(); // The ff2 has failed due to picker returning an error assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError()); // Other streams are still buffered @@ -430,10 +437,11 @@ public void uncaughtException(Thread t, Throwable e) { assertSame(mockRealStream2, wfr2.getRealStream()); assertSame(mockRealStream2, wfr4.getRealStream()); + assertSame(mockRealStream, wfr3.getRealStream()); // If there is an executor in the CallOptions, it will be used to create the real stream. - assertNull(wfr3.getRealStream()); + verify(mockRealStream, times(1)).halfClose(); // 1 for ff1 wfr3Executor.runDueTasks(); - assertSame(mockRealStream, wfr3.getRealStream()); + verify(mockRealStream, times(2)).halfClose(); // New streams will use the last picker DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 393a6c6e6d04..8fb42c48fb52 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -72,7 +72,7 @@ public void setStream_setAuthority() { final String authority = "becauseIsaidSo"; stream.setAuthority(authority); stream.start(listener); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); InOrder inOrder = inOrder(realStream); inOrder.verify(realStream).setAuthority(authority); inOrder.verify(realStream).start(any(ClientStreamListener.class)); @@ -102,7 +102,7 @@ public void setStream_sendsAllMessages() { stream.setMessageCompression(false); stream.writeMessage(message); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).setCompressor(Codec.Identity.NONE); verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance()); @@ -125,7 +125,7 @@ public void setStream_sendsAllMessages() { public void setStream_halfClose() { stream.start(listener); stream.halfClose(); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).halfClose(); } @@ -134,7 +134,7 @@ public void setStream_halfClose() { public void setStream_flush() { stream.start(listener); stream.flush(); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).flush(); stream.flush(); @@ -146,7 +146,7 @@ public void setStream_flowControl() { stream.start(listener); stream.request(1); stream.request(2); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).request(1); verify(realStream).request(2); @@ -158,7 +158,7 @@ public void setStream_flowControl() { public void setStream_setMessageCompression() { stream.start(listener); stream.setMessageCompression(false); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).setMessageCompression(false); stream.setMessageCompression(true); @@ -169,7 +169,7 @@ public void setStream_setMessageCompression() { public void setStream_isReady() { stream.start(listener); assertFalse(stream.isReady()); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream, never()).isReady(); assertFalse(stream.isReady()); @@ -190,7 +190,7 @@ public void setStream_getAttributes() { assertEquals(Attributes.EMPTY, stream.getAttributes()); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); assertEquals(attributes, stream.getAttributes()); } @@ -204,7 +204,7 @@ public void startThenCancelled() { @Test public void startThenSetStreamThenCancelled() { stream.start(listener); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); stream.cancel(Status.CANCELLED); verify(realStream).start(any(ClientStreamListener.class)); verify(realStream).cancel(same(Status.CANCELLED)); @@ -212,7 +212,7 @@ public void startThenSetStreamThenCancelled() { @Test public void setStreamThenStartThenCancelled() { - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); stream.start(listener); stream.cancel(Status.CANCELLED); verify(realStream).start(same(listener)); @@ -221,7 +221,7 @@ public void setStreamThenStartThenCancelled() { @Test public void setStreamThenCancelled() { - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); stream.cancel(Status.CANCELLED); verify(realStream).cancel(same(Status.CANCELLED)); } @@ -229,9 +229,9 @@ public void setStreamThenCancelled() { @Test public void setStreamTwice() { stream.start(listener); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).start(any(ClientStreamListener.class)); - stream.setStream(mock(ClientStream.class)); + callMeMaybe(stream.setStream(mock(ClientStream.class))); stream.flush(); verify(realStream).flush(); } @@ -239,7 +239,7 @@ public void setStreamTwice() { @Test public void cancelThenSetStream() { stream.cancel(Status.CANCELLED); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); stream.start(listener); stream.isReady(); verifyNoMoreInteractions(realStream); @@ -275,7 +275,7 @@ public void onReady() { IsReadyListener isReadyListener = new IsReadyListener(); stream.start(isReadyListener); - stream.setStream(new NoopClientStream() { + callMeMaybe(stream.setStream(new NoopClientStream() { @Override public void start(ClientStreamListener listener) { // This call to the listener should end up being delayed. @@ -286,7 +286,7 @@ public void start(ClientStreamListener listener) { public boolean isReady() { return true; } - }); + })); assertTrue(isReadyListener.onReadyCalled); } @@ -302,7 +302,7 @@ public void listener_allQueued() { final InOrder inOrder = inOrder(listener); stream.start(listener); - stream.setStream(new NoopClientStream() { + callMeMaybe(stream.setStream(new NoopClientStream() { @Override public void start(ClientStreamListener passedListener) { passedListener.onReady(); @@ -314,7 +314,7 @@ public void start(ClientStreamListener passedListener) { verifyNoMoreInteractions(listener); } - }); + })); inOrder.verify(listener).onReady(); inOrder.verify(listener).headersRead(headers); inOrder.verify(listener).messagesAvailable(producer1); @@ -332,7 +332,7 @@ public void listener_noQueued() { final Status status = Status.UNKNOWN.withDescription("unique status"); stream.start(listener); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); verify(realStream).start(listenerCaptor.capture()); ClientStreamListener delayedListener = listenerCaptor.getValue(); delayedListener.onReady(); @@ -371,11 +371,17 @@ public Void answer(InvocationOnMock in) { } }).when(realStream).appendTimeoutInsight(any(InsightBuilder.class)); stream.start(listener); - stream.setStream(realStream); + callMeMaybe(stream.setStream(realStream)); InsightBuilder insight = new InsightBuilder(); stream.appendTimeoutInsight(insight); assertThat(insight.toString()) .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); } + + private void callMeMaybe(Runnable r) { + if (r != null) { + r.run(); + } + } }