Skip to content

Commit

Permalink
Issue #5368 - changes from review
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Oct 15, 2020
1 parent 419eefc commit 680020d
Showing 1 changed file with 23 additions and 14 deletions.
Expand Up @@ -53,9 +53,24 @@ public class MessageInputStream extends InputStream implements MessageAppender

private enum State
{
/**
* Open and waiting for a frame to be delivered in {@link #appendFrame(ByteBuffer, boolean)}.
*/
RESUMED,

/**
* We have suspended the session after reading a websocket frame but have not reached the end of the message.
*/
SUSPENDED,

/**
* We have received a frame with fin==true and have suspended until we are signaled that onMessage method exited.
*/
COMPLETE,

/**
* We have read to EOF or someone has called InputStream.close(), any further reads will result in reading -1.
*/
CLOSED
}

Expand Down Expand Up @@ -99,13 +114,17 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
break;

default:
throw new IllegalStateException();
throw new IllegalStateException("Incorrect State: " + state.name());
}

// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect());
BufferUtil.clearToFill(copy);
copy.put(framePayload);
BufferUtil.flipToFlush(copy, 0);
buffers.put(copy);
}

if (fin)
Expand Down Expand Up @@ -134,7 +153,7 @@ public void close()
(activeBuffer != null && activeBuffer.hasRemaining());

if (remainingContent)
LOG.warn("MessageInputStream closed without fully consuming content");
LOG.warn("MessageInputStream closed without fully consuming content {}", session);

state = State.CLOSED;
buffers.clear();
Expand Down Expand Up @@ -187,7 +206,6 @@ public int read(byte[] b, int off, int len) throws IOException
return -1;
}

// todo: what if we get a buffer with no content and we never resume
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
Expand Down Expand Up @@ -245,7 +263,7 @@ public int read(byte[] b, int off, int len) throws IOException
break;

case RESUMED:
throw new IllegalStateException();
throw new IllegalStateException("Incorrect State: " + state.name());
}
}

Expand Down Expand Up @@ -289,15 +307,6 @@ public boolean markSupported()
return false;
}

private ByteBuffer copy(ByteBuffer buffer)
{
ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect());
BufferUtil.clearToFill(copy);
copy.put(buffer);
BufferUtil.flipToFlush(copy, 0);
return copy;
}

private ByteBuffer acquire(int capacity, boolean direct)
{
ByteBuffer buffer;
Expand Down

0 comments on commit 680020d

Please sign in to comment.