diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java index cf8ba70b7e32..32cc32868018 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java @@ -131,7 +131,7 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException session.close(e); } - stream.close(); + stream.handlerComplete(); }); } } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java index e5fe7a366824..ae9a31b280af 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java @@ -27,7 +27,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpoint; +import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; +import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.Session; import javax.websocket.WebSocketContainer; @@ -37,11 +39,15 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -127,6 +133,62 @@ public void testMoreThanLargestMessageOneByteAtATime() throws Exception assertArrayEquals(data, client.getEcho()); } + @Test + public void testNotReadingToEndOfStream() throws Exception + { + int size = 32; + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + + CountDownLatch handlerComplete = new CountDownLatch(1); + BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) -> + { + byte[] recv = new byte[16]; + int read = inputStream.read(recv); + assertThat(read, not(is(0))); + handlerComplete.countDown(); + }); + + Session session = wsClient.connectToServer(client, uri); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data)); + assertTrue(handlerComplete.await(5, TimeUnit.SECONDS)); + + session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test")); + assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE)); + assertThat(client.closeReason.getReasonPhrase(), is("close from test")); + } + + @Test + public void testClosingBeforeReadingToEndOfStream() throws Exception + { + int size = 32; + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + + CountDownLatch handlerComplete = new CountDownLatch(1); + BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) -> + { + byte[] recv = new byte[16]; + int read = inputStream.read(recv); + assertThat(read, not(is(0))); + + inputStream.close(); + read = inputStream.read(recv); + assertThat(read, is(-1)); + handlerComplete.countDown(); + }); + + Session session = wsClient.connectToServer(client, uri); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data)); + assertTrue(handlerComplete.await(5, TimeUnit.SECONDS)); + + session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test")); + assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE)); + assertThat(client.closeReason.getReasonPhrase(), is("close from test")); + } + private byte[] randomBytes(int size) { byte[] data = new byte[size]; @@ -134,6 +196,37 @@ private byte[] randomBytes(int size) return data; } + @ClientEndpoint + public static class BasicClientBinaryStreamer + { + public interface MessageHandler + { + void accept(Session session, InputStream inputStream) throws Exception; + } + + private final MessageHandler handler; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private CloseReason closeReason; + + public BasicClientBinaryStreamer(MessageHandler consumer) + { + this.handler = consumer; + } + + @OnMessage + public void echoed(Session session, InputStream input) throws Exception + { + handler.accept(session, input); + } + + @OnClose + public void onClosed(CloseReason closeReason) + { + this.closeReason = closeReason; + closeLatch.countDown(); + } + } + @ClientEndpoint public static class ClientBinaryStreamer { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index a619fba31785..96983a391a77 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -172,9 +172,10 @@ public void testReadByteNoBuffersClosed() throws IOException { // wait for a little bit before sending input closed TimeUnit.MILLISECONDS.sleep(1000); + stream.appendFrame(null, true); stream.messageComplete(); } - catch (InterruptedException e) + catch (InterruptedException | IOException e) { hadError.set(true); e.printStackTrace(System.err);