From 7b1e45c57fb1b997cd6aa6c80f0eed1a09e5ca89 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 11 Jul 2022 16:09:02 -0700 Subject: [PATCH] fix half close without request --- .../grpc/internal/AbstractServerStream.java | 4 +- .../internal/AbstractServerStreamTest.java | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index f94c3e54e23..94cdfa4a572 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -224,8 +224,8 @@ public final void onStreamAllocated() { @Override public void deframerClosed(boolean hasPartialMessage) { deframerClosed = true; - if (endOfStream) { - if (!immediateCloseRequested && hasPartialMessage) { + if (endOfStream && !immediateCloseRequested) { + if (hasPartialMessage) { // We've received the entire stream and have data available but we don't have // enough to read the next frame ... this is bad. deframeFailed( diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java index 65fc89be231..9f6c4922aa5 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java @@ -28,11 +28,13 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.util.concurrent.SettableFuture; import io.grpc.InternalStatus; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.AbstractServerStream.TransportState; import io.grpc.internal.MessageFramerTest.ByteWritableBuffer; import java.io.ByteArrayInputStream; @@ -108,6 +110,43 @@ public void messagesAvailable(MessageProducer producer) { assertNull("no message expected", streamListenerMessageQueue.poll()); } + @Test + public void noHalfCloseListenerOnCancellation() throws Exception { + final Queue streamListenerMessageQueue = new LinkedList<>(); + final SettableFuture closedFuture = SettableFuture.create(); + + stream.transportState().setListener(new ServerStreamListenerBase() { + @Override + public void messagesAvailable(StreamListener.MessageProducer producer) { + InputStream message; + while ((message = producer.next()) != null) { + streamListenerMessageQueue.add(message); + } + } + + @Override + public void halfClosed() { + if (streamListenerMessageQueue.isEmpty()) { + throw new StatusRuntimeException(Status.INTERNAL.withDescription( + "Half close without request")); + } + } + + @Override + public void closed(Status status) { + closedFuture.set(status); + } + }); + + ReadableBuffer buffer = mock(ReadableBuffer.class); + when(buffer.readableBytes()).thenReturn(1); + stream.transportState().inboundDataReceived(buffer, true); + Status cancel = Status.CANCELLED.withDescription("DEADLINE EXCEEDED"); + stream.transportState().transportReportStatus(cancel); + assertEquals(cancel, closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + verify(buffer).close(); + } + @Test public void queuedBytesInDeframerShouldNotBlockComplete() throws Exception { final SettableFuture closedFuture = SettableFuture.create();