From 941ffcead7678a74fb3feb175cdd857b1d97ed43 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 1 Oct 2020 15:42:50 +1000 Subject: [PATCH 1/8] Issue #5368 - ensure onMessage exits before next frame is read Signed-off-by: Lachlan Roberts --- .../endpoints/JsrAnnotatedEventDriver.java | 14 +-- .../endpoints/JsrEndpointEventDriver.java | 9 +- .../events/JettyAnnotatedEventDriver.java | 18 ++- .../common/message/MessageInputStream.java | 112 +++++++++++------- .../common/message/MessageReader.java | 12 ++ 5 files changed, 98 insertions(+), 67 deletions(-) diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java index 4f2369e3a01c..cf8ba70b7e32 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java @@ -118,10 +118,8 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException if (LOG.isDebugEnabled()) LOG.debug("Binary Message InputStream"); - final MessageInputStream stream = new MessageInputStream(session); + MessageInputStream stream = new MessageInputStream(session); activeMessage = stream; - - // Always dispatch streaming read to another thread. dispatch(() -> { try @@ -329,11 +327,8 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException if (LOG.isDebugEnabled()) LOG.debug("Text Message Writer"); - MessageInputStream inputStream = new MessageInputStream(session); - final MessageReader reader = new MessageReader(inputStream); - activeMessage = inputStream; - - // Always dispatch streaming read to another thread. + MessageReader reader = new MessageReader(session); + activeMessage = reader; dispatch(() -> { try @@ -343,9 +338,10 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException catch (Throwable e) { session.close(e); + return; } - inputStream.close(); + reader.handlerComplete(); }); } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java index 4dc9f01272bb..7e5af34c62e9 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java @@ -100,9 +100,10 @@ else if (wrapper.wantsStreams()) catch (Throwable t) { session.close(t); + return; } - inputStream.close(); + inputStream.handlerComplete(); }); } else @@ -197,8 +198,7 @@ else if (wrapper.wantsStreams()) { @SuppressWarnings("unchecked") MessageHandler.Whole handler = (Whole)wrapper.getHandler(); - MessageInputStream inputStream = new MessageInputStream(session); - MessageReader reader = new MessageReader(inputStream); + MessageReader reader = new MessageReader(session); activeMessage = reader; dispatch(() -> { @@ -209,9 +209,10 @@ else if (wrapper.wantsStreams()) catch (Throwable t) { session.close(t); + return; } - inputStream.close(); + reader.handlerComplete(); }); } else diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java index c48d0d9de245..7adfafda7428 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java @@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.CloseInfo; -import org.eclipse.jetty.websocket.common.message.MessageAppender; import org.eclipse.jetty.websocket.common.message.MessageInputStream; import org.eclipse.jetty.websocket.common.message.MessageReader; import org.eclipse.jetty.websocket.common.message.NullMessage; @@ -105,7 +104,7 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException } else if (events.onBinary.isStreaming()) { - final MessageInputStream inputStream = new MessageInputStream(session); + MessageInputStream inputStream = new MessageInputStream(session); activeMessage = inputStream; dispatch(() -> { @@ -115,11 +114,11 @@ else if (events.onBinary.isStreaming()) } catch (Throwable t) { - // dispatched calls need to be reported session.close(t); + return; } - inputStream.close(); + inputStream.handlerComplete(); }); } else @@ -262,22 +261,21 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException } else if (events.onText.isStreaming()) { - MessageInputStream inputStream = new MessageInputStream(session); - activeMessage = new MessageReader(inputStream); - final MessageAppender msg = activeMessage; + MessageReader reader = new MessageReader(session); + activeMessage = reader; dispatch(() -> { try { - events.onText.call(websocket, session, msg); + events.onText.call(websocket, session, reader); } catch (Throwable t) { - // dispatched calls need to be reported session.close(t); + return; } - inputStream.close(); + reader.handlerComplete(); }); } else diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 346076dd0ecf..369e7ed8625a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -55,6 +55,7 @@ private enum State { RESUMED, SUSPENDED, + COMPLETE, CLOSED } @@ -76,23 +77,11 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException if (LOG.isDebugEnabled()) LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); - // Early non atomic test that we aren't closed to avoid an unnecessary copy (will be checked again later). - if (state == State.CLOSED) - return; - - // Put the payload into the queue, by copying it. - // Copying is necessary because the payload will - // be processed after this method returns. try { - if (framePayload == null || !framePayload.hasRemaining()) + if (BufferUtil.isEmpty(framePayload)) return; - ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect()); - BufferUtil.clearToFill(copy); - copy.put(framePayload); - BufferUtil.flipToFlush(copy, 0); - synchronized (this) { switch (state) @@ -105,11 +94,14 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException state = State.SUSPENDED; break; - case SUSPENDED: + default: throw new IllegalStateException(); } - buffers.put(copy); + // Put the payload into the queue, by copying it. + // Copying is necessary because the payload will + // be processed after this method returns. + buffers.put(copy(framePayload)); } } catch (InterruptedException e) @@ -121,7 +113,23 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException @Override public void close() { - SuspendToken resume = null; + synchronized (this) + { + if (state == State.CLOSED) + return; + + state = State.CLOSED; + buffers.clear(); + buffers.offer(EOF); + } + } + + @Override + public void messageComplete() + { + if (LOG.isDebugEnabled()) + LOG.debug("Message completed"); + synchronized (this) { switch (state) @@ -130,43 +138,33 @@ public void close() return; case SUSPENDED: - resume = suspendToken; - suspendToken = null; - state = State.CLOSED; - break; - case RESUMED: - state = State.CLOSED; + state = State.COMPLETE; break; + + default: + throw new IllegalStateException(); } - buffers.clear(); buffers.offer(EOF); } - - // May need to resume to discard until we reach next message. - if (resume != null) - resume.resume(); } - @Override - public void mark(int readlimit) - { - // Not supported. - } - - @Override - public boolean markSupported() + public void handlerComplete() { - return false; - } + // May need to resume to resume and read to the next message. + SuspendToken resume; + synchronized (this) + { + state = State.CLOSED; + resume = suspendToken; + suspendToken = null; + buffers.clear(); + buffers.offer(EOF); + } - @Override - public void messageComplete() - { - if (LOG.isDebugEnabled()) - LOG.debug("Message completed"); - buffers.offer(EOF); + if (resume != null) + resume.resume(); } @Override @@ -186,6 +184,7 @@ public int read() throws IOException { if (LOG.isDebugEnabled()) LOG.debug("Waiting {} ms to read", timeoutMs); + if (timeoutMs < 0) { // Wait forever until a buffer is available. @@ -212,7 +211,6 @@ public int read() throws IOException int result = activeBuffer.get() & 0xFF; if (!activeBuffer.hasRemaining()) { - SuspendToken resume = null; synchronized (this) { @@ -221,6 +219,11 @@ public int read() throws IOException case CLOSED: return -1; + case COMPLETE: + // If we are complete we have read the last frame but + // don't want to resume reading until onMessage() exits. + break; + case SUSPENDED: resume = suspendToken; suspendToken = null; @@ -254,6 +257,27 @@ public void reset() throws IOException throw new IOException("reset() not supported"); } + @Override + public void mark(int readlimit) + { + // Not supported. + } + + @Override + public boolean markSupported() + { + return false; + } + + private ByteBuffer copy(ByteBuffer buffer) + { + ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect()); + BufferUtil.clearToFill(copy); + copy.put(buffer); + BufferUtil.flipToFlush(copy, 0); + return copy; + } + private ByteBuffer acquire(int capacity, boolean direct) { ByteBuffer buffer; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java index fbbdfc5ec3a5..9d1422d46a81 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java @@ -24,6 +24,8 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.eclipse.jetty.websocket.api.Session; + /** * Support class for reading a (single) WebSocket TEXT message via a Reader. *

@@ -33,6 +35,11 @@ public class MessageReader extends InputStreamReader implements MessageAppender { private final MessageInputStream stream; + public MessageReader(Session session) + { + this(new MessageInputStream(session)); + } + public MessageReader(MessageInputStream stream) { super(stream, StandardCharsets.UTF_8); @@ -50,4 +57,9 @@ public void messageComplete() { this.stream.messageComplete(); } + + public void handlerComplete() + { + this.stream.handlerComplete(); + } } From 6c94ef5848abb1778a32161e5c085a5b195baf01 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 6 Oct 2020 17:43:58 +1100 Subject: [PATCH 2/8] Issue #5368 - warn if MessageInputStream closed without fully consuming Signed-off-by: Lachlan Roberts --- .../common/message/MessageInputStream.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 369e7ed8625a..985fbe5808f7 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -87,6 +87,7 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException switch (state) { case CLOSED: + LOG.warn("Received content after InputStream closed"); return; case RESUMED: @@ -118,9 +119,11 @@ public void close() if (state == State.CLOSED) return; + if (!buffers.isEmpty() || (activeBuffer != null && activeBuffer.hasRemaining())) + LOG.warn("InputStream closed without fully consuming content"); + state = State.CLOSED; buffers.clear(); - buffers.offer(EOF); } } @@ -156,11 +159,17 @@ public void handlerComplete() SuspendToken resume; synchronized (this) { - state = State.CLOSED; + if (state != State.CLOSED) + { + if (!buffers.isEmpty() || (activeBuffer != null && activeBuffer.hasRemaining())) + LOG.warn("InputStream closed without fully consuming content"); + + state = State.CLOSED; + buffers.clear(); + } + resume = suspendToken; suspendToken = null; - buffers.clear(); - buffers.offer(EOF); } if (resume != null) From 7df0dfa7c2c8210514f83688d84a9cae2ecc3cf2 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 7 Oct 2020 15:41:51 +1100 Subject: [PATCH 3/8] warn if did not read until EOF once per MessageInputStream --- .../common/message/MessageInputStream.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 985fbe5808f7..95c1e91aa379 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -87,7 +87,6 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException switch (state) { case CLOSED: - LOG.warn("Received content after InputStream closed"); return; case RESUMED: @@ -119,11 +118,16 @@ public void close() if (state == State.CLOSED) return; - if (!buffers.isEmpty() || (activeBuffer != null && activeBuffer.hasRemaining())) - LOG.warn("InputStream closed without fully consuming content"); + boolean remainingContent = (state != State.COMPLETE) || + (!buffers.isEmpty() && buffers.peek() != EOF) || + (activeBuffer != null && activeBuffer.hasRemaining()); + + if (remainingContent) + LOG.warn("MessageInputStream closed without fully consuming content"); state = State.CLOSED; buffers.clear(); + buffers.add(EOF); } } @@ -155,19 +159,13 @@ public void messageComplete() public void handlerComplete() { + // Close the InputStream. + close(); + // May need to resume to resume and read to the next message. SuspendToken resume; synchronized (this) { - if (state != State.CLOSED) - { - if (!buffers.isEmpty() || (activeBuffer != null && activeBuffer.hasRemaining())) - LOG.warn("InputStream closed without fully consuming content"); - - state = State.CLOSED; - buffers.clear(); - } - resume = suspendToken; suspendToken = null; } From aa1299912d39a9752217e57560a3076d4999df2d Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 7 Oct 2020 16:32:06 +1100 Subject: [PATCH 4/8] Issue #5368 - when resuming always call onFillable from newThread. Signed-off-by: Lachlan Roberts --- .../jetty/websocket/common/io/AbstractWebSocketConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 11583bd58a05..1e3c98b63532 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -521,7 +521,7 @@ public void resume() { ByteBuffer resume = readState.resume(); if (resume != null) - onFillable(resume); + getExecutor().execute(() -> onFillable(resume)); } @Override From 09947681fe37da9aa2c8d9b943c608720bcbbaa9 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 7 Oct 2020 19:48:13 +1100 Subject: [PATCH 5/8] allow MessageInputStream to read multiple bytes at a time Signed-off-by: Lachlan Roberts --- .../common/message/MessageInputStream.java | 25 +++++++++++++++++-- .../message/MessageInputStreamTest.java | 16 +++++++----- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 95c1e91aa379..acc91c507ed6 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -176,6 +176,22 @@ public void handlerComplete() @Override public int read() throws IOException + { + byte[] bytes = new byte[1]; + while (true) + { + int read = read(bytes, 0, 1); + if (read < 0) + return -1; + if (read == 0) + continue; + + return bytes[0] & 0xFF; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { try { @@ -215,7 +231,12 @@ public int read() throws IOException } } - int result = activeBuffer.get() & 0xFF; + ByteBuffer buffer = BufferUtil.toBuffer(b, off, len); + BufferUtil.clearToFill(buffer); + int written = BufferUtil.put(activeBuffer, buffer); + BufferUtil.flipToFlush(buffer, 0); + + // If we have no more content we may need to resume to get more data. if (!activeBuffer.hasRemaining()) { SuspendToken resume = null; @@ -247,7 +268,7 @@ public int read() throws IOException resume.resume(); } - return result; + return written; } catch (InterruptedException x) { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index 4266ed515426..a619fba31785 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.websocket.common.message; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -32,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.api.SuspendToken; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -111,9 +113,10 @@ public void testBlockOnRead() throws Exception startLatch.await(); // Read it from the stream. - byte[] buf = new byte[32]; - int len = stream.read(buf); - String message = new String(buf, 0, len, StandardCharsets.UTF_8); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IO.copy(stream, out); + byte[] bytes = out.toByteArray(); + String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8); // Test it assertThat("Error when appending", hadError.get(), is(false)); @@ -206,9 +209,10 @@ public void testSplitMessageWithEmptyPayloads() throws IOException session.provideContent(); // Read entire message it from the stream. - byte[] buf = new byte[32]; - int len = stream.read(buf); - String message = new String(buf, 0, len, StandardCharsets.UTF_8); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IO.copy(stream, out); + byte[] bytes = out.toByteArray(); + String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8); // Test it assertThat("Message", message, is("Hello World!")); From 419eefc2efca3c20c8bc69e0908b9126159c56c6 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 14 Oct 2020 10:26:08 +1100 Subject: [PATCH 6/8] move logic from messageComplete into appendFrame with fin==true Signed-off-by: Lachlan Roberts --- .../common/message/MessageInputStream.java | 76 +++++++++---------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index acc91c507ed6..97ea9200e4d8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -77,31 +77,42 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException if (LOG.isDebugEnabled()) LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); + // Avoid entering synchronized block if there is nothing to do. + boolean bufferIsEmpty = BufferUtil.isEmpty(framePayload); + if (bufferIsEmpty && !fin) + return; + try { - if (BufferUtil.isEmpty(framePayload)) - return; - synchronized (this) { - switch (state) + if (!bufferIsEmpty) { - case CLOSED: - return; + switch (state) + { + case CLOSED: + return; + + case RESUMED: + suspendToken = session.suspend(); + state = State.SUSPENDED; + break; - case RESUMED: - suspendToken = session.suspend(); - state = State.SUSPENDED; - break; + default: + throw new IllegalStateException(); + } - default: - throw new IllegalStateException(); + // Put the payload into the queue, by copying it. + // Copying is necessary because the payload will + // be processed after this method returns. + buffers.put(copy(framePayload)); } - // Put the payload into the queue, by copying it. - // Copying is necessary because the payload will - // be processed after this method returns. - buffers.put(copy(framePayload)); + if (fin) + { + buffers.add(EOF); + state = State.COMPLETE; + } } } catch (InterruptedException e) @@ -131,32 +142,6 @@ public void close() } } - @Override - public void messageComplete() - { - if (LOG.isDebugEnabled()) - LOG.debug("Message completed"); - - synchronized (this) - { - switch (state) - { - case CLOSED: - return; - - case SUSPENDED: - case RESUMED: - state = State.COMPLETE; - break; - - default: - throw new IllegalStateException(); - } - - buffers.offer(EOF); - } - } - public void handlerComplete() { // Close the InputStream. @@ -202,6 +187,7 @@ public int read(byte[] b, int off, int len) throws IOException return -1; } + // todo: what if we get a buffer with no content and we never resume // grab a fresh buffer while (activeBuffer == null || !activeBuffer.hasRemaining()) { @@ -279,6 +265,12 @@ public int read(byte[] b, int off, int len) throws IOException } } + @Override + public void messageComplete() + { + // We handle this case in appendFrame with fin==true. + } + @Override public void reset() throws IOException { From 680020dcb2cc22dc3252690f263ecd890d78aa5c Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 15 Oct 2020 12:08:57 +1100 Subject: [PATCH 7/8] Issue #5368 - changes from review Signed-off-by: Lachlan Roberts --- .../common/message/MessageInputStream.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 97ea9200e4d8..96c9f99deeb9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -53,9 +53,24 @@ public class MessageInputStream extends InputStream implements MessageAppender private enum State { + /** + * Open and waiting for a frame to be delivered in {@link #appendFrame(ByteBuffer, boolean)}. + */ RESUMED, + + /** + * We have suspended the session after reading a websocket frame but have not reached the end of the message. + */ SUSPENDED, + + /** + * We have received a frame with fin==true and have suspended until we are signaled that onMessage method exited. + */ COMPLETE, + + /** + * We have read to EOF or someone has called InputStream.close(), any further reads will result in reading -1. + */ CLOSED } @@ -99,13 +114,17 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException break; default: - throw new IllegalStateException(); + throw new IllegalStateException("Incorrect State: " + state.name()); } // Put the payload into the queue, by copying it. // Copying is necessary because the payload will // be processed after this method returns. - buffers.put(copy(framePayload)); + ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect()); + BufferUtil.clearToFill(copy); + copy.put(framePayload); + BufferUtil.flipToFlush(copy, 0); + buffers.put(copy); } if (fin) @@ -134,7 +153,7 @@ public void close() (activeBuffer != null && activeBuffer.hasRemaining()); if (remainingContent) - LOG.warn("MessageInputStream closed without fully consuming content"); + LOG.warn("MessageInputStream closed without fully consuming content {}", session); state = State.CLOSED; buffers.clear(); @@ -187,7 +206,6 @@ public int read(byte[] b, int off, int len) throws IOException return -1; } - // todo: what if we get a buffer with no content and we never resume // grab a fresh buffer while (activeBuffer == null || !activeBuffer.hasRemaining()) { @@ -245,7 +263,7 @@ public int read(byte[] b, int off, int len) throws IOException break; case RESUMED: - throw new IllegalStateException(); + throw new IllegalStateException("Incorrect State: " + state.name()); } } @@ -289,15 +307,6 @@ public boolean markSupported() return false; } - private ByteBuffer copy(ByteBuffer buffer) - { - ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect()); - BufferUtil.clearToFill(copy); - copy.put(buffer); - BufferUtil.flipToFlush(copy, 0); - return copy; - } - private ByteBuffer acquire(int capacity, boolean direct) { ByteBuffer buffer; From be041d3044ee1a2d009c27c8e43cf32d9173f3ac Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 15 Oct 2020 12:10:33 +1100 Subject: [PATCH 8/8] Issue #5368 - add tests for not reading to end of InputStream Signed-off-by: Lachlan Roberts --- .../endpoints/JsrAnnotatedEventDriver.java | 2 +- .../jsr356/server/BinaryStreamTest.java | 93 +++++++++++++++++++ .../message/MessageInputStreamTest.java | 3 +- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java index cf8ba70b7e32..32cc32868018 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java @@ -131,7 +131,7 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException session.close(e); } - stream.close(); + stream.handlerComplete(); }); } } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java index e5fe7a366824..ae9a31b280af 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/BinaryStreamTest.java @@ -27,7 +27,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpoint; +import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; +import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.Session; import javax.websocket.WebSocketContainer; @@ -37,11 +39,15 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -127,6 +133,62 @@ public void testMoreThanLargestMessageOneByteAtATime() throws Exception assertArrayEquals(data, client.getEcho()); } + @Test + public void testNotReadingToEndOfStream() throws Exception + { + int size = 32; + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + + CountDownLatch handlerComplete = new CountDownLatch(1); + BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) -> + { + byte[] recv = new byte[16]; + int read = inputStream.read(recv); + assertThat(read, not(is(0))); + handlerComplete.countDown(); + }); + + Session session = wsClient.connectToServer(client, uri); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data)); + assertTrue(handlerComplete.await(5, TimeUnit.SECONDS)); + + session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test")); + assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE)); + assertThat(client.closeReason.getReasonPhrase(), is("close from test")); + } + + @Test + public void testClosingBeforeReadingToEndOfStream() throws Exception + { + int size = 32; + byte[] data = randomBytes(size); + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + PATH); + + CountDownLatch handlerComplete = new CountDownLatch(1); + BasicClientBinaryStreamer client = new BasicClientBinaryStreamer((session, inputStream) -> + { + byte[] recv = new byte[16]; + int read = inputStream.read(recv); + assertThat(read, not(is(0))); + + inputStream.close(); + read = inputStream.read(recv); + assertThat(read, is(-1)); + handlerComplete.countDown(); + }); + + Session session = wsClient.connectToServer(client, uri); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer(data)); + assertTrue(handlerComplete.await(5, TimeUnit.SECONDS)); + + session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "close from test")); + assertTrue(client.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(client.closeReason.getCloseCode(), is(CloseReason.CloseCodes.NORMAL_CLOSURE)); + assertThat(client.closeReason.getReasonPhrase(), is("close from test")); + } + private byte[] randomBytes(int size) { byte[] data = new byte[size]; @@ -134,6 +196,37 @@ private byte[] randomBytes(int size) return data; } + @ClientEndpoint + public static class BasicClientBinaryStreamer + { + public interface MessageHandler + { + void accept(Session session, InputStream inputStream) throws Exception; + } + + private final MessageHandler handler; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private CloseReason closeReason; + + public BasicClientBinaryStreamer(MessageHandler consumer) + { + this.handler = consumer; + } + + @OnMessage + public void echoed(Session session, InputStream input) throws Exception + { + handler.accept(session, input); + } + + @OnClose + public void onClosed(CloseReason closeReason) + { + this.closeReason = closeReason; + closeLatch.countDown(); + } + } + @ClientEndpoint public static class ClientBinaryStreamer { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index a619fba31785..96983a391a77 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -172,9 +172,10 @@ public void testReadByteNoBuffersClosed() throws IOException { // wait for a little bit before sending input closed TimeUnit.MILLISECONDS.sleep(1000); + stream.appendFrame(null, true); stream.messageComplete(); } - catch (InterruptedException e) + catch (InterruptedException | IOException e) { hadError.set(true); e.printStackTrace(System.err);