Skip to content

Commit

Permalink
core: add more delayedStream tests (#7843)
Browse files Browse the repository at this point in the history
Add more delayedStream tests related to #7750, where we changed to call realStream.start() synchronously with setting realStream.
  • Loading branch information
YifeiZhuang committed Jan 29, 2021
1 parent 9bb9fef commit ef76337
Showing 1 changed file with 114 additions and 0 deletions.
114 changes: 114 additions & 0 deletions core/src/test/java/io/grpc/internal/DelayedStreamTest.java
Expand Up @@ -19,6 +19,8 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -34,12 +36,15 @@

import io.grpc.Attributes;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.testing.SingleMessageProducer;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -90,6 +95,27 @@ public void start_afterStart() {
stream.start(mock(ClientStreamListener.class));
}

@Test(expected = IllegalStateException.class)
public void writeMessage_beforeStart() {
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
stream.writeMessage(message);
}

@Test(expected = IllegalStateException.class)
public void flush_beforeStart() {
stream.flush();
}

@Test(expected = IllegalStateException.class)
public void request_beforeStart() {
stream.request(1);
}

@Test(expected = IllegalStateException.class)
public void halfClose_beforeStart() {
stream.halfClose();
}

@Test
public void setStream_sendsAllMessages() {
stream.setCompressor(Codec.Identity.NONE);
Expand Down Expand Up @@ -154,6 +180,94 @@ public void setStream_flowControl() {
verify(realStream).request(3);
}

@Test
public void setStreamThenStart() {
stream.optimizeForDirectExecutor();
stream.setCompressor(mock(Compressor.class));
stream.setFullStreamDecompression(false);
stream.setDecompressorRegistry(DecompressorRegistry.emptyInstance());
stream.setDeadline(Deadline.after(1, TimeUnit.MINUTES));
stream.setAuthority("auth");
stream.setMaxInboundMessageSize(10);
stream.setMaxOutboundMessageSize(10);

assertNull(stream.setStream(realStream));
stream.start(listener);
stream.request(1);

InOrder inOrder = inOrder(realStream);
inOrder.verify(realStream).optimizeForDirectExecutor();
inOrder.verify(realStream).setCompressor(any(Compressor.class));
inOrder.verify(realStream).setFullStreamDecompression(false);
inOrder.verify(realStream).setDecompressorRegistry(any(DecompressorRegistry.class));
inOrder.verify(realStream).setDeadline(any(Deadline.class));
inOrder.verify(realStream).setAuthority("auth");
inOrder.verify(realStream).setMaxInboundMessageSize(10);
inOrder.verify(realStream).setMaxOutboundMessageSize(10);
verify(realStream).request(1);
verify(realStream).start(same(listener));
verifyNoMoreInteractions(realStream);
}

@Test
public void startThenSetRealStream() {
stream.setAuthority("auth");
stream.optimizeForDirectExecutor();
stream.setMaxInboundMessageSize(10);
stream.setCompressor(mock(Compressor.class));
stream.setFullStreamDecompression(false);
stream.setDeadline(Deadline.after(1, TimeUnit.MINUTES));
stream.setMaxOutboundMessageSize(10);
stream.setDecompressorRegistry(DecompressorRegistry.emptyInstance());
stream.start(listener);
stream.request(1);
InputStream message = mock(InputStream.class);
stream.writeMessage(message);
Runnable runnable = stream.setStream(realStream);
assertNotNull(runnable);
callMeMaybe(runnable);
stream.getAttributes();
stream.request(4);

InOrder inOrder = inOrder(realStream);
inOrder.verify(realStream).setAuthority("auth");
inOrder.verify(realStream).optimizeForDirectExecutor();
inOrder.verify(realStream).setMaxInboundMessageSize(10);
inOrder.verify(realStream).setCompressor(any(Compressor.class));
inOrder.verify(realStream).setFullStreamDecompression(false);
inOrder.verify(realStream).setDeadline(any(Deadline.class));
inOrder.verify(realStream).setMaxOutboundMessageSize(10);
inOrder.verify(realStream).setDecompressorRegistry(any(DecompressorRegistry.class));
inOrder.verify(realStream).start(listenerCaptor.capture());
inOrder.verify(realStream).request(1);
inOrder.verify(realStream).writeMessage(same(message));
inOrder.verify(realStream).getAttributes();
verify(realStream).request(4);
verifyNoMoreInteractions(realStream);
ClientStreamListener delayedListener = listenerCaptor.getValue();
delayedListener.onReady();
verify(listener).onReady();
}

@Test
public void drainPendingCallRacesCancel() {
stream.start(listener);
InputStream message = mock(InputStream.class);
stream.writeMessage(message);
stream.flush();
Runnable runnable = stream.setStream(realStream);
assertNotNull(runnable);
stream.cancel(Status.CANCELLED);
callMeMaybe(runnable);

InOrder inOrder = inOrder(realStream);
inOrder.verify(realStream).start(any(ClientStreamListener.class));
inOrder.verify(realStream).writeMessage(same(message));
inOrder.verify(realStream).flush();
inOrder.verify(realStream).cancel(Status.CANCELLED);
verifyNoMoreInteractions(realStream);
}

@Test
public void setStream_setMessageCompression() {
stream.start(listener);
Expand Down

0 comments on commit ef76337

Please sign in to comment.