Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5368 - ensure onMessage exits before next frame is read #5377

Merged
merged 8 commits into from Oct 16, 2020
Expand Up @@ -118,10 +118,8 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
if (LOG.isDebugEnabled())
LOG.debug("Binary Message InputStream");

final MessageInputStream stream = new MessageInputStream(session);
MessageInputStream stream = new MessageInputStream(session);
activeMessage = stream;

// Always dispatch streaming read to another thread.
dispatch(() ->
{
try
Expand Down Expand Up @@ -329,11 +327,8 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
if (LOG.isDebugEnabled())
LOG.debug("Text Message Writer");

MessageInputStream inputStream = new MessageInputStream(session);
final MessageReader reader = new MessageReader(inputStream);
activeMessage = inputStream;

// Always dispatch streaming read to another thread.
MessageReader reader = new MessageReader(session);
activeMessage = reader;
dispatch(() ->
{
try
Expand All @@ -343,9 +338,10 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
catch (Throwable e)
{
session.close(e);
return;
joakime marked this conversation as resolved.
Show resolved Hide resolved
}

inputStream.close();
reader.handlerComplete();
});
}
}
Expand Down
Expand Up @@ -100,9 +100,10 @@ else if (wrapper.wantsStreams())
catch (Throwable t)
{
session.close(t);
return;
}

inputStream.close();
inputStream.handlerComplete();
});
}
else
Expand Down Expand Up @@ -197,8 +198,7 @@ else if (wrapper.wantsStreams())
{
@SuppressWarnings("unchecked")
MessageHandler.Whole<Reader> handler = (Whole<Reader>)wrapper.getHandler();
MessageInputStream inputStream = new MessageInputStream(session);
MessageReader reader = new MessageReader(inputStream);
MessageReader reader = new MessageReader(session);
activeMessage = reader;
dispatch(() ->
{
Expand All @@ -209,9 +209,10 @@ else if (wrapper.wantsStreams())
catch (Throwable t)
{
session.close(t);
return;
joakime marked this conversation as resolved.
Show resolved Hide resolved
}

inputStream.close();
reader.handlerComplete();
});
}
else
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.message.MessageAppender;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.NullMessage;
Expand Down Expand Up @@ -105,7 +104,7 @@ public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
}
else if (events.onBinary.isStreaming())
{
final MessageInputStream inputStream = new MessageInputStream(session);
MessageInputStream inputStream = new MessageInputStream(session);
activeMessage = inputStream;
dispatch(() ->
{
Expand All @@ -115,11 +114,11 @@ else if (events.onBinary.isStreaming())
}
catch (Throwable t)
{
// dispatched calls need to be reported
session.close(t);
return;
joakime marked this conversation as resolved.
Show resolved Hide resolved
}

inputStream.close();
inputStream.handlerComplete();
});
}
else
Expand Down Expand Up @@ -262,22 +261,21 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
}
else if (events.onText.isStreaming())
{
MessageInputStream inputStream = new MessageInputStream(session);
activeMessage = new MessageReader(inputStream);
final MessageAppender msg = activeMessage;
MessageReader reader = new MessageReader(session);
activeMessage = reader;
dispatch(() ->
{
try
{
events.onText.call(websocket, session, msg);
events.onText.call(websocket, session, reader);
}
catch (Throwable t)
{
// dispatched calls need to be reported
session.close(t);
return;
joakime marked this conversation as resolved.
Show resolved Hide resolved
}

inputStream.close();
reader.handlerComplete();
});
}
else
Expand Down
Expand Up @@ -521,7 +521,7 @@ public void resume()
{
ByteBuffer resume = readState.resume();
if (resume != null)
onFillable(resume);
getExecutor().execute(() -> onFillable(resume));
joakime marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Expand Up @@ -55,6 +55,7 @@ private enum State
{
RESUMED,
SUSPENDED,
COMPLETE,
joakime marked this conversation as resolved.
Show resolved Hide resolved
CLOSED
}

Expand All @@ -76,23 +77,11 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
if (LOG.isDebugEnabled())
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload));

// Early non atomic test that we aren't closed to avoid an unnecessary copy (will be checked again later).
if (state == State.CLOSED)
return;

// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
try
{
if (framePayload == null || !framePayload.hasRemaining())
if (BufferUtil.isEmpty(framePayload))
return;

ByteBuffer copy = acquire(framePayload.remaining(), framePayload.isDirect());
BufferUtil.clearToFill(copy);
copy.put(framePayload);
BufferUtil.flipToFlush(copy, 0);

synchronized (this)
{
switch (state)
Expand All @@ -105,11 +94,14 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
state = State.SUSPENDED;
break;

case SUSPENDED:
default:
throw new IllegalStateException();
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}

buffers.put(copy);
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
joakime marked this conversation as resolved.
Show resolved Hide resolved
}
}
catch (InterruptedException e)
Expand All @@ -121,7 +113,30 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
@Override
public void close()
{
SuspendToken resume = null;
synchronized (this)
{
if (state == State.CLOSED)
return;

boolean remainingContent = (state != State.COMPLETE) ||
(!buffers.isEmpty() && buffers.peek() != EOF) ||
(activeBuffer != null && activeBuffer.hasRemaining());

if (remainingContent)
LOG.warn("MessageInputStream closed without fully consuming content");

state = State.CLOSED;
buffers.clear();
buffers.add(EOF);
}
}

@Override
public void messageComplete()
joakime marked this conversation as resolved.
Show resolved Hide resolved
{
if (LOG.isDebugEnabled())
LOG.debug("Message completed");

synchronized (this)
{
switch (state)
Expand All @@ -130,47 +145,53 @@ public void close()
return;

case SUSPENDED:
resume = suspendToken;
suspendToken = null;
state = State.CLOSED;
break;

case RESUMED:
state = State.CLOSED;
state = State.COMPLETE;
break;

default:
throw new IllegalStateException();
}

buffers.clear();
buffers.offer(EOF);
}

// May need to resume to discard until we reach next message.
if (resume != null)
resume.resume();
}

@Override
public void mark(int readlimit)
public void handlerComplete()
{
// Not supported.
}
// Close the InputStream.
close();

@Override
public boolean markSupported()
{
return false;
// May need to resume to resume and read to the next message.
SuspendToken resume;
synchronized (this)
{
resume = suspendToken;
suspendToken = null;
}

if (resume != null)
resume.resume();
}

@Override
public void messageComplete()
public int read() throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("Message completed");
buffers.offer(EOF);
byte[] bytes = new byte[1];
while (true)
{
int read = read(bytes, 0, 1);
if (read < 0)
return -1;
if (read == 0)
continue;

return bytes[0] & 0xFF;
joakime marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public int read() throws IOException
public int read(byte[] b, int off, int len) throws IOException
{
try
{
Expand All @@ -186,6 +207,7 @@ public int read() throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("Waiting {} ms to read", timeoutMs);

if (timeoutMs < 0)
{
// Wait forever until a buffer is available.
Expand All @@ -209,10 +231,14 @@ public int read() throws IOException
}
}

int result = activeBuffer.get() & 0xFF;
ByteBuffer buffer = BufferUtil.toBuffer(b, off, len);
joakime marked this conversation as resolved.
Show resolved Hide resolved
BufferUtil.clearToFill(buffer);
int written = BufferUtil.put(activeBuffer, buffer);
BufferUtil.flipToFlush(buffer, 0);

// If we have no more content we may need to resume to get more data.
if (!activeBuffer.hasRemaining())
{

SuspendToken resume = null;
synchronized (this)
{
Expand All @@ -221,6 +247,11 @@ public int read() throws IOException
case CLOSED:
return -1;

case COMPLETE:
// If we are complete we have read the last frame but
// don't want to resume reading until onMessage() exits.
joakime marked this conversation as resolved.
Show resolved Hide resolved
break;

case SUSPENDED:
resume = suspendToken;
suspendToken = null;
Expand All @@ -237,7 +268,7 @@ public int read() throws IOException
resume.resume();
}

return result;
return written;
}
catch (InterruptedException x)
{
Expand All @@ -254,6 +285,27 @@ public void reset() throws IOException
throw new IOException("reset() not supported");
}

@Override
public void mark(int readlimit)
{
// Not supported.
}

@Override
public boolean markSupported()
{
return false;
}

private ByteBuffer copy(ByteBuffer buffer)
{
ByteBuffer copy = acquire(buffer.remaining(), buffer.isDirect());
BufferUtil.clearToFill(copy);
copy.put(buffer);
BufferUtil.flipToFlush(copy, 0);
return copy;
}

private ByteBuffer acquire(int capacity, boolean direct)
{
ByteBuffer buffer;
Expand Down