Skip to content

Commit

Permalink
move logic from messageComplete into appendFrame with fin==true
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Oct 13, 2020
1 parent 0994768 commit 419eefc
Showing 1 changed file with 34 additions and 42 deletions.
Expand Up @@ -77,31 +77,42 @@ public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException
if (LOG.isDebugEnabled())
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload));

// Avoid entering synchronized block if there is nothing to do.
boolean bufferIsEmpty = BufferUtil.isEmpty(framePayload);
if (bufferIsEmpty && !fin)
return;

try
{
if (BufferUtil.isEmpty(framePayload))
return;

synchronized (this)
{
switch (state)
if (!bufferIsEmpty)
{
case CLOSED:
return;
switch (state)
{
case CLOSED:
return;

case RESUMED:
suspendToken = session.suspend();
state = State.SUSPENDED;
break;

case RESUMED:
suspendToken = session.suspend();
state = State.SUSPENDED;
break;
default:
throw new IllegalStateException();
}

default:
throw new IllegalStateException();
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
}

// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
buffers.put(copy(framePayload));
if (fin)
{
buffers.add(EOF);
state = State.COMPLETE;
}
}
}
catch (InterruptedException e)
Expand Down Expand Up @@ -131,32 +142,6 @@ public void close()
}
}

@Override
public void messageComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("Message completed");

synchronized (this)
{
switch (state)
{
case CLOSED:
return;

case SUSPENDED:
case RESUMED:
state = State.COMPLETE;
break;

default:
throw new IllegalStateException();
}

buffers.offer(EOF);
}
}

public void handlerComplete()
{
// Close the InputStream.
Expand Down Expand Up @@ -202,6 +187,7 @@ public int read(byte[] b, int off, int len) throws IOException
return -1;
}

// todo: what if we get a buffer with no content and we never resume
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
{
Expand Down Expand Up @@ -279,6 +265,12 @@ public int read(byte[] b, int off, int len) throws IOException
}
}

@Override
public void messageComplete()
{
// We handle this case in appendFrame with fin==true.
}

@Override
public void reset() throws IOException
{
Expand Down

0 comments on commit 419eefc

Please sign in to comment.