From ac999724b9610266a18d162830aaff7e6bef5d72 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 8 Dec 2020 21:10:09 -0800 Subject: [PATCH 01/11] check pending stream completion at delayed transport lifecycle --- .../grpc/internal/DelayedClientTransport.java | 103 +++++++--- .../java/io/grpc/internal/DelayedStream.java | 27 +++ .../internal/DelayedClientTransportTest.java | 194 +++++++++++++++++- .../io/grpc/internal/DelayedStreamTest.java | 33 +++ .../grpc/internal/ManagedChannelImplTest.java | 3 + 5 files changed, 334 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 3922ee5b89e..24168249643 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -66,6 +66,23 @@ final class DelayedClientTransport implements ManagedClientTransport { @Nonnull @GuardedBy("lock") private Collection pendingStreams = new LinkedHashSet<>(); + @GuardedBy("lock") + private Collection toCheckCompletionStreams = new LinkedHashSet<>(); + private Runnable pollForStreamTransferComplete = new Runnable() { + @Override + public void run() { + ArrayList savedToCheckCompletionStreams; + synchronized (lock) { + savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); + if (!toCheckCompletionStreams.isEmpty()) { + toCheckCompletionStreams = Collections.emptyList(); + } + } + for (final PendingStream stream : savedToCheckCompletionStreams) { + stream.awaitStreamTransferCompletion(); + } + } + }; /** * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered @@ -211,7 +228,7 @@ public void run() { listener.transportShutdown(status); } }); - if (!hasPendingStreams() && reportTransportTerminated != null) { + if (!hasPendingStreams() && !hasUncommittedStreams() && reportTransportTerminated != null) { syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -227,19 +244,27 @@ public void run() { public final void shutdownNow(Status status) { shutdown(status); Collection savedPendingStreams; + Collection savedToCheckCompletionStreams; Runnable savedReportTransportTerminated; synchronized (lock) { savedPendingStreams = pendingStreams; + savedToCheckCompletionStreams = toCheckCompletionStreams; savedReportTransportTerminated = reportTransportTerminated; reportTransportTerminated = null; if (!pendingStreams.isEmpty()) { pendingStreams = Collections.emptyList(); } + if (!toCheckCompletionStreams.isEmpty()) { + toCheckCompletionStreams = Collections.emptyList(); + } } if (savedReportTransportTerminated != null) { for (PendingStream stream : savedPendingStreams) { stream.cancel(status); } + for (PendingStream stream : savedToCheckCompletionStreams) { + stream.awaitStreamTransferCompletion(); + } syncContext.execute(savedReportTransportTerminated); } // If savedReportTransportTerminated == null, transportTerminated() has already been called in @@ -252,6 +277,12 @@ public final boolean hasPendingStreams() { } } + public final boolean hasUncommittedStreams() { + synchronized (lock) { + return !toCheckCompletionStreams.isEmpty(); + } + } + @VisibleForTesting final int getPendingStreamsCount() { synchronized (lock) { @@ -259,6 +290,13 @@ final int getPendingStreamsCount() { } } + @VisibleForTesting + final int getUncommittedStreamCount() { + synchronized (lock) { + return toCheckCompletionStreams.size(); + } + } + /** * Use the picker to try picking a transport for every pending stream, proceed the stream if the * pick is successful, otherwise keep it pending. @@ -270,48 +308,61 @@ final int getPendingStreamsCount() { *

This method must not be called concurrently with itself. */ final void reprocess(@Nullable SubchannelPicker picker) { - ArrayList toProcess; + ArrayList toCreateRealStream; + ArrayList toCheckCompletion; synchronized (lock) { lastPicker = picker; lastPickerVersion++; - if (picker == null || !hasPendingStreams()) { + if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) { return; } - toProcess = new ArrayList<>(pendingStreams); + toCreateRealStream = new ArrayList<>(pendingStreams); + toCheckCompletion = new ArrayList<>(toCheckCompletionStreams); } - ArrayList toRemove = new ArrayList<>(); + ArrayList newlyCreated = new ArrayList<>(); - for (final PendingStream stream : toProcess) { - PickResult pickResult = picker.pickSubchannel(stream.args); - CallOptions callOptions = stream.args.getCallOptions(); - final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, - callOptions.isWaitForReady()); - if (transport != null) { - Executor executor = defaultAppExecutor; - // createRealStream may be expensive. It will start real streams on the transport. If - // there are pending requests, they will be serialized too, which may be expensive. Since - // we are now on transport thread, we need to offload the work to an executor. - if (callOptions.getExecutor() != null) { - executor = callOptions.getExecutor(); - } - executor.execute(new Runnable() { + + if (picker != null) { + for (final PendingStream stream : toCreateRealStream) { + PickResult pickResult = picker.pickSubchannel(stream.args); + CallOptions callOptions = stream.args.getCallOptions(); + final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, + callOptions.isWaitForReady()); + if (transport != null) { + Executor executor = defaultAppExecutor; + // createRealStream may be expensive. It will start real streams on the transport. If + // there are pending requests, they will be serialized too, which may be expensive. Since + // we are now on transport thread, we need to offload the work to an executor. + if (callOptions.getExecutor() != null) { + executor = callOptions.getExecutor(); + } + executor.execute(new Runnable() { @Override public void run() { stream.createRealStream(transport); } }); - toRemove.add(stream); - } // else: stay pending + newlyCreated.add(stream); + } // else: stay pending + } + } + toCheckCompletion.addAll(newlyCreated); + ArrayList completed = new ArrayList<>(); + for (final PendingStream stream : toCheckCompletion) { + if (stream.isStreamTransferCompleted()) { + completed.add(stream); + } } - synchronized (lock) { // Between this synchronized and the previous one: // - Streams may have been cancelled, which may turn pendingStreams into emptiness. - // - shutdown() may be called, which may turn pendingStreams into null. - if (!hasPendingStreams()) { + // - shutdownNow() may be called, which may turn pendingStreams into emptiness. + if (!hasPendingStreams() && !hasUncommittedStreams()) { return; } - pendingStreams.removeAll(toRemove); + pendingStreams.removeAll(newlyCreated); + toCheckCompletionStreams.addAll(newlyCreated); + toCheckCompletionStreams.removeAll(completed); // Because delayed transport is long-lived, we take this opportunity to down-size the // hashmap. if (pendingStreams.isEmpty()) { @@ -325,6 +376,7 @@ public void run() { // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { + syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -367,6 +419,7 @@ public void cancel(Status reason) { if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null) { + syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index be21b4991ba..68464627320 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -29,6 +29,8 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; /** @@ -59,6 +61,7 @@ class DelayedStream implements ClientStream { private long startTimeNanos; @GuardedBy("this") private long streamSetTimeNanos; + private final CountDownLatch realStreamStarted = new CountDownLatch(1); @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -132,6 +135,24 @@ final void setStream(ClientStream stream) { drainPendingCalls(); } + protected boolean isStreamTransferCompleted() { + return realStreamStarted.getCount() == 0; + } + + protected void awaitStreamTransferCompletion() { + // Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or + // shutdown. Not waiting transfer completed may cause pending calls orphaned. + boolean delegationComplete; + try { + delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + delegationComplete = false; + } + if (!delegationComplete) { + Thread.currentThread().interrupt(); + } + } + /** * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock @@ -221,12 +242,14 @@ public void start(ClientStreamListener listener) { if (savedPassThrough) { realStream.start(listener); + realStreamStarted.countDown(); } else { final ClientStreamListener finalListener = listener; delayOrExecute(new Runnable() { @Override public void run() { realStream.start(finalListener); + realStreamStarted.countDown(); } }); } @@ -302,7 +325,11 @@ public void run() { listenerToClose.closed(reason, new Metadata()); } drainPendingCalls(); + if (!isStreamTransferCompleted()) { + realStreamStarted.countDown(); + } } + awaitStreamTransferCompletion(); } @GuardedBy("this") diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 41a97d62f9a..4a2ed84b858 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -45,6 +45,7 @@ import io.grpc.StringMarshaller; import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStreamListener.RpcProgress; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -129,6 +130,7 @@ public void uncaughtException(Thread t, Throwable e) { @After public void noMorePendingTasks() { assertEquals(0, fakeExecutor.numPendingTasks()); + assertFalse(Thread.interrupted()); } @Test public void streamStartThenAssignTransport() { @@ -159,9 +161,15 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(0, delayedTransport.getPendingStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener).transportTerminated(); + verify(transportListener, never()).transportTerminated(); //uncommittedStream prevents shutdown. assertEquals(1, fakeExecutor.runDueTasks()); verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); + + delayedTransport.reprocess(null);//uncommittedStream drained. + verifyNoMoreInteractions(mockRealTransport); + assertEquals(0, fakeExecutor.runDueTasks()); + verify(transportListener).transportTerminated(); //uncommittedStream prevents shutdown. + assertTrue(Thread.interrupted()); stream.start(streamListener); verify(mockRealStream).start(same(streamListener)); } @@ -203,6 +211,7 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertFalse(Thread.interrupted()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } @@ -213,11 +222,60 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertFalse(Thread.interrupted()); verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } + @Test + public void cancelStreamWhenDelegated() { + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + delayedTransport.reprocess(mockPicker); + fakeExecutor.runDueTasks(); + stream.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertFalse(Thread.interrupted()); + verify(mockRealStream).start(listenerCaptor.capture()); + listenerCaptor.getValue().onReady(); + verify(streamListener).onReady(); + verify(mockRealStream).cancel(same(Status.CANCELLED)); + } + + @Test + public void cancelStreamShutdownLastPending() { + ClientStream stream1 = delayedTransport.newStream(method, headers, callOptions); + stream1.start(streamListener); + ClientStream stream2 = delayedTransport.newStream(method, headers, callOptions); + stream2.start(streamListener); + ClientStream stream3 = delayedTransport.newStream(method, headers, callOptions); + stream3.start(streamListener); + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withSubchannel(mockSubchannel)) + .thenReturn(PickResult.withNoResult()) //stream2 stay + .thenReturn(PickResult.withSubchannel(mockSubchannel)); + + assertEquals(3, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + delayedTransport.reprocess(mockPicker); + fakeExecutor.runDueTasks(); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(2, delayedTransport.getUncommittedStreamCount()); + + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener, never()).transportTerminated(); + stream2.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertFalse(Thread.interrupted()); + verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); + verify(mockRealStream, never()).cancel(any(Status.class));//substituted + verify(transportListener).transportTerminated(); + } + @Test public void newStreamThenShutdownTransportThenAssignTransport() { ClientStream stream = delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); @@ -249,6 +307,7 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(0, delayedTransport.getPendingStreamsCount()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); + assertTrue(Thread.interrupted()); } @Test public void newStreamThenShutdownTransportThenCancelStream() { @@ -259,6 +318,7 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); verify(transportListener).transportTerminated(); + assertFalse(Thread.interrupted()); assertEquals(0, delayedTransport.getPendingStreamsCount()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); @@ -315,6 +375,7 @@ public void uncaughtException(Thread t, Throwable e) { // Fail-fast streams DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( method, headers, failFastCallOptions); + ff1.start(streamListener); PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers, failFastCallOptions); verify(transportListener).transportInUse(true); @@ -370,6 +431,7 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); + assertEquals(3, delayedTransport.getUncommittedStreamCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -416,6 +478,7 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -425,11 +488,18 @@ public void uncaughtException(Thread t, Throwable e) { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); + assertEquals(7, delayedTransport.getUncommittedStreamCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); assertSame(mockRealStream2, wfr4.getRealStream()); + ff3.start(streamListener); + ff4.start(streamListener); + wfr2.start(streamListener); + delayedTransport.reprocess(picker); + assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed) + // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); wfr3Executor.runDueTasks(); @@ -443,6 +513,7 @@ public void uncaughtException(Thread t, Throwable e) { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(4, delayedTransport.getUncommittedStreamCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -452,13 +523,17 @@ public void uncaughtException(Thread t, Throwable e) { picker = mock(SubchannelPicker.class); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel1)); + wfr3.start(streamListener); + wfr4.start(streamListener); delayedTransport.reprocess(picker); verify(picker).pickSubchannel( new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); verify(transportListener).transportTerminated(); + assertTrue(Thread.interrupted()); } @Test @@ -600,6 +675,123 @@ public void newStream_racesWithReprocessIdleMode() throws Exception { verify(transportListener).transportInUse(true); } + @Test + public void pendingStreamReprocessRacesShutdown() throws Exception { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch barrierSignal = new CountDownLatch(1); + + DelayedStream stream1 = + (DelayedStream)delayedTransport.newStream(method, headers, callOptions); + stream1.start(streamListener); + DelayedStream stream2 = + (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); + assertEquals(2, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withSubchannel(mockSubchannel)) + .thenReturn(PickResult.withNoResult()) + .thenReturn(PickResult.withSubchannel(mockSubchannel)); + + delayedTransport.reprocess(mockPicker); + fakeExecutor.runDueTasks(); + assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 + assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + + doAnswer(new Answer() { + @Override + @SuppressWarnings("CatchAndPrintStackTrace") + public PickResult answer(InvocationOnMock invocation) throws Throwable { + try { + barrierSignal.countDown(); + barrier.await(); + return PickResult.withNoResult(); + } catch (Exception e) { + e.printStackTrace(); + } + return PickResult.withNoResult(); + } + }).when(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); + + Thread processThread = new Thread("processThread") { + @Override + public void run() { + // Will call pickSubchannel and wait on barrier + delayedTransport.reprocess(mockPicker); + } + }; + processThread.start(); + assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 + assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + barrierSignal.await(5, TimeUnit.SECONDS); + assertEquals(1, barrier.getNumberWaiting()); + delayedTransport.shutdownNow(SHUTDOWN_STATUS); + assertFalse(delayedTransport.hasPendingStreams()); + assertFalse(delayedTransport.hasUncommittedStreams()); + assertFalse(Thread.interrupted()); + barrier.await(5, TimeUnit.SECONDS); + assertSame(mockRealStream, stream1.getRealStream()); + assertTrue(stream2.getRealStream() instanceof NoopClientStream); + verify(mockRealStream).start(any(ClientStreamListener.class)); + verify(mockRealStream, never()).cancel(any(Status.class)); + verifyNoMoreInteractions(mockRealStream);//stream2 was substituted with noop stream + } + + @Test + public void uncommittedStreamReprocess() { + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); + assertTrue(stream instanceof DelayedStream); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + delayedTransport.reprocess(mockPicker); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); + fakeExecutor.runDueTasks(); + assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); + + delayedTransport.reprocess(mockPicker); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + + delayedTransport.shutdownNow(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportTerminated(); + assertFalse(Thread.interrupted()); + verify(mockRealStream).start(listenerCaptor.capture()); + verifyNoMoreInteractions(streamListener); + listenerCaptor.getValue().onReady(); + verify(streamListener).onReady(); + verifyNoMoreInteractions(streamListener); + } + + @Test + public void uncommittedStreamShutdown() { + DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + delayedTransport.reprocess(mockPicker); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertFalse(stream.isStreamTransferCompleted()); + fakeExecutor.runDueTasks(); + assertTrue(stream.isStreamTransferCompleted()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); + + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener, never()).transportTerminated(); + delayedTransport.reprocess(null); + + assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertFalse(Thread.interrupted()); + verify(mockRealStream).start(listenerCaptor.capture()); + verifyNoMoreInteractions(streamListener); + listenerCaptor.getValue().onReady(); + verify(streamListener).onReady(); + verifyNoMoreInteractions(streamListener); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 393a6c6e6d0..a7720d2a45b 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -223,6 +223,7 @@ public void setStreamThenStartThenCancelled() { public void setStreamThenCancelled() { stream.setStream(realStream); stream.cancel(Status.CANCELLED); + assertTrue(Thread.interrupted()); verify(realStream).cancel(same(Status.CANCELLED)); } @@ -378,4 +379,36 @@ public Void answer(InvocationOnMock in) { assertThat(insight.toString()) .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); } + + @Test + public void transferCompletion_realStreamStartThenSetThenCancel() { + assertFalse(stream.isStreamTransferCompleted()); + stream.start(listener); + assertFalse(stream.isStreamTransferCompleted()); + stream.setStream(realStream); + assertTrue(stream.isStreamTransferCompleted()); + stream.awaitStreamTransferCompletion(); + assertFalse(Thread.interrupted()); + stream.cancel(Status.CANCELLED); + verify(realStream).start(any(ClientStreamListener.class)); + verify(realStream).cancel(eq(Status.CANCELLED)); + verifyNoMoreInteractions(realStream); + } + + @Test + public void transferCompletion_realStreamSetThenStartThenCancel() { + assertFalse(stream.isStreamTransferCompleted()); + stream.setStream(realStream); + assertFalse(stream.isStreamTransferCompleted()); + stream.awaitStreamTransferCompletion(); + assertTrue(Thread.interrupted()); + stream.start(listener); + assertTrue(stream.isStreamTransferCompleted()); + stream.awaitStreamTransferCompletion(); + assertFalse(Thread.interrupted()); + stream.cancel(Status.CANCELLED); + verify(realStream).start(same(listener)); + verify(realStream).cancel(eq(Status.CANCELLED)); + verifyNoMoreInteractions(realStream); + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 51811010b20..5cb627f70dd 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -353,6 +353,7 @@ public void allPendingTasksAreRun() throws Exception { // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); + assertFalse(Thread.interrupted()); if (channel != null) { if (!panicExpected) { assertFalse(channel.isInPanicMode()); @@ -874,6 +875,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) .thenReturn(PickResult.withSubchannel(subchannel)); updateBalancingStateSafely(helper, READY, picker2); + assertTrue(Thread.interrupted()); executor.runDueTasks(); verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); verify(mockStream).start(any(ClientStreamListener.class)); @@ -1132,6 +1134,7 @@ public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { when(picker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withDrop(status)); updateBalancingStateSafely(helper, READY, picker); + assertTrue(Thread.interrupted()); executor.runDueTasks(); verify(mockCallListener).onClose(same(status), any(Metadata.class)); From 1fe3bcea1d62dba739e36ac730caad1a78769c97 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 10 Dec 2020 14:40:40 -0800 Subject: [PATCH 02/11] rename --- .../grpc/internal/DelayedClientTransport.java | 10 ++--- .../java/io/grpc/internal/DelayedStream.java | 4 +- .../internal/DelayedClientTransportTest.java | 38 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 24168249643..2ecbdf48b78 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -68,7 +68,7 @@ final class DelayedClientTransport implements ManagedClientTransport { private Collection pendingStreams = new LinkedHashSet<>(); @GuardedBy("lock") private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private Runnable pollForStreamTransferComplete = new Runnable() { + private Runnable pollForStreamTransferCompletion = new Runnable() { @Override public void run() { ArrayList savedToCheckCompletionStreams; @@ -291,7 +291,7 @@ final int getPendingStreamsCount() { } @VisibleForTesting - final int getUncommittedStreamCount() { + final int getUncommittedStreamsCount() { synchronized (lock) { return toCheckCompletionStreams.size(); } @@ -373,10 +373,10 @@ public void run() { // transport starting streams and setting in-use state. During the gap the whole channel's // in-use state may be false. However, it shouldn't cause spurious switching to idleness // (which would shutdown the transports and LoadBalancer) because the gap should be shorter - // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). + // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis). syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { - syncContext.executeLater(pollForStreamTransferComplete); + syncContext.executeLater(pollForStreamTransferCompletion); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -419,7 +419,7 @@ public void cancel(Status reason) { if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null) { - syncContext.executeLater(pollForStreamTransferComplete); + syncContext.executeLater(pollForStreamTransferCompletion); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 68464627320..05e285851c7 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -140,8 +140,8 @@ protected boolean isStreamTransferCompleted() { } protected void awaitStreamTransferCompletion() { - // Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or - // shutdown. Not waiting transfer completed may cause pending calls orphaned. + // Wait until accepted RPCs transfer to the real stream so that we can properly cancel or + // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636. boolean delegationComplete; try { delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 4a2ed84b858..54353edeb83 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() { .thenReturn(PickResult.withSubchannel(mockSubchannel)); assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getUncommittedStreamCount()); + assertEquals(2, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); stream2.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); verify(mockRealStream, never()).cancel(any(Status.class));//substituted @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getUncommittedStreamCount()); + assertEquals(3, delayedTransport.getUncommittedStreamsCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed) + assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getUncommittedStreamCount()); + assertEquals(7, delayedTransport.getUncommittedStreamsCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() { ff4.start(streamListener); wfr2.start(streamListener); delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed) + assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed) // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getUncommittedStreamCount()); + assertEquals(4, delayedTransport.getUncommittedStreamsCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() { fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); verify(transportListener).transportTerminated(); assertTrue(Thread.interrupted()); } @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { DelayedStream stream2 = (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)) .thenReturn(PickResult.withNoResult()) @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 doAnswer(new Answer() { @Override @@ -721,7 +721,7 @@ public void run() { }; processThread.start(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 barrierSignal.await(5, TimeUnit.SECONDS); assertEquals(1, barrier.getNumberWaiting()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); @@ -742,16 +742,16 @@ public void uncommittedStreamReprocess() { assertTrue(stream instanceof DelayedStream); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); @@ -769,21 +769,21 @@ public void uncommittedStreamShutdown() { DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(stream.isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); delayedTransport.reprocess(null); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream).start(listenerCaptor.capture()); verifyNoMoreInteractions(streamListener); From eb188611d789c2fc0cdb7a6caa7dfbc4a6010e91 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Sun, 13 Dec 2020 21:37:31 -0800 Subject: [PATCH 03/11] fix travis test --- core/src/main/java/io/grpc/internal/DelayedStream.java | 2 +- .../test/java/io/grpc/internal/DelayedClientTransportTest.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 05e285851c7..a89425f90d8 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -141,7 +141,7 @@ protected boolean isStreamTransferCompleted() { protected void awaitStreamTransferCompletion() { // Wait until accepted RPCs transfer to the real stream so that we can properly cancel or - // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636. + // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #6283. boolean delegationComplete; try { delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 54353edeb83..571214df3a0 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -720,8 +720,6 @@ public void run() { } }; processThread.start(); - assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 barrierSignal.await(5, TimeUnit.SECONDS); assertEquals(1, barrier.getNumberWaiting()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); @@ -729,6 +727,7 @@ public void run() { assertFalse(delayedTransport.hasUncommittedStreams()); assertFalse(Thread.interrupted()); barrier.await(5, TimeUnit.SECONDS); + processThread.join(5000); assertSame(mockRealStream, stream1.getRealStream()); assertTrue(stream2.getRealStream() instanceof NoopClientStream); verify(mockRealStream).start(any(ClientStreamListener.class)); From b1f553a81385a5b5a61033302305a84cc03f6f46 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 14 Dec 2020 13:06:32 -0800 Subject: [PATCH 04/11] Fix syntax: immutability and started flag --- .../grpc/internal/DelayedClientTransport.java | 18 +++++----- .../java/io/grpc/internal/DelayedStream.java | 10 +++--- .../internal/DelayedClientTransportTest.java | 36 +++++++++---------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 2ecbdf48b78..4e873f436f1 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -68,12 +68,12 @@ final class DelayedClientTransport implements ManagedClientTransport { private Collection pendingStreams = new LinkedHashSet<>(); @GuardedBy("lock") private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private Runnable pollForStreamTransferCompletion = new Runnable() { + private final Runnable pollForStreamTransferCompletion = new Runnable() { @Override public void run() { - ArrayList savedToCheckCompletionStreams; + Collection savedToCheckCompletionStreams; synchronized (lock) { - savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); + savedToCheckCompletionStreams = toCheckCompletionStreams; if (!toCheckCompletionStreams.isEmpty()) { toCheckCompletionStreams = Collections.emptyList(); } @@ -291,7 +291,7 @@ final int getPendingStreamsCount() { } @VisibleForTesting - final int getUncommittedStreamsCount() { + final int getToCheckCompletionStreamsCount() { synchronized (lock) { return toCheckCompletionStreams.size(); } @@ -319,8 +319,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { toCreateRealStream = new ArrayList<>(pendingStreams); toCheckCompletion = new ArrayList<>(toCheckCompletionStreams); } - ArrayList newlyCreated = new ArrayList<>(); - + final ArrayList newlyCreated = new ArrayList<>(); if (picker != null) { for (final PendingStream stream : toCreateRealStream) { @@ -347,7 +346,7 @@ public void run() { } } toCheckCompletion.addAll(newlyCreated); - ArrayList completed = new ArrayList<>(); + final ArrayList completed = new ArrayList<>(); for (final PendingStream stream : toCheckCompletion) { if (stream.isStreamTransferCompleted()) { completed.add(stream); @@ -364,10 +363,13 @@ public void run() { toCheckCompletionStreams.addAll(newlyCreated); toCheckCompletionStreams.removeAll(completed); // Because delayed transport is long-lived, we take this opportunity to down-size the - // hashmap. + // hashmaps. if (pendingStreams.isEmpty()) { pendingStreams = new LinkedHashSet<>(); } + if (toCheckCompletionStreams.isEmpty()) { + toCheckCompletionStreams = new LinkedHashSet<>(); + } if (!hasPendingStreams()) { // There may be a brief gap between delayed transport clearing in-use state, and first real // transport starting streams and setting in-use state. During the gap the whole channel's diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index a89425f90d8..cc0669218ed 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -62,6 +62,7 @@ class DelayedStream implements ClientStream { @GuardedBy("this") private long streamSetTimeNanos; private final CountDownLatch realStreamStarted = new CountDownLatch(1); + private volatile boolean started; @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -136,7 +137,7 @@ final void setStream(ClientStream stream) { } protected boolean isStreamTransferCompleted() { - return realStreamStarted.getCount() == 0; + return started; } protected void awaitStreamTransferCompletion() { @@ -243,6 +244,7 @@ public void start(ClientStreamListener listener) { if (savedPassThrough) { realStream.start(listener); realStreamStarted.countDown(); + started = true; } else { final ClientStreamListener finalListener = listener; delayOrExecute(new Runnable() { @@ -250,6 +252,7 @@ public void start(ClientStreamListener listener) { public void run() { realStream.start(finalListener); realStreamStarted.countDown(); + started = true; } }); } @@ -325,9 +328,8 @@ public void run() { listenerToClose.closed(reason, new Metadata()); } drainPendingCalls(); - if (!isStreamTransferCompleted()) { - realStreamStarted.countDown(); - } + realStreamStarted.countDown(); + started = true; } awaitStreamTransferCompletion(); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 571214df3a0..d0e66fa5bd6 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() { .thenReturn(PickResult.withSubchannel(mockSubchannel)); assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getUncommittedStreamsCount()); + assertEquals(2, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); stream2.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); verify(mockRealStream, never()).cancel(any(Status.class));//substituted @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getUncommittedStreamsCount()); + assertEquals(3, delayedTransport.getToCheckCompletionStreamsCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed) + assertEquals(7, delayedTransport.getToCheckCompletionStreamsCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getUncommittedStreamsCount()); + assertEquals(7, delayedTransport.getToCheckCompletionStreamsCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() { ff4.start(streamListener); wfr2.start(streamListener); delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed) + assertEquals(4, delayedTransport.getToCheckCompletionStreamsCount());//+0(new) - 3(completed) // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getUncommittedStreamsCount()); + assertEquals(4, delayedTransport.getToCheckCompletionStreamsCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() { fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); verify(transportListener).transportTerminated(); assertTrue(Thread.interrupted()); } @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { DelayedStream stream2 = (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)) .thenReturn(PickResult.withNoResult()) @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 + assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount());//stream1 doAnswer(new Answer() { @Override @@ -741,16 +741,16 @@ public void uncommittedStreamReprocess() { assertTrue(stream instanceof DelayedStream); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); @@ -768,21 +768,21 @@ public void uncommittedStreamShutdown() { DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); assertFalse(stream.isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); delayedTransport.reprocess(null); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream).start(listenerCaptor.capture()); verifyNoMoreInteractions(streamListener); From 81ab463a3acfb4ef193d83dce8f22afabd24490c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 15 Dec 2020 21:06:22 -0800 Subject: [PATCH 05/11] Revert "Fix syntax: immutability and started flag" This reverts commit b1f553a81385a5b5a61033302305a84cc03f6f46. --- .../grpc/internal/DelayedClientTransport.java | 18 +++++----- .../java/io/grpc/internal/DelayedStream.java | 10 +++--- .../internal/DelayedClientTransportTest.java | 36 +++++++++---------- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 4e873f436f1..2ecbdf48b78 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -68,12 +68,12 @@ final class DelayedClientTransport implements ManagedClientTransport { private Collection pendingStreams = new LinkedHashSet<>(); @GuardedBy("lock") private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private final Runnable pollForStreamTransferCompletion = new Runnable() { + private Runnable pollForStreamTransferCompletion = new Runnable() { @Override public void run() { - Collection savedToCheckCompletionStreams; + ArrayList savedToCheckCompletionStreams; synchronized (lock) { - savedToCheckCompletionStreams = toCheckCompletionStreams; + savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); if (!toCheckCompletionStreams.isEmpty()) { toCheckCompletionStreams = Collections.emptyList(); } @@ -291,7 +291,7 @@ final int getPendingStreamsCount() { } @VisibleForTesting - final int getToCheckCompletionStreamsCount() { + final int getUncommittedStreamsCount() { synchronized (lock) { return toCheckCompletionStreams.size(); } @@ -319,7 +319,8 @@ final void reprocess(@Nullable SubchannelPicker picker) { toCreateRealStream = new ArrayList<>(pendingStreams); toCheckCompletion = new ArrayList<>(toCheckCompletionStreams); } - final ArrayList newlyCreated = new ArrayList<>(); + ArrayList newlyCreated = new ArrayList<>(); + if (picker != null) { for (final PendingStream stream : toCreateRealStream) { @@ -346,7 +347,7 @@ public void run() { } } toCheckCompletion.addAll(newlyCreated); - final ArrayList completed = new ArrayList<>(); + ArrayList completed = new ArrayList<>(); for (final PendingStream stream : toCheckCompletion) { if (stream.isStreamTransferCompleted()) { completed.add(stream); @@ -363,13 +364,10 @@ public void run() { toCheckCompletionStreams.addAll(newlyCreated); toCheckCompletionStreams.removeAll(completed); // Because delayed transport is long-lived, we take this opportunity to down-size the - // hashmaps. + // hashmap. if (pendingStreams.isEmpty()) { pendingStreams = new LinkedHashSet<>(); } - if (toCheckCompletionStreams.isEmpty()) { - toCheckCompletionStreams = new LinkedHashSet<>(); - } if (!hasPendingStreams()) { // There may be a brief gap between delayed transport clearing in-use state, and first real // transport starting streams and setting in-use state. During the gap the whole channel's diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index cc0669218ed..a89425f90d8 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -62,7 +62,6 @@ class DelayedStream implements ClientStream { @GuardedBy("this") private long streamSetTimeNanos; private final CountDownLatch realStreamStarted = new CountDownLatch(1); - private volatile boolean started; @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -137,7 +136,7 @@ final void setStream(ClientStream stream) { } protected boolean isStreamTransferCompleted() { - return started; + return realStreamStarted.getCount() == 0; } protected void awaitStreamTransferCompletion() { @@ -244,7 +243,6 @@ public void start(ClientStreamListener listener) { if (savedPassThrough) { realStream.start(listener); realStreamStarted.countDown(); - started = true; } else { final ClientStreamListener finalListener = listener; delayOrExecute(new Runnable() { @@ -252,7 +250,6 @@ public void start(ClientStreamListener listener) { public void run() { realStream.start(finalListener); realStreamStarted.countDown(); - started = true; } }); } @@ -328,8 +325,9 @@ public void run() { listenerToClose.closed(reason, new Metadata()); } drainPendingCalls(); - realStreamStarted.countDown(); - started = true; + if (!isStreamTransferCompleted()) { + realStreamStarted.countDown(); + } } awaitStreamTransferCompletion(); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d0e66fa5bd6..571214df3a0 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() { .thenReturn(PickResult.withSubchannel(mockSubchannel)); assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(2, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); stream2.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); verify(mockRealStream, never()).cancel(any(Status.class));//substituted @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(3, delayedTransport.getUncommittedStreamsCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getToCheckCompletionStreamsCount());//+5(new) - 1(completed) + assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(7, delayedTransport.getUncommittedStreamsCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() { ff4.start(streamListener); wfr2.start(streamListener); delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getToCheckCompletionStreamsCount());//+0(new) - 3(completed) + assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed) // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(4, delayedTransport.getUncommittedStreamsCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() { fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); verify(transportListener).transportTerminated(); assertTrue(Thread.interrupted()); } @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { DelayedStream stream2 = (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)) .thenReturn(PickResult.withNoResult()) @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 doAnswer(new Answer() { @Override @@ -741,16 +741,16 @@ public void uncommittedStreamReprocess() { assertTrue(stream instanceof DelayedStream); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); @@ -768,21 +768,21 @@ public void uncommittedStreamShutdown() { DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(stream.isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); delayedTransport.reprocess(null); - assertEquals(0, delayedTransport.getToCheckCompletionStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream).start(listenerCaptor.capture()); verifyNoMoreInteractions(streamListener); From 193eb481ed3e82257355ef1569a3bc8758d7dbe3 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 15 Dec 2020 21:06:26 -0800 Subject: [PATCH 06/11] Revert "fix travis test" This reverts commit eb188611d789c2fc0cdb7a6caa7dfbc4a6010e91. --- core/src/main/java/io/grpc/internal/DelayedStream.java | 2 +- .../test/java/io/grpc/internal/DelayedClientTransportTest.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index a89425f90d8..05e285851c7 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -141,7 +141,7 @@ protected boolean isStreamTransferCompleted() { protected void awaitStreamTransferCompletion() { // Wait until accepted RPCs transfer to the real stream so that we can properly cancel or - // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #6283. + // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636. boolean delegationComplete; try { delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 571214df3a0..54353edeb83 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -720,6 +720,8 @@ public void run() { } }; processThread.start(); + assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 barrierSignal.await(5, TimeUnit.SECONDS); assertEquals(1, barrier.getNumberWaiting()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); @@ -727,7 +729,6 @@ public void run() { assertFalse(delayedTransport.hasUncommittedStreams()); assertFalse(Thread.interrupted()); barrier.await(5, TimeUnit.SECONDS); - processThread.join(5000); assertSame(mockRealStream, stream1.getRealStream()); assertTrue(stream2.getRealStream() instanceof NoopClientStream); verify(mockRealStream).start(any(ClientStreamListener.class)); From 930437585c0d98b228173973f15b9a0ff4e5dfc9 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 15 Dec 2020 21:06:28 -0800 Subject: [PATCH 07/11] Revert "rename" This reverts commit 1fe3bcea1d62dba739e36ac730caad1a78769c97. --- .../grpc/internal/DelayedClientTransport.java | 10 ++--- .../java/io/grpc/internal/DelayedStream.java | 4 +- .../internal/DelayedClientTransportTest.java | 38 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 2ecbdf48b78..24168249643 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -68,7 +68,7 @@ final class DelayedClientTransport implements ManagedClientTransport { private Collection pendingStreams = new LinkedHashSet<>(); @GuardedBy("lock") private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private Runnable pollForStreamTransferCompletion = new Runnable() { + private Runnable pollForStreamTransferComplete = new Runnable() { @Override public void run() { ArrayList savedToCheckCompletionStreams; @@ -291,7 +291,7 @@ final int getPendingStreamsCount() { } @VisibleForTesting - final int getUncommittedStreamsCount() { + final int getUncommittedStreamCount() { synchronized (lock) { return toCheckCompletionStreams.size(); } @@ -373,10 +373,10 @@ public void run() { // transport starting streams and setting in-use state. During the gap the whole channel's // in-use state may be false. However, it shouldn't cause spurious switching to idleness // (which would shutdown the transports and LoadBalancer) because the gap should be shorter - // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis). + // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { - syncContext.executeLater(pollForStreamTransferCompletion); + syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -419,7 +419,7 @@ public void cancel(Status reason) { if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null) { - syncContext.executeLater(pollForStreamTransferCompletion); + syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 05e285851c7..68464627320 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -140,8 +140,8 @@ protected boolean isStreamTransferCompleted() { } protected void awaitStreamTransferCompletion() { - // Wait until accepted RPCs transfer to the real stream so that we can properly cancel or - // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636. + // Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or + // shutdown. Not waiting transfer completed may cause pending calls orphaned. boolean delegationComplete; try { delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 54353edeb83..4a2ed84b858 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() { .thenReturn(PickResult.withSubchannel(mockSubchannel)); assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getUncommittedStreamsCount()); + assertEquals(2, delayedTransport.getUncommittedStreamCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); stream2.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); assertFalse(Thread.interrupted()); verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); verify(mockRealStream, never()).cancel(any(Status.class));//substituted @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getUncommittedStreamsCount()); + assertEquals(3, delayedTransport.getUncommittedStreamCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed) + assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getUncommittedStreamsCount()); + assertEquals(7, delayedTransport.getUncommittedStreamCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() { ff4.start(streamListener); wfr2.start(streamListener); delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed) + assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed) // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getUncommittedStreamsCount()); + assertEquals(4, delayedTransport.getUncommittedStreamCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() { fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); verify(transportListener).transportTerminated(); assertTrue(Thread.interrupted()); } @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { DelayedStream stream2 = (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)) .thenReturn(PickResult.withNoResult()) @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 doAnswer(new Answer() { @Override @@ -721,7 +721,7 @@ public void run() { }; processThread.start(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 barrierSignal.await(5, TimeUnit.SECONDS); assertEquals(1, barrier.getNumberWaiting()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); @@ -742,16 +742,16 @@ public void uncommittedStreamReprocess() { assertTrue(stream instanceof DelayedStream); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); @@ -769,21 +769,21 @@ public void uncommittedStreamShutdown() { DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); assertFalse(stream.isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getUncommittedStreamsCount()); + assertEquals(1, delayedTransport.getUncommittedStreamCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); delayedTransport.reprocess(null); - assertEquals(0, delayedTransport.getUncommittedStreamsCount()); + assertEquals(0, delayedTransport.getUncommittedStreamCount()); assertFalse(Thread.interrupted()); verify(mockRealStream).start(listenerCaptor.capture()); verifyNoMoreInteractions(streamListener); From 56374ddb865ca87986ab76f26b53013cfb59980c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 15 Dec 2020 21:06:29 -0800 Subject: [PATCH 08/11] Revert "check pending stream completion at delayed transport lifecycle" This reverts commit ac999724b9610266a18d162830aaff7e6bef5d72. --- .../grpc/internal/DelayedClientTransport.java | 103 +++------- .../java/io/grpc/internal/DelayedStream.java | 27 --- .../internal/DelayedClientTransportTest.java | 194 +----------------- .../io/grpc/internal/DelayedStreamTest.java | 33 --- .../grpc/internal/ManagedChannelImplTest.java | 3 - 5 files changed, 26 insertions(+), 334 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 24168249643..3922ee5b89e 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -66,23 +66,6 @@ final class DelayedClientTransport implements ManagedClientTransport { @Nonnull @GuardedBy("lock") private Collection pendingStreams = new LinkedHashSet<>(); - @GuardedBy("lock") - private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private Runnable pollForStreamTransferComplete = new Runnable() { - @Override - public void run() { - ArrayList savedToCheckCompletionStreams; - synchronized (lock) { - savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); - if (!toCheckCompletionStreams.isEmpty()) { - toCheckCompletionStreams = Collections.emptyList(); - } - } - for (final PendingStream stream : savedToCheckCompletionStreams) { - stream.awaitStreamTransferCompletion(); - } - } - }; /** * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered @@ -228,7 +211,7 @@ public void run() { listener.transportShutdown(status); } }); - if (!hasPendingStreams() && !hasUncommittedStreams() && reportTransportTerminated != null) { + if (!hasPendingStreams() && reportTransportTerminated != null) { syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -244,27 +227,19 @@ public void run() { public final void shutdownNow(Status status) { shutdown(status); Collection savedPendingStreams; - Collection savedToCheckCompletionStreams; Runnable savedReportTransportTerminated; synchronized (lock) { savedPendingStreams = pendingStreams; - savedToCheckCompletionStreams = toCheckCompletionStreams; savedReportTransportTerminated = reportTransportTerminated; reportTransportTerminated = null; if (!pendingStreams.isEmpty()) { pendingStreams = Collections.emptyList(); } - if (!toCheckCompletionStreams.isEmpty()) { - toCheckCompletionStreams = Collections.emptyList(); - } } if (savedReportTransportTerminated != null) { for (PendingStream stream : savedPendingStreams) { stream.cancel(status); } - for (PendingStream stream : savedToCheckCompletionStreams) { - stream.awaitStreamTransferCompletion(); - } syncContext.execute(savedReportTransportTerminated); } // If savedReportTransportTerminated == null, transportTerminated() has already been called in @@ -277,12 +252,6 @@ public final boolean hasPendingStreams() { } } - public final boolean hasUncommittedStreams() { - synchronized (lock) { - return !toCheckCompletionStreams.isEmpty(); - } - } - @VisibleForTesting final int getPendingStreamsCount() { synchronized (lock) { @@ -290,13 +259,6 @@ final int getPendingStreamsCount() { } } - @VisibleForTesting - final int getUncommittedStreamCount() { - synchronized (lock) { - return toCheckCompletionStreams.size(); - } - } - /** * Use the picker to try picking a transport for every pending stream, proceed the stream if the * pick is successful, otherwise keep it pending. @@ -308,61 +270,48 @@ final int getUncommittedStreamCount() { *

This method must not be called concurrently with itself. */ final void reprocess(@Nullable SubchannelPicker picker) { - ArrayList toCreateRealStream; - ArrayList toCheckCompletion; + ArrayList toProcess; synchronized (lock) { lastPicker = picker; lastPickerVersion++; - if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) { + if (picker == null || !hasPendingStreams()) { return; } - toCreateRealStream = new ArrayList<>(pendingStreams); - toCheckCompletion = new ArrayList<>(toCheckCompletionStreams); + toProcess = new ArrayList<>(pendingStreams); } - ArrayList newlyCreated = new ArrayList<>(); + ArrayList toRemove = new ArrayList<>(); - - if (picker != null) { - for (final PendingStream stream : toCreateRealStream) { - PickResult pickResult = picker.pickSubchannel(stream.args); - CallOptions callOptions = stream.args.getCallOptions(); - final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, - callOptions.isWaitForReady()); - if (transport != null) { - Executor executor = defaultAppExecutor; - // createRealStream may be expensive. It will start real streams on the transport. If - // there are pending requests, they will be serialized too, which may be expensive. Since - // we are now on transport thread, we need to offload the work to an executor. - if (callOptions.getExecutor() != null) { - executor = callOptions.getExecutor(); - } - executor.execute(new Runnable() { + for (final PendingStream stream : toProcess) { + PickResult pickResult = picker.pickSubchannel(stream.args); + CallOptions callOptions = stream.args.getCallOptions(); + final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, + callOptions.isWaitForReady()); + if (transport != null) { + Executor executor = defaultAppExecutor; + // createRealStream may be expensive. It will start real streams on the transport. If + // there are pending requests, they will be serialized too, which may be expensive. Since + // we are now on transport thread, we need to offload the work to an executor. + if (callOptions.getExecutor() != null) { + executor = callOptions.getExecutor(); + } + executor.execute(new Runnable() { @Override public void run() { stream.createRealStream(transport); } }); - newlyCreated.add(stream); - } // else: stay pending - } - } - toCheckCompletion.addAll(newlyCreated); - ArrayList completed = new ArrayList<>(); - for (final PendingStream stream : toCheckCompletion) { - if (stream.isStreamTransferCompleted()) { - completed.add(stream); - } + toRemove.add(stream); + } // else: stay pending } + synchronized (lock) { // Between this synchronized and the previous one: // - Streams may have been cancelled, which may turn pendingStreams into emptiness. - // - shutdownNow() may be called, which may turn pendingStreams into emptiness. - if (!hasPendingStreams() && !hasUncommittedStreams()) { + // - shutdown() may be called, which may turn pendingStreams into null. + if (!hasPendingStreams()) { return; } - pendingStreams.removeAll(newlyCreated); - toCheckCompletionStreams.addAll(newlyCreated); - toCheckCompletionStreams.removeAll(completed); + pendingStreams.removeAll(toRemove); // Because delayed transport is long-lived, we take this opportunity to down-size the // hashmap. if (pendingStreams.isEmpty()) { @@ -376,7 +325,6 @@ public void run() { // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { - syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -419,7 +367,6 @@ public void cancel(Status reason) { if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null) { - syncContext.executeLater(pollForStreamTransferComplete); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 68464627320..be21b4991ba 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -29,8 +29,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; /** @@ -61,7 +59,6 @@ class DelayedStream implements ClientStream { private long startTimeNanos; @GuardedBy("this") private long streamSetTimeNanos; - private final CountDownLatch realStreamStarted = new CountDownLatch(1); @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -135,24 +132,6 @@ final void setStream(ClientStream stream) { drainPendingCalls(); } - protected boolean isStreamTransferCompleted() { - return realStreamStarted.getCount() == 0; - } - - protected void awaitStreamTransferCompletion() { - // Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or - // shutdown. Not waiting transfer completed may cause pending calls orphaned. - boolean delegationComplete; - try { - delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - delegationComplete = false; - } - if (!delegationComplete) { - Thread.currentThread().interrupt(); - } - } - /** * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock @@ -242,14 +221,12 @@ public void start(ClientStreamListener listener) { if (savedPassThrough) { realStream.start(listener); - realStreamStarted.countDown(); } else { final ClientStreamListener finalListener = listener; delayOrExecute(new Runnable() { @Override public void run() { realStream.start(finalListener); - realStreamStarted.countDown(); } }); } @@ -325,11 +302,7 @@ public void run() { listenerToClose.closed(reason, new Metadata()); } drainPendingCalls(); - if (!isStreamTransferCompleted()) { - realStreamStarted.countDown(); - } } - awaitStreamTransferCompletion(); } @GuardedBy("this") diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 4a2ed84b858..41a97d62f9a 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -45,7 +45,6 @@ import io.grpc.StringMarshaller; import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStreamListener.RpcProgress; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -130,7 +129,6 @@ public void uncaughtException(Thread t, Throwable e) { @After public void noMorePendingTasks() { assertEquals(0, fakeExecutor.numPendingTasks()); - assertFalse(Thread.interrupted()); } @Test public void streamStartThenAssignTransport() { @@ -161,15 +159,9 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(0, delayedTransport.getPendingStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener, never()).transportTerminated(); //uncommittedStream prevents shutdown. + verify(transportListener).transportTerminated(); assertEquals(1, fakeExecutor.runDueTasks()); verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); - - delayedTransport.reprocess(null);//uncommittedStream drained. - verifyNoMoreInteractions(mockRealTransport); - assertEquals(0, fakeExecutor.runDueTasks()); - verify(transportListener).transportTerminated(); //uncommittedStream prevents shutdown. - assertTrue(Thread.interrupted()); stream.start(streamListener); verify(mockRealStream).start(same(streamListener)); } @@ -211,7 +203,6 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertFalse(Thread.interrupted()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } @@ -222,60 +213,11 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertFalse(Thread.interrupted()); verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } - @Test - public void cancelStreamWhenDelegated() { - ClientStream stream = delayedTransport.newStream(method, headers, callOptions); - stream.start(streamListener); - assertEquals(1, delayedTransport.getPendingStreamsCount()); - delayedTransport.reprocess(mockPicker); - fakeExecutor.runDueTasks(); - stream.cancel(Status.CANCELLED); - assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertFalse(Thread.interrupted()); - verify(mockRealStream).start(listenerCaptor.capture()); - listenerCaptor.getValue().onReady(); - verify(streamListener).onReady(); - verify(mockRealStream).cancel(same(Status.CANCELLED)); - } - - @Test - public void cancelStreamShutdownLastPending() { - ClientStream stream1 = delayedTransport.newStream(method, headers, callOptions); - stream1.start(streamListener); - ClientStream stream2 = delayedTransport.newStream(method, headers, callOptions); - stream2.start(streamListener); - ClientStream stream3 = delayedTransport.newStream(method, headers, callOptions); - stream3.start(streamListener); - when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withSubchannel(mockSubchannel)) - .thenReturn(PickResult.withNoResult()) //stream2 stay - .thenReturn(PickResult.withSubchannel(mockSubchannel)); - - assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - delayedTransport.reprocess(mockPicker); - fakeExecutor.runDueTasks(); - assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getUncommittedStreamCount()); - - delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener, never()).transportTerminated(); - stream2.cancel(Status.CANCELLED); - assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - assertFalse(Thread.interrupted()); - verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); - verify(mockRealStream, never()).cancel(any(Status.class));//substituted - verify(transportListener).transportTerminated(); - } - @Test public void newStreamThenShutdownTransportThenAssignTransport() { ClientStream stream = delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); @@ -307,7 +249,6 @@ public void cancelStreamShutdownLastPending() { assertEquals(0, delayedTransport.getPendingStreamsCount()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); - assertTrue(Thread.interrupted()); } @Test public void newStreamThenShutdownTransportThenCancelStream() { @@ -318,7 +259,6 @@ public void cancelStreamShutdownLastPending() { assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); verify(transportListener).transportTerminated(); - assertFalse(Thread.interrupted()); assertEquals(0, delayedTransport.getPendingStreamsCount()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); @@ -375,7 +315,6 @@ public void cancelStreamShutdownLastPending() { // Fail-fast streams DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( method, headers, failFastCallOptions); - ff1.start(streamListener); PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers, failFastCallOptions); verify(transportListener).transportInUse(true); @@ -431,7 +370,6 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getUncommittedStreamCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +416,6 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,18 +425,11 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getUncommittedStreamCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); assertSame(mockRealStream2, wfr4.getRealStream()); - ff3.start(streamListener); - ff4.start(streamListener); - wfr2.start(streamListener); - delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed) - // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); wfr3Executor.runDueTasks(); @@ -513,7 +443,6 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getUncommittedStreamCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -523,17 +452,13 @@ public void cancelStreamShutdownLastPending() { picker = mock(SubchannelPicker.class); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel1)); - wfr3.start(streamListener); - wfr4.start(streamListener); delayedTransport.reprocess(picker); verify(picker).pickSubchannel( new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); verify(transportListener).transportTerminated(); - assertTrue(Thread.interrupted()); } @Test @@ -675,123 +600,6 @@ public void newStream_racesWithReprocessIdleMode() throws Exception { verify(transportListener).transportInUse(true); } - @Test - public void pendingStreamReprocessRacesShutdown() throws Exception { - final CyclicBarrier barrier = new CyclicBarrier(2); - final CountDownLatch barrierSignal = new CountDownLatch(1); - - DelayedStream stream1 = - (DelayedStream)delayedTransport.newStream(method, headers, callOptions); - stream1.start(streamListener); - DelayedStream stream2 = - (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); - assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withSubchannel(mockSubchannel)) - .thenReturn(PickResult.withNoResult()) - .thenReturn(PickResult.withSubchannel(mockSubchannel)); - - delayedTransport.reprocess(mockPicker); - fakeExecutor.runDueTasks(); - assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 - - doAnswer(new Answer() { - @Override - @SuppressWarnings("CatchAndPrintStackTrace") - public PickResult answer(InvocationOnMock invocation) throws Throwable { - try { - barrierSignal.countDown(); - barrier.await(); - return PickResult.withNoResult(); - } catch (Exception e) { - e.printStackTrace(); - } - return PickResult.withNoResult(); - } - }).when(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); - - Thread processThread = new Thread("processThread") { - @Override - public void run() { - // Will call pickSubchannel and wait on barrier - delayedTransport.reprocess(mockPicker); - } - }; - processThread.start(); - assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 - barrierSignal.await(5, TimeUnit.SECONDS); - assertEquals(1, barrier.getNumberWaiting()); - delayedTransport.shutdownNow(SHUTDOWN_STATUS); - assertFalse(delayedTransport.hasPendingStreams()); - assertFalse(delayedTransport.hasUncommittedStreams()); - assertFalse(Thread.interrupted()); - barrier.await(5, TimeUnit.SECONDS); - assertSame(mockRealStream, stream1.getRealStream()); - assertTrue(stream2.getRealStream() instanceof NoopClientStream); - verify(mockRealStream).start(any(ClientStreamListener.class)); - verify(mockRealStream, never()).cancel(any(Status.class)); - verifyNoMoreInteractions(mockRealStream);//stream2 was substituted with noop stream - } - - @Test - public void uncommittedStreamReprocess() { - ClientStream stream = delayedTransport.newStream(method, headers, callOptions); - assertTrue(stream instanceof DelayedStream); - stream.start(streamListener); - assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); - assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); - fakeExecutor.runDueTasks(); - assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); - - delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - - delayedTransport.shutdownNow(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener).transportTerminated(); - assertFalse(Thread.interrupted()); - verify(mockRealStream).start(listenerCaptor.capture()); - verifyNoMoreInteractions(streamListener); - listenerCaptor.getValue().onReady(); - verify(streamListener).onReady(); - verifyNoMoreInteractions(streamListener); - } - - @Test - public void uncommittedStreamShutdown() { - DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); - stream.start(streamListener); - assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); - assertFalse(stream.isStreamTransferCompleted()); - fakeExecutor.runDueTasks(); - assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); - - delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener, never()).transportTerminated(); - delayedTransport.reprocess(null); - - assertEquals(0, delayedTransport.getUncommittedStreamCount()); - assertFalse(Thread.interrupted()); - verify(mockRealStream).start(listenerCaptor.capture()); - verifyNoMoreInteractions(streamListener); - listenerCaptor.getValue().onReady(); - verify(streamListener).onReady(); - verifyNoMoreInteractions(streamListener); - } - private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index a7720d2a45b..393a6c6e6d0 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -223,7 +223,6 @@ public void setStreamThenStartThenCancelled() { public void setStreamThenCancelled() { stream.setStream(realStream); stream.cancel(Status.CANCELLED); - assertTrue(Thread.interrupted()); verify(realStream).cancel(same(Status.CANCELLED)); } @@ -379,36 +378,4 @@ public Void answer(InvocationOnMock in) { assertThat(insight.toString()) .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); } - - @Test - public void transferCompletion_realStreamStartThenSetThenCancel() { - assertFalse(stream.isStreamTransferCompleted()); - stream.start(listener); - assertFalse(stream.isStreamTransferCompleted()); - stream.setStream(realStream); - assertTrue(stream.isStreamTransferCompleted()); - stream.awaitStreamTransferCompletion(); - assertFalse(Thread.interrupted()); - stream.cancel(Status.CANCELLED); - verify(realStream).start(any(ClientStreamListener.class)); - verify(realStream).cancel(eq(Status.CANCELLED)); - verifyNoMoreInteractions(realStream); - } - - @Test - public void transferCompletion_realStreamSetThenStartThenCancel() { - assertFalse(stream.isStreamTransferCompleted()); - stream.setStream(realStream); - assertFalse(stream.isStreamTransferCompleted()); - stream.awaitStreamTransferCompletion(); - assertTrue(Thread.interrupted()); - stream.start(listener); - assertTrue(stream.isStreamTransferCompleted()); - stream.awaitStreamTransferCompletion(); - assertFalse(Thread.interrupted()); - stream.cancel(Status.CANCELLED); - verify(realStream).start(same(listener)); - verify(realStream).cancel(eq(Status.CANCELLED)); - verifyNoMoreInteractions(realStream); - } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 5cb627f70dd..51811010b20 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -353,7 +353,6 @@ public void allPendingTasksAreRun() throws Exception { // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); - assertFalse(Thread.interrupted()); if (channel != null) { if (!panicExpected) { assertFalse(channel.isInPanicMode()); @@ -875,7 +874,6 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) .thenReturn(PickResult.withSubchannel(subchannel)); updateBalancingStateSafely(helper, READY, picker2); - assertTrue(Thread.interrupted()); executor.runDueTasks(); verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); verify(mockStream).start(any(ClientStreamListener.class)); @@ -1134,7 +1132,6 @@ public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { when(picker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withDrop(status)); updateBalancingStateSafely(helper, READY, picker); - assertTrue(Thread.interrupted()); executor.runDueTasks(); verify(mockCallListener).onClose(same(status), any(Metadata.class)); From 502429067108066fcb2840b9daa31fc3de7cf7a4 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 15 Dec 2020 14:12:42 -0800 Subject: [PATCH 09/11] transfer complete abstract class --- .../grpc/internal/DelayedClientTransport.java | 57 +++++++++++-------- .../java/io/grpc/internal/DelayedStream.java | 5 +- .../internal/TransferableClientStream.java | 30 ++++++++++ .../internal/DelayedClientTransportTest.java | 49 +++++++++++++++- 4 files changed, 113 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/TransferableClientStream.java diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 3922ee5b89e..9fee2bc33d2 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -66,6 +66,8 @@ final class DelayedClientTransport implements ManagedClientTransport { @Nonnull @GuardedBy("lock") private Collection pendingStreams = new LinkedHashSet<>(); + @GuardedBy("lock") + private int pendingCompleteStreams; /** * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered @@ -175,6 +177,7 @@ public final ClientStream newStream( private PendingStream createPendingStream(PickSubchannelArgs args) { PendingStream pendingStream = new PendingStream(args); pendingStreams.add(pendingStream); + pendingCompleteStreams++; if (getPendingStreamsCount() == 1) { syncContext.executeLater(reportTransportInUse); } @@ -211,7 +214,7 @@ public void run() { listener.transportShutdown(status); } }); - if (!hasPendingStreams() && reportTransportTerminated != null) { + if (pendingCompleteStreams == 0 && reportTransportTerminated != null) { syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -227,23 +230,15 @@ public void run() { public final void shutdownNow(Status status) { shutdown(status); Collection savedPendingStreams; - Runnable savedReportTransportTerminated; synchronized (lock) { savedPendingStreams = pendingStreams; - savedReportTransportTerminated = reportTransportTerminated; - reportTransportTerminated = null; if (!pendingStreams.isEmpty()) { pendingStreams = Collections.emptyList(); } } - if (savedReportTransportTerminated != null) { - for (PendingStream stream : savedPendingStreams) { - stream.cancel(status); - } - syncContext.execute(savedReportTransportTerminated); + for (PendingStream stream : savedPendingStreams) { + stream.cancel(status); } - // If savedReportTransportTerminated == null, transportTerminated() has already been called in - // shutdown(). } public final boolean hasPendingStreams() { @@ -259,6 +254,13 @@ final int getPendingStreamsCount() { } } + @VisibleForTesting + final int getPendingCompleteStreamsCount() { + synchronized (lock) { + return pendingCompleteStreams; + } + } + /** * Use the picker to try picking a transport for every pending stream, proceed the stream if the * pick is successful, otherwise keep it pending. @@ -324,10 +326,6 @@ public void run() { // (which would shutdown the transports and LoadBalancer) because the gap should be shorter // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). syncContext.executeLater(reportTransportNotInUse); - if (shutdownStatus != null && reportTransportTerminated != null) { - syncContext.executeLater(reportTransportTerminated); - reportTransportTerminated = null; - } } } syncContext.drain(); @@ -341,6 +339,7 @@ public InternalLogId getLogId() { private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); + private volatile boolean transferCompleted; private PendingStream(PickSubchannelArgs args) { this.args = args; @@ -362,15 +361,25 @@ private void createRealStream(ClientTransport transport) { public void cancel(Status reason) { super.cancel(reason); synchronized (lock) { - if (reportTransportTerminated != null) { - boolean justRemovedAnElement = pendingStreams.remove(this); - if (!hasPendingStreams() && justRemovedAnElement) { - syncContext.executeLater(reportTransportNotInUse); - if (shutdownStatus != null) { - syncContext.executeLater(reportTransportTerminated); - reportTransportTerminated = null; - } - } + boolean justRemovedAnElement = pendingStreams.remove(this); + if (!hasPendingStreams() && justRemovedAnElement && reportTransportTerminated != null) { + syncContext.executeLater(reportTransportNotInUse); + } + } + syncContext.drain(); + } + + @Override + public void onTransferComplete() { + if (transferCompleted) { + return; + } + transferCompleted = true; + synchronized (lock) { + pendingCompleteStreams--; + if (shutdownStatus != null && pendingCompleteStreams == 0) { + syncContext.executeLater(reportTransportTerminated); + reportTransportTerminated = null; } } syncContext.drain(); diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index be21b4991ba..3b88b8f7567 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -39,7 +39,7 @@ * DelayedStream} may be internally altered by different threads, thus internal synchronization is * necessary. */ -class DelayedStream implements ClientStream { +class DelayedStream extends TransferableClientStream { /** {@code true} once realStream is valid and all pending calls have been drained. */ private volatile boolean passThrough; /** @@ -221,12 +221,14 @@ public void start(ClientStreamListener listener) { if (savedPassThrough) { realStream.start(listener); + onTransferComplete(); } else { final ClientStreamListener finalListener = listener; delayOrExecute(new Runnable() { @Override public void run() { realStream.start(finalListener); + onTransferComplete(); } }); } @@ -302,6 +304,7 @@ public void run() { listenerToClose.closed(reason, new Metadata()); } drainPendingCalls(); + onTransferComplete(); } } diff --git a/core/src/main/java/io/grpc/internal/TransferableClientStream.java b/core/src/main/java/io/grpc/internal/TransferableClientStream.java new file mode 100644 index 00000000000..4a62e6cf54b --- /dev/null +++ b/core/src/main/java/io/grpc/internal/TransferableClientStream.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +/** + * A logical {@link ClientStream} that does internal transfer processing of the clint requests. + */ +abstract class TransferableClientStream implements ClientStream { + + /** + * Provides the place to define actions at the point when transfer is done. + * Call this method to trigger those transfer completion activities. No-op by default. + */ + public void onTransferComplete() { + } +} diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 41a97d62f9a..bf88ae5e3cb 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -158,12 +158,14 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); - verify(transportListener).transportTerminated(); + verify(transportListener, never()).transportTerminated(); assertEquals(1, fakeExecutor.runDueTasks()); + verify(transportListener, never()).transportTerminated(); verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); stream.start(streamListener); verify(mockRealStream).start(same(streamListener)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportTerminated(); } @Test public void transportTerminatedThenAssignTransport() { @@ -201,8 +203,10 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void cancelStreamWithoutSetTransport() { ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getPendingCompleteStreamsCount()); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } @@ -211,13 +215,34 @@ public void uncaughtException(Thread t, Throwable e) { ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); stream.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(0, delayedTransport.getPendingCompleteStreamsCount()); verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealStream); } + @Test + public void cancelStreamShutdownThenStart() { + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); + delayedTransport.shutdown(Status.UNAVAILABLE); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); + delayedTransport.reprocess(mockPicker); + assertEquals(1, fakeExecutor.runDueTasks()); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); + stream.cancel(Status.CANCELLED); + verify(mockRealStream).cancel(same(Status.CANCELLED)); + verify(transportListener, never()).transportTerminated(); + stream.start(streamListener); + assertEquals(0, delayedTransport.getPendingCompleteStreamsCount()); + verify(mockRealStream).start(streamListener); + verify(transportListener).transportTerminated(); + } + @Test public void newStreamThenShutdownTransportThenAssignTransport() { ClientStream stream = delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); @@ -353,6 +378,7 @@ public void uncaughtException(Thread t, Throwable e) { waitForReadyCallOptions); assertEquals(8, delayedTransport.getPendingStreamsCount()); + assertEquals(8, delayedTransport.getPendingCompleteStreamsCount()); // First reprocess(). Some will proceed, some will fail and the rest will stay buffered. SubchannelPicker picker = mock(SubchannelPicker.class); @@ -370,6 +396,7 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); + assertEquals(8, delayedTransport.getPendingCompleteStreamsCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -385,8 +412,12 @@ public void uncaughtException(Thread t, Throwable e) { any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); verify(mockRealTransport2, never()).newStream( any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); + ff1.start(streamListener); + ff2.start(streamListener); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); + // 8 - 2(runDueTask with start) + assertEquals(6, delayedTransport.getPendingCompleteStreamsCount()); // ff1 and wfr1 went through verify(mockRealTransport).newStream(method, headers, failFastCallOptions); verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions); @@ -394,6 +425,8 @@ public void uncaughtException(Thread t, Throwable e) { assertSame(mockRealStream2, wfr1.getRealStream()); // The ff2 has failed due to picker returning an error assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError()); + wfr1.start(streamListener); + assertEquals(5, delayedTransport.getPendingCompleteStreamsCount()); // Other streams are still buffered assertNull(ff3.getRealStream()); assertNull(ff4.getRealStream()); @@ -414,8 +447,14 @@ public void uncaughtException(Thread t, Throwable e) { assertEquals(0, wfr3Executor.numPendingTasks()); verify(transportListener, never()).transportInUse(false); + ff3.start(streamListener); + ff4.start(streamListener); + wfr2.start(streamListener); + wfr3.start(streamListener); + wfr4.start(streamListener); delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertEquals(5, delayedTransport.getPendingCompleteStreamsCount()); verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -423,8 +462,9 @@ public void uncaughtException(Thread t, Throwable e) { inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3 inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4 inOrder.verifyNoMoreInteractions(); - fakeExecutor.runDueTasks(); + assertEquals(4, fakeExecutor.runDueTasks()); assertEquals(0, fakeExecutor.numPendingTasks()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -434,15 +474,18 @@ public void uncaughtException(Thread t, Throwable e) { assertNull(wfr3.getRealStream()); wfr3Executor.runDueTasks(); assertSame(mockRealStream, wfr3.getRealStream()); + assertEquals(0, delayedTransport.getPendingCompleteStreamsCount()); // New streams will use the last picker DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( method, headers, waitForReadyCallOptions); + wfr5.start(streamListener); assertNull(wfr5.getRealStream()); inOrder.verify(picker).pickSubchannel( new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertEquals(1, delayedTransport.getPendingCompleteStreamsCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); From 7022f5e10c71b739443d54835736822218d533a7 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 17 Dec 2020 10:07:26 -0800 Subject: [PATCH 10/11] make delayedStream abstract class and fix thread safetiness --- .../grpc/internal/DelayedClientTransport.java | 11 +++---- .../java/io/grpc/internal/DelayedStream.java | 8 ++++- .../io/grpc/internal/MetadataApplierImpl.java | 9 ++++-- .../internal/TransferableClientStream.java | 30 ------------------- .../io/grpc/internal/DelayedStreamTest.java | 8 ++++- 5 files changed, 27 insertions(+), 39 deletions(-) delete mode 100644 core/src/main/java/io/grpc/internal/TransferableClientStream.java diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 9fee2bc33d2..df57a14bae6 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -339,7 +339,8 @@ public InternalLogId getLogId() { private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); - private volatile boolean transferCompleted; + @GuardedBy("lock") + private boolean transferCompleted; private PendingStream(PickSubchannelArgs args) { this.args = args; @@ -371,11 +372,11 @@ public void cancel(Status reason) { @Override public void onTransferComplete() { - if (transferCompleted) { - return; - } - transferCompleted = true; synchronized (lock) { + if (transferCompleted) { + return; + } + transferCompleted = true; pendingCompleteStreams--; if (shutdownStatus != null && pendingCompleteStreams == 0) { syncContext.executeLater(reportTransportTerminated); diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 3b88b8f7567..1e42fabd163 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -39,7 +39,7 @@ * DelayedStream} may be internally altered by different threads, thus internal synchronization is * necessary. */ -class DelayedStream extends TransferableClientStream { +abstract class DelayedStream implements ClientStream { /** {@code true} once realStream is valid and all pending calls have been drained. */ private volatile boolean passThrough; /** @@ -410,6 +410,12 @@ ClientStream getRealStream() { return realStream; } + /** + * Provides the place to define actions at the point when transfer is done. + * Call this method to trigger those transfer completion activities. + */ + abstract void onTransferComplete(); + private static class DelayedStreamListener implements ClientStreamListener { private final ClientStreamListener realListener; private volatile boolean passThrough; diff --git a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java index c3196ddd107..ea243378cdb 100644 --- a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java +++ b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java @@ -47,7 +47,7 @@ final class MetadataApplierImpl extends MetadataApplier { boolean finalized; // not null if returnStream() was called before apply() - DelayedStream delayedStream; + ApplierDelayedStream delayedStream; MetadataApplierImpl( ClientTransport transport, MethodDescriptor method, Metadata origHeaders, @@ -105,11 +105,16 @@ ClientStream returnStream() { synchronized (lock) { if (returnedStream == null) { // apply() has not been called, needs to buffer the requests. - delayedStream = new DelayedStream(); + delayedStream = new ApplierDelayedStream(); return returnedStream = delayedStream; } else { return returnedStream; } } } + + private static final class ApplierDelayedStream extends DelayedStream { + @Override + void onTransferComplete() {} + } } diff --git a/core/src/main/java/io/grpc/internal/TransferableClientStream.java b/core/src/main/java/io/grpc/internal/TransferableClientStream.java deleted file mode 100644 index 4a62e6cf54b..00000000000 --- a/core/src/main/java/io/grpc/internal/TransferableClientStream.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - -/** - * A logical {@link ClientStream} that does internal transfer processing of the clint requests. - */ -abstract class TransferableClientStream implements ClientStream { - - /** - * Provides the place to define actions at the point when transfer is done. - * Call this method to trigger those transfer completion activities. No-op by default. - */ - public void onTransferComplete() { - } -} diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 393a6c6e6d0..9b857c119fb 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -65,7 +65,7 @@ public class DelayedStreamTest { @Mock private ClientStreamListener listener; @Mock private ClientStream realStream; @Captor private ArgumentCaptor listenerCaptor; - private DelayedStream stream = new DelayedStream(); + private DelayedStream stream = new SimpleDelayedStream(); @Test public void setStream_setAuthority() { @@ -378,4 +378,10 @@ public Void answer(InvocationOnMock in) { assertThat(insight.toString()) .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); } + + private static class SimpleDelayedStream extends DelayedStream { + @Override + void onTransferComplete() { + } + } } From cf2d51bfea41772bda8f7b335ce23ea2c4d10f22 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 17 Dec 2020 10:17:55 -0800 Subject: [PATCH 11/11] non-final ApplierDelayedStream inner class --- core/src/main/java/io/grpc/internal/MetadataApplierImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java index ea243378cdb..fdbcc33fcb0 100644 --- a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java +++ b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java @@ -113,7 +113,7 @@ ClientStream returnStream() { } } - private static final class ApplierDelayedStream extends DelayedStream { + private static class ApplierDelayedStream extends DelayedStream { @Override void onTransferComplete() {} }