Skip to content

Commit

Permalink
Fix issue jetty#5499
Browse files Browse the repository at this point in the history
this PR let the ByteAccumulator recyclable. after invoke ByteAccumulator.transferTo method
we can invoke ByteAccumulator.recycle method to reuse byte[] via ByteAccumulator.newByteArray method

Signed-off-by: Baoyi Chen <chen.bao.yi@qq.com>
  • Loading branch information
leonchen83 committed Oct 28, 2020
1 parent 47885f7 commit ae95f6d
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 15 deletions.
Expand Up @@ -27,9 +27,25 @@

public class ByteAccumulator
{
private final List<byte[]> chunks = new ArrayList<>();
private class ByteArray
{
private byte[] buf;
private int offset;
private int length;

private ByteArray(byte[] buf, int offset, int length)
{
this.buf = buf;
this.offset = offset;
this.length = length;
}
}

private List<ByteArray> prevChunks = null;
private List<ByteArray> nextChunks = new ArrayList<>();
private final int maxSize;
private int length = 0;
private int index;

public ByteAccumulator(int maxOverallBufferSize)
{
Expand All @@ -43,11 +59,7 @@ public void copyChunk(byte[] buf, int offset, int length)
String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize);
throw new MessageTooLargeException(err);
}

byte[] copy = new byte[length - offset];
System.arraycopy(buf, offset, copy, 0, length);

chunks.add(copy);
nextChunks.add(new ByteArray(buf, offset, length));
this.length += length;
}

Expand All @@ -56,6 +68,21 @@ public int getLength()
return length;
}

byte[] newByteArray(int size)
{
byte[] bytes;
if (prevChunks != null && prevChunks.size() > index)
{
bytes = prevChunks.get(index).buf;
}
else
{
bytes = new byte[size];
}
index++;
return bytes;
}

public void transferTo(ByteBuffer buffer)
{
if (buffer.remaining() < length)
Expand All @@ -65,10 +92,18 @@ public void transferTo(ByteBuffer buffer)
}

int position = buffer.position();
for (byte[] chunk : chunks)
for (ByteArray chunk : nextChunks)
{
buffer.put(chunk, 0, chunk.length);
buffer.put(chunk.buf, chunk.offset, chunk.length);
}
BufferUtil.flipToFlush(buffer, position);
}

void recycle()
{
index = 0;
length = 0;
prevChunks = nextChunks;
nextChunks = new ArrayList<>();
}
}
Expand Up @@ -46,6 +46,11 @@ public abstract class CompressExtension extends AbstractExtension
protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF};
protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
private static final Logger LOG = Log.getLogger(CompressExtension.class);

/**
* thread local accumulator
*/
protected ThreadLocal<ByteAccumulator> accumulator = ThreadLocal.withInitial(() -> newByteAccumulator());

/**
* Never drop tail bytes 0000FFFF, from any frame type
Expand Down Expand Up @@ -185,10 +190,10 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da
{
return;
}
byte[] output = new byte[DECOMPRESS_BUF_SIZE];


Inflater inflater = getInflater();

while (buf.hasRemaining() && inflater.needsInput())
{
if (!supplyInput(inflater, buf))
Expand All @@ -199,9 +204,12 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da
}

int read;
while ((read = inflater.inflate(output)) >= 0)

while (true)
{
if (read == 0)
byte[] output = accumulator.newByteArray(DECOMPRESS_BUF_SIZE);
read = inflater.inflate(output);
if (read <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Decompress: read 0 {}", toDetail(inflater));
Expand Down
Expand Up @@ -47,7 +47,7 @@ int getTailDropMode()
{
return TAIL_DROP_ALWAYS;
}

@Override
public void incomingFrame(Frame frame)
{
Expand All @@ -63,10 +63,11 @@ public void incomingFrame(Frame frame)

try
{
ByteAccumulator accumulator = newByteAccumulator();
ByteAccumulator accumulator = this.accumulator.get();
decompress(accumulator, frame.getPayload());
decompress(accumulator, TAIL_BYTES_BUF.slice());
forwardIncoming(frame, accumulator);
accumulator.recycle();
}
catch (DataFormatException e)
{
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void incomingFrame(Frame frame)
throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame");
}

ByteAccumulator accumulator = newByteAccumulator();
ByteAccumulator accumulator = this.accumulator.get();

try
{
Expand All @@ -90,6 +90,7 @@ public void incomingFrame(Frame frame)
}

forwardIncoming(frame, accumulator);
accumulator.recycle();
}
catch (DataFormatException e)
{
Expand Down
Expand Up @@ -91,4 +91,77 @@ public void testCopyChunkNotEnoughSpace()
MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length));
assertThat(e.getMessage(), containsString("too large for configured max"));
}

@Test
public void testRecycle()
{
ByteAccumulator accumulator = new ByteAccumulator(10_000);
ByteBuffer out0 = ByteBuffer.allocate(200);
ByteBuffer out1 = ByteBuffer.allocate(200);
{
// 1
byte[] bytes = accumulator.newByteArray(10);
byte[] hello = "Hello".getBytes(UTF_8);
System.arraycopy(hello, 0, bytes, 0, hello.length);
accumulator.copyChunk(bytes, 0, hello.length);

// 2
bytes = accumulator.newByteArray(10);
byte[] space = " ".getBytes(UTF_8);
System.arraycopy(space, 0, bytes, 0, space.length);
accumulator.copyChunk(bytes, 0, space.length);

// 3
bytes = accumulator.newByteArray(10);
byte[] world = "World".getBytes(UTF_8);
System.arraycopy(world, 0, bytes, 0, world.length);
accumulator.copyChunk(bytes, 0, world.length);

assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length));

accumulator.transferTo(out0);

// reuse that byte[]
accumulator.recycle();
}

{
// 1
byte[] bytes = accumulator.newByteArray(10);
byte[] olleh = "olleH".getBytes(UTF_8);
System.arraycopy(olleh, 0, bytes, 0, olleh.length);
accumulator.copyChunk(bytes, 0, olleh.length);

// 2
bytes = accumulator.newByteArray(10);
byte[] space = " ".getBytes(UTF_8);
System.arraycopy(space, 0, bytes, 0, space.length);
accumulator.copyChunk(bytes, 0, space.length);

// 3
bytes = accumulator.newByteArray(10);
byte[] dlrow = "dlroW".getBytes(UTF_8);
System.arraycopy(dlrow, 0, bytes, 0, dlrow.length);
accumulator.copyChunk(bytes, 0, dlrow.length);

// 4
bytes = accumulator.newByteArray(10);
byte[] done = " enoD".getBytes(UTF_8);
System.arraycopy(done, 0, bytes, 0, done.length);
accumulator.copyChunk(bytes, 0, done.length);

assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length));

accumulator.transferTo(out1);

// reuse that byte[]
accumulator.recycle();
}

String result0 = BufferUtil.toUTF8String(out0);
assertThat("result", result0, is("Hello World"));

String result1 = BufferUtil.toUTF8String(out1);
assertThat("result", result1, is("olleH dlroW enoD"));
}
}

0 comments on commit ae95f6d

Please sign in to comment.