From 5a87f3bd20e8891e435ca1c20b71b07c68eb7236 Mon Sep 17 00:00:00 2001 From: Baoyi Chen Date: Thu, 29 Oct 2020 17:35:41 +0800 Subject: [PATCH] Fix issue #5499 this PR let the ByteAccumulator recyclable. after invoke ByteAccumulator.transferTo method we can invoke ByteAccumulator.recycle method to reuse byte[] via ByteAccumulator.newByteArray method Signed-off-by: Baoyi Chen --- .../extensions/compress/ByteAccumulator.java | 64 ++++++++++++------- .../compress/CompressExtension.java | 36 ++++++----- .../compress/ByteAccumulatorTest.java | 17 ++--- 3 files changed, 71 insertions(+), 46 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 6e4ce2f7ef5f..a92d40d1b476 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -22,20 +22,47 @@ import java.util.ArrayList; import java.util.List; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; public class ByteAccumulator { - private List prevChunks = null; - private List nextChunks = new ArrayList<>(); + private List chunks = new ArrayList<>(); private final int maxSize; private int length = 0; - private int index; + private ByteBufferPool bufferPool; public ByteAccumulator(int maxOverallBufferSize) + { + this(maxOverallBufferSize, null); + } + + public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool bufferPool) { this.maxSize = maxOverallBufferSize; + this.bufferPool = bufferPool; + } + + public void copyChunk(ByteBuffer buffer, int length) + { + if (this.length + length > maxSize) + { + String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); + throw new MessageTooLargeException(err); + } + if (length > 0) + { + buffer.limit(length); + chunks.add(buffer); + this.length += length; + } + else + { + // release 0 length buffer directly + if (bufferPool != null) + bufferPool.release(buffer); + } } public void copyChunk(byte[] buf, int offset, int length) @@ -45,7 +72,7 @@ public void copyChunk(byte[] buf, int offset, int length) String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); throw new MessageTooLargeException(err); } - nextChunks.add(ByteBuffer.wrap(buf, offset, length)); + chunks.add(ByteBuffer.wrap(buf, offset, length)); this.length += length; } @@ -61,17 +88,11 @@ int getMaxSize() ByteBuffer newByteBuffer(int size) { - ByteBuffer buf; - if (prevChunks != null && prevChunks.size() > index) + if (bufferPool == null) { - buf = prevChunks.get(index); + return ByteBuffer.allocate(size); } - else - { - buf = ByteBuffer.allocate(size); - } - index++; - return buf; + return (ByteBuffer)bufferPool.acquire(size, false).clear(); } public void transferTo(ByteBuffer buffer) @@ -83,7 +104,7 @@ public void transferTo(ByteBuffer buffer) } int position = buffer.position(); - for (ByteBuffer chunk : nextChunks) + for (ByteBuffer chunk : chunks) { buffer.put(chunk); } @@ -92,20 +113,17 @@ public void transferTo(ByteBuffer buffer) void recycle() { - index = 0; length = 0; - int resize = 16; - if (prevChunks == null || nextChunks.size() > prevChunks.size()) + if (bufferPool == null) { - prevChunks = nextChunks; + return; } - - // keep prevChunks retain max resize elements - if (prevChunks.size() > resize) + for (ByteBuffer chunk : chunks) { - prevChunks.subList(resize, prevChunks.size()).clear(); + bufferPool.release(chunk); } - nextChunks = new ArrayList<>(); + + chunks = new ArrayList<>(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 60dd1c997f74..dd26bbd3e6c1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -184,6 +184,24 @@ protected ByteAccumulator newByteAccumulator() return accumulator; } + int copyChunk(Inflater inflater, ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException + { + int position = 0; + int capacity = buf.capacity(); + while (position < capacity) + { + int read = inflater.inflate(buf.array(), position, capacity - position); + if (read <= 0) + { + accumulator.copyChunk(buf, position); + return read; + } + position += read; + } + accumulator.copyChunk(buf, position); + return position; + } + protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException { if ((buf == null) || (!buf.hasRemaining())) @@ -203,26 +221,14 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da return; } - int read; - - loop: while (true) { ByteBuffer output = accumulator.newByteBuffer(DECOMPRESS_BUF_SIZE); - int offset = 0; - while (offset < output.capacity()) + int read = copyChunk(inflater, accumulator, output); + if (read <= 0) { - read = inflater.inflate(output.array(), offset, output.capacity() - offset); - if (read <= 0) - { - // last chunk - if (offset > 0) - accumulator.copyChunk(output.array(), 0, offset); - break loop; - } - offset += read; + break; } - accumulator.copyChunk(output.array(), 0, offset); } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index caa69e2c975b..452644e4c1ae 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; +import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; import org.junit.jupiter.api.Test; @@ -95,7 +96,7 @@ public void testCopyChunkNotEnoughSpace() @Test public void testRecycle() { - ByteAccumulator accumulator = new ByteAccumulator(10_000); + ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool()); ByteBuffer out0 = ByteBuffer.allocate(200); ByteBuffer out1 = ByteBuffer.allocate(200); { @@ -103,19 +104,19 @@ public void testRecycle() ByteBuffer buf = accumulator.newByteBuffer(10); byte[] hello = "Hello".getBytes(UTF_8); System.arraycopy(hello, 0, buf.array(), 0, hello.length); - accumulator.copyChunk(buf.array(), 0, hello.length); + accumulator.copyChunk(buf, hello.length); // 2 buf = accumulator.newByteBuffer(10); byte[] space = " ".getBytes(UTF_8); System.arraycopy(space, 0, buf.array(), 0, space.length); - accumulator.copyChunk(buf.array(), 0, space.length); + accumulator.copyChunk(buf, space.length); // 3 buf = accumulator.newByteBuffer(10); byte[] world = "World".getBytes(UTF_8); System.arraycopy(world, 0, buf.array(), 0, world.length); - accumulator.copyChunk(buf.array(), 0, world.length); + accumulator.copyChunk(buf, world.length); assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length)); @@ -130,25 +131,25 @@ public void testRecycle() ByteBuffer buf = accumulator.newByteBuffer(10); byte[] olleh = "olleH".getBytes(UTF_8); System.arraycopy(olleh, 0, buf.array(), 0, olleh.length); - accumulator.copyChunk(buf.array(), 0, olleh.length); + accumulator.copyChunk(buf, olleh.length); // 2 buf = accumulator.newByteBuffer(10); byte[] space = " ".getBytes(UTF_8); System.arraycopy(space, 0, buf.array(), 0, space.length); - accumulator.copyChunk(buf.array(), 0, space.length); + accumulator.copyChunk(buf, space.length); // 3 buf = accumulator.newByteBuffer(10); byte[] dlrow = "dlroW".getBytes(UTF_8); System.arraycopy(dlrow, 0, buf.array(), 0, dlrow.length); - accumulator.copyChunk(buf.array(), 0, dlrow.length); + accumulator.copyChunk(buf, dlrow.length); // 4 buf = accumulator.newByteBuffer(10); byte[] done = " enoD".getBytes(UTF_8); System.arraycopy(done, 0, buf.array(), 0, done.length); - accumulator.copyChunk(buf.array(), 0, done.length); + accumulator.copyChunk(buf, done.length); assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length));