diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 3d277bbe2fc..1fb8d3c43bd 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -30,8 +30,10 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStreamListener.RpcProgress; import java.io.InputStream; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,6 +66,16 @@ abstract class RetriableStream implements ClientStream { private final MethodDescriptor method; private final Executor callExecutor; + private final Executor listenerSerializeExecutor = new SynchronizationContext( + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw Status.fromThrowable(e) + .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.") + .asRuntimeException(); + } + } + ); private final ScheduledExecutorService scheduledExecutorService; // Must not modify it. private final Metadata headers; @@ -105,6 +117,7 @@ abstract class RetriableStream implements ClientStream { private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; private Status cancellationStatus; + private boolean isClosed; RetriableStream( MethodDescriptor method, Metadata headers, @@ -247,6 +260,7 @@ private void drain(Substream substream) { int chunk = 0x80; List list = null; boolean streamStarted = false; + Runnable onReadyRunnable = null; while (true) { State savedState; @@ -264,7 +278,18 @@ private void drain(Substream substream) { } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); - return; + if (!isReady()) { + return; + } + onReadyRunnable = new Runnable() { + @Override + public void run() { + if (!isClosed) { + masterListener.onReady(); + } + } + }; + break; } if (substream.closed) { @@ -299,6 +324,11 @@ private void drain(Substream substream) { } } + if (onReadyRunnable != null) { + listenerSerializeExecutor.execute(onReadyRunnable); + return; + } + substream.stream.cancel( state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } @@ -450,14 +480,22 @@ public void run() { } @Override - public final void cancel(Status reason) { + public final void cancel(final Status reason) { Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); noopSubstream.stream = new NoopClientStream(); Runnable runnable = commit(noopSubstream); if (runnable != null) { - masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); runnable.run(); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); + + } + }); return; } @@ -771,18 +809,25 @@ private final class Sublistener implements ClientStreamListener { } @Override - public void headersRead(Metadata headers) { + public void headersRead(final Metadata headers) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.headersRead(headers); if (throttle != null) { throttle.onSuccess(); } + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.headersRead(headers); + } + }); } } @Override - public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + public void closed( + final Status status, final RpcProgress rpcProgress, final Metadata trailers) { synchronized (lock) { state = state.substreamClosed(substream); closedSubstreamsInsight.append(status.getCode()); @@ -793,7 +838,14 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { if (substream.bufferLimitExceeded) { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, rpcProgress, trailers); + } + }); } return; } @@ -900,7 +952,14 @@ public void run() { commitAndRun(substream); if (state.winningSubstream == substream) { - masterListener.closed(status, rpcProgress, trailers); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, rpcProgress, trailers); + } + }); } } @@ -970,22 +1029,37 @@ private Integer getPushbackMills(Metadata trailer) { } @Override - public void messagesAvailable(MessageProducer producer) { + public void messagesAvailable(final MessageProducer producer) { State savedState = state; checkState( savedState.winningSubstream != null, "Headers should be received prior to messages."); if (savedState.winningSubstream != substream) { return; } - masterListener.messagesAvailable(producer); + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + masterListener.messagesAvailable(producer); + } + }); } @Override public void onReady() { // FIXME(#7089): hedging case is broken. - // TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once - // drained and if is still ready. - masterListener.onReady(); + if (!isReady()) { + return; + } + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + if (!isClosed) { + masterListener.onReady(); + } + } + }); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index c9ea504e18b..8b851573b21 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -256,6 +256,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); retriableStream.sendMessage("msg1"); @@ -308,6 +309,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -356,6 +358,7 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -637,6 +640,7 @@ public void retry_cancelWhileBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // retry ClientStream mockStream2 = mock(ClientStream.class); @@ -656,7 +660,7 @@ public void retry_cancelWhileBackoff() { @Test public void operationsWhileDraining() { - ArgumentCaptor sublistenerCaptor1 = + final ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); final AtomicReference sublistenerCaptor2 = new AtomicReference<>(); @@ -669,10 +673,16 @@ public void operationsWhileDraining() { @Override public void request(int numMessages) { retriableStream.sendMessage("substream1 request " + numMessages); + sublistenerCaptor1.getValue().onReady(); if (numMessages > 1) { retriableStream.request(--numMessages); } } + + @Override + public boolean isReady() { + return true; + } })); final ClientStream mockStream2 = @@ -688,7 +698,7 @@ public void start(ClientStreamListener listener) { @Override public void request(int numMessages) { retriableStream.sendMessage("substream2 request " + numMessages); - + sublistenerCaptor2.get().onReady(); if (numMessages == 3) { sublistenerCaptor2.get().headersRead(new Metadata()); } @@ -699,9 +709,14 @@ public void request(int numMessages) { retriableStream.cancel(cancelStatus); } } + + @Override + public boolean isReady() { + return true; + } })); - InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2, masterListener); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); retriableStream.start(masterListener); @@ -716,6 +731,7 @@ public void request(int numMessages) { inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2" inOrder.verify(mockStream1).request(1); inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" + inOrder.verify(masterListener).onReady(); // retry doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); @@ -743,8 +759,8 @@ public void request(int numMessages) { // msg "substream2 request 2" inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(100); - - verify(mockStream2).cancel(cancelStatus); + inOrder.verify(mockStream2).cancel(cancelStatus); + inOrder.verify(masterListener, never()).onReady(); // "substream2 request 1" will never be sent inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); @@ -1073,6 +1089,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1089,6 +1106,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); + verify(mockStream2).isReady(); // bufferLimitExceeded bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -1152,6 +1170,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1167,6 +1186,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1183,6 +1203,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1200,6 +1221,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1214,6 +1236,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1228,6 +1251,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more @@ -1258,6 +1282,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); @@ -1276,6 +1301,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // retry2 @@ -1293,6 +1319,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // retry3 @@ -1307,6 +1334,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // retry4 @@ -1323,6 +1351,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); // retry5 @@ -1340,6 +1369,7 @@ public void pushback() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); // can not retry any more even pushback is positive @@ -1597,6 +1627,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1608,6 +1639,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1623,6 +1655,7 @@ public void transparentRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1645,6 +1678,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1658,6 +1692,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1674,6 +1709,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); } @@ -1695,6 +1731,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // normal retry @@ -1708,6 +1745,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); verify(retriableStreamRecorder, never()).postCommit(); assertEquals(0, fakeClock.numPendingTasks()); @@ -1738,6 +1776,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); // transparent retry @@ -1750,6 +1789,7 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_ ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); assertEquals(0, fakeClock.numPendingTasks()); } @@ -1768,6 +1808,7 @@ public void droppedShouldNeverRetry() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); // drop and verify no retry Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); @@ -1839,6 +1880,7 @@ public Void answer(InvocationOnMock in) { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); hedgingStream.sendMessage("msg1"); @@ -1880,6 +1922,8 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).request(456); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); // send more messages @@ -1917,6 +1961,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // send one more message @@ -1959,6 +2006,9 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4).request(456); inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); InsightBuilder insight = new InsightBuilder(); @@ -2009,6 +2059,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2016,6 +2067,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2023,6 +2075,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2030,6 +2083,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges fails @@ -2041,6 +2095,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor5 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); + inOrder.verify(mockStream5).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2048,6 +2103,7 @@ public void hedging_maxAttempts() { ArgumentCaptor sublistenerCaptor6 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); + inOrder.verify(mockStream6).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2092,6 +2148,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2099,6 +2156,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2106,6 +2164,7 @@ public void hedging_receiveHeaders() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives headers @@ -2143,6 +2202,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2150,6 +2210,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2157,6 +2218,7 @@ public void hedging_pushback_negative() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // a random one of the hedges receives a negative pushback @@ -2188,6 +2250,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2195,6 +2258,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2212,6 +2276,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); // hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second @@ -2225,6 +2290,7 @@ public void hedging_pushback_positive() { ArgumentCaptor sublistenerCaptor4 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); + inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); // commit @@ -2254,6 +2320,7 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); inOrder.verifyNoMoreInteractions(); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); @@ -2261,6 +2328,8 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream1).isReady(); + inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); Status status = Status.CANCELLED.withDescription("cancelled"); @@ -2275,6 +2344,8 @@ public void hedging_cancelled() { assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); inOrder.verify(retriableStreamRecorder).postCommit(); + inOrder.verify(masterListener).closed( + any(Status.class), any(RpcProgress.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); } @@ -2289,6 +2360,7 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2297,6 +2369,8 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); @@ -2313,6 +2387,7 @@ public void hedging_perRpcBufferLimitExceeded() { verify(retriableStreamRecorder).postCommit(); verifyNoMoreInteractions(mockStream1); + verify(mockStream2).isReady(); verifyNoMoreInteractions(mockStream2); } @@ -2327,6 +2402,7 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream1).start(sublistenerCaptor1.capture()); + verify(mockStream1).isReady(); ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; bufferSizeTracer1.outboundWireSize(100); @@ -2335,6 +2411,8 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); + verify(mockStream1, times(2)).isReady(); + verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; bufferSizeTracer2.outboundWireSize(100);