Skip to content

Commit

Permalink
Fix issue jetty#5499
Browse files Browse the repository at this point in the history
this PR let the ByteAccumulator recyclable. after invoke ByteAccumulator.transferTo method
we can invoke ByteAccumulator.recycle method to reuse byte[] via ByteAccumulator.newByteArray method

Signed-off-by: Baoyi Chen <chen.bao.yi@qq.com>
  • Loading branch information
leonchen83 committed Oct 29, 2020
1 parent 018722f commit 5a87f3b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 46 deletions.
Expand Up @@ -22,20 +22,47 @@
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.MessageTooLargeException;

public class ByteAccumulator
{
private List<ByteBuffer> prevChunks = null;
private List<ByteBuffer> nextChunks = new ArrayList<>();
private List<ByteBuffer> chunks = new ArrayList<>();
private final int maxSize;
private int length = 0;
private int index;
private ByteBufferPool bufferPool;

public ByteAccumulator(int maxOverallBufferSize)
{
this(maxOverallBufferSize, null);
}

public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool bufferPool)
{
this.maxSize = maxOverallBufferSize;
this.bufferPool = bufferPool;
}

public void copyChunk(ByteBuffer buffer, int length)
{
if (this.length + length > maxSize)
{
String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize);
throw new MessageTooLargeException(err);
}
if (length > 0)
{
buffer.limit(length);
chunks.add(buffer);
this.length += length;
}
else
{
// release 0 length buffer directly
if (bufferPool != null)
bufferPool.release(buffer);
}
}

public void copyChunk(byte[] buf, int offset, int length)
Expand All @@ -45,7 +72,7 @@ public void copyChunk(byte[] buf, int offset, int length)
String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize);
throw new MessageTooLargeException(err);
}
nextChunks.add(ByteBuffer.wrap(buf, offset, length));
chunks.add(ByteBuffer.wrap(buf, offset, length));
this.length += length;
}

Expand All @@ -61,17 +88,11 @@ int getMaxSize()

ByteBuffer newByteBuffer(int size)
{
ByteBuffer buf;
if (prevChunks != null && prevChunks.size() > index)
if (bufferPool == null)
{
buf = prevChunks.get(index);
return ByteBuffer.allocate(size);
}
else
{
buf = ByteBuffer.allocate(size);
}
index++;
return buf;
return (ByteBuffer)bufferPool.acquire(size, false).clear();
}

public void transferTo(ByteBuffer buffer)
Expand All @@ -83,7 +104,7 @@ public void transferTo(ByteBuffer buffer)
}

int position = buffer.position();
for (ByteBuffer chunk : nextChunks)
for (ByteBuffer chunk : chunks)
{
buffer.put(chunk);
}
Expand All @@ -92,20 +113,17 @@ public void transferTo(ByteBuffer buffer)

void recycle()
{
index = 0;
length = 0;

int resize = 16;
if (prevChunks == null || nextChunks.size() > prevChunks.size())
if (bufferPool == null)
{
prevChunks = nextChunks;
return;
}

// keep prevChunks retain max resize elements
if (prevChunks.size() > resize)
for (ByteBuffer chunk : chunks)
{
prevChunks.subList(resize, prevChunks.size()).clear();
bufferPool.release(chunk);
}
nextChunks = new ArrayList<>();

chunks = new ArrayList<>();
}
}
Expand Up @@ -184,6 +184,24 @@ protected ByteAccumulator newByteAccumulator()
return accumulator;
}

int copyChunk(Inflater inflater, ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException
{
int position = 0;
int capacity = buf.capacity();
while (position < capacity)
{
int read = inflater.inflate(buf.array(), position, capacity - position);
if (read <= 0)
{
accumulator.copyChunk(buf, position);
return read;
}
position += read;
}
accumulator.copyChunk(buf, position);
return position;
}

protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException
{
if ((buf == null) || (!buf.hasRemaining()))
Expand All @@ -203,26 +221,14 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da
return;
}

int read;

loop:
while (true)
{
ByteBuffer output = accumulator.newByteBuffer(DECOMPRESS_BUF_SIZE);
int offset = 0;
while (offset < output.capacity())
int read = copyChunk(inflater, accumulator, output);
if (read <= 0)
{
read = inflater.inflate(output.array(), offset, output.capacity() - offset);
if (read <= 0)
{
// last chunk
if (offset > 0)
accumulator.copyChunk(output.array(), 0, offset);
break loop;
}
offset += read;
break;
}
accumulator.copyChunk(output.array(), 0, offset);
}
}

Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -95,27 +96,27 @@ public void testCopyChunkNotEnoughSpace()
@Test
public void testRecycle()
{
ByteAccumulator accumulator = new ByteAccumulator(10_000);
ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool());
ByteBuffer out0 = ByteBuffer.allocate(200);
ByteBuffer out1 = ByteBuffer.allocate(200);
{
// 1
ByteBuffer buf = accumulator.newByteBuffer(10);
byte[] hello = "Hello".getBytes(UTF_8);
System.arraycopy(hello, 0, buf.array(), 0, hello.length);
accumulator.copyChunk(buf.array(), 0, hello.length);
accumulator.copyChunk(buf, hello.length);

// 2
buf = accumulator.newByteBuffer(10);
byte[] space = " ".getBytes(UTF_8);
System.arraycopy(space, 0, buf.array(), 0, space.length);
accumulator.copyChunk(buf.array(), 0, space.length);
accumulator.copyChunk(buf, space.length);

// 3
buf = accumulator.newByteBuffer(10);
byte[] world = "World".getBytes(UTF_8);
System.arraycopy(world, 0, buf.array(), 0, world.length);
accumulator.copyChunk(buf.array(), 0, world.length);
accumulator.copyChunk(buf, world.length);

assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length));

Expand All @@ -130,25 +131,25 @@ public void testRecycle()
ByteBuffer buf = accumulator.newByteBuffer(10);
byte[] olleh = "olleH".getBytes(UTF_8);
System.arraycopy(olleh, 0, buf.array(), 0, olleh.length);
accumulator.copyChunk(buf.array(), 0, olleh.length);
accumulator.copyChunk(buf, olleh.length);

// 2
buf = accumulator.newByteBuffer(10);
byte[] space = " ".getBytes(UTF_8);
System.arraycopy(space, 0, buf.array(), 0, space.length);
accumulator.copyChunk(buf.array(), 0, space.length);
accumulator.copyChunk(buf, space.length);

// 3
buf = accumulator.newByteBuffer(10);
byte[] dlrow = "dlroW".getBytes(UTF_8);
System.arraycopy(dlrow, 0, buf.array(), 0, dlrow.length);
accumulator.copyChunk(buf.array(), 0, dlrow.length);
accumulator.copyChunk(buf, dlrow.length);

// 4
buf = accumulator.newByteBuffer(10);
byte[] done = " enoD".getBytes(UTF_8);
System.arraycopy(done, 0, buf.array(), 0, done.length);
accumulator.copyChunk(buf.array(), 0, done.length);
accumulator.copyChunk(buf, done.length);

assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length));

Expand Down

0 comments on commit 5a87f3b

Please sign in to comment.