Skip to content

Commit

Permalink
Issue #5368 - add tests for not reading to end of InputStream
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Oct 15, 2020
1 parent 680020d commit be041d3
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
Expand Up @@ -131,7 +131,7 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
session.close(e);
}

stream.close();
stream.handlerComplete();
});
}
}
Expand Down
Expand Up @@ -27,7 +27,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
Expand All @@ -37,11 +39,15 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -127,13 +133,100 @@ public void testMoreThanLargestMessageOneByteAtATime() throws Exception
assertArrayEquals(data, client.getEcho());
}

@Test
public void testNotReadingToEndOfStream() throws Exception
{
int size = 32;
byte[] data = randomBytes(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);

CountDownLatch handlerComplete = new CountDownLatch(1);
BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) ->
{
byte[] recv = new byte[16];
int read = inputStream.read(recv);
assertThat(read, not(is(0)));
handlerComplete.countDown();
});

Session session = wsClient.connectToServer(client, uri);
session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data));
assertTrue(handlerComplete.await(5, TimeUnit.SECONDS));

session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test"));
assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE));
assertThat(client.closeReason.getReasonPhrase(), is("close from test"));
}

@Test
public void testClosingBeforeReadingToEndOfStream() throws Exception
{
int size = 32;
byte[] data = randomBytes(size);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH);

CountDownLatch handlerComplete = new CountDownLatch(1);
BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) ->
{
byte[] recv = new byte[16];
int read = inputStream.read(recv);
assertThat(read, not(is(0)));

inputStream.close();
read = inputStream.read(recv);
assertThat(read, is(-1));
handlerComplete.countDown();
});

Session session = wsClient.connectToServer(client, uri);
session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data));
assertTrue(handlerComplete.await(5, TimeUnit.SECONDS));

session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test"));
assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE));
assertThat(client.closeReason.getReasonPhrase(), is("close from test"));
}

private byte[] randomBytes(int size)
{
byte[] data = new byte[size];
new Random().nextBytes(data);
return data;
}

@ClientEndpoint
public static class BasicClientBinaryStreamer
{
public interface MessageHandler
{
void accept(Session session, InputStream inputStream) throws Exception;
}

private final MessageHandler handler;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private CloseReason closeReason;

public BasicClientBinaryStreamer(MessageHandler consumer)
{
this.handler = consumer;
}

@OnMessage
public void echoed(Session session, InputStream input) throws Exception
{
handler.accept(session, input);
}

@OnClose
public void onClosed(CloseReason closeReason)
{
this.closeReason = closeReason;
closeLatch.countDown();
}
}

@ClientEndpoint
public static class ClientBinaryStreamer
{
Expand Down
Expand Up @@ -172,9 +172,10 @@ public void testReadByteNoBuffersClosed() throws IOException
{
// wait for a little bit before sending input closed
TimeUnit.MILLISECONDS.sleep(1000);
stream.appendFrame(null, true);
stream.messageComplete();
}
catch (InterruptedException e)
catch (InterruptedException | IOException e)
{
hadError.set(true);
e.printStackTrace(System.err);
Expand Down

0 comments on commit be041d3

Please sign in to comment.