diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 97ea9200e4d8..96c9f99deeb9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -53,9 +53,24 @@ public class MessageInputStream extends InputStream implements MessageAppender private enum State { + /** + * Open and waiting for a frame to be delivered in {@link #appendFrame(ByteBuffer, boolean)}. + */ RESUMED, + + /** + * We have suspended the session after reading a websocket frame but have not reached the end of the message. + */ SUSPENDED, + + /** + * We have received a frame with fin==true and have suspended until we are signaled that onMessage method exited. + */ COMPLETE, + + /** + * We have read to EOF or someone has called InputStream.close(), any further reads will result in reading -1. + */ CLOSED } @@ -99,13 +114,17 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException break; default: - throw new IllegalStateException(); + throw new IllegalStateException("Incorrect State: " + state.name()); } // Put the payload into the queue, by copying it. // Copying is necessary because the payload will // be processed after this method returns. - buffers.put(copy(framePayload)); + ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect()); + BufferUtil.clearToFill(copy); + copy.put(framePayload); + BufferUtil.flipToFlush(copy, 0); + buffers.put(copy); } if (fin) @@ -134,7 +153,7 @@ public void close() (activeBuffer != null && activeBuffer.hasRemaining()); if (remainingContent) - LOG.warn("MessageInputStream closed without fully consuming content"); + LOG.warn("MessageInputStream closed without fully consuming content {}", session); state = State.CLOSED; buffers.clear(); @@ -187,7 +206,6 @@ public int read(byte[] b, int off, int len) throws IOException return -1; } - // todo: what if we get a buffer with no content and we never resume // grab a fresh buffer while (activeBuffer == null || !activeBuffer.hasRemaining()) { @@ -245,7 +263,7 @@ public int read(byte[] b, int off, int len) throws IOException break; case RESUMED: - throw new IllegalStateException(); + throw new IllegalStateException("Incorrect State: " + state.name()); } } @@ -289,15 +307,6 @@ public boolean markSupported() return false; } - private ByteBuffer copy(ByteBuffer buffer) - { - ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect()); - BufferUtil.clearToFill(copy); - copy.put(buffer); - BufferUtil.flipToFlush(copy, 0); - return copy; - } - private ByteBuffer acquire(int capacity, boolean direct) { ByteBuffer buffer;