diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index cf6d2dd9c66e..97f009fd9049 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -161,7 +161,9 @@ public byte[] toByteArray() return new byte[0]; byte[] bytes = new byte[length]; - writeTo(BufferUtil.toBuffer(bytes)); + ByteBuffer buffer = BufferUtil.toBuffer(bytes); + BufferUtil.clear(buffer); + writeTo(buffer); return bytes; } @@ -170,7 +172,7 @@ public void writeTo(ByteBuffer buffer) int pos = BufferUtil.flipToFill(buffer); for (ByteBuffer bb : _buffers) { - buffer.put(bb); + buffer.put(bb.slice()); } BufferUtil.flipToFlush(buffer, pos); } @@ -179,7 +181,7 @@ public void writeTo(OutputStream out) throws IOException { for (ByteBuffer bb : _buffers) { - BufferUtil.writeTo(bb, out); + BufferUtil.writeTo(bb.slice(), out); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java new file mode 100644 index 000000000000..68f72d465593 --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java @@ -0,0 +1,333 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ByteBufferAccumulatorTest +{ + private CountingBufferPool byteBufferPool; + private ByteBufferAccumulator accumulator; + + @BeforeEach + public void before() + { + byteBufferPool = new CountingBufferPool(); + accumulator = new ByteBufferAccumulator(byteBufferPool); + } + + @Test + public void testToBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // We completely fill up the internal buffer with the first write. + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + writeInFlushMode(slice, internalBuffer); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we ask for min size of 0 we get the same buffer which is full. + internalBuffer = accumulator.ensureBuffer(0, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we need at least 1 minSpace we must allocate a new buffer. + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThan(0)); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + + // Write 13 bytes from the end of the internal buffer. + int bytesToWrite = BufferUtil.space(internalBuffer) - 13; + ByteBuffer buffer = BufferUtil.toBuffer(new byte[bytesToWrite]); + BufferUtil.clear(buffer); + assertThat(writeInFlushMode(slice, buffer), is(bytesToWrite)); + assertThat(writeInFlushMode(buffer, internalBuffer), is(bytesToWrite)); + assertThat(BufferUtil.space(internalBuffer), is(13)); + + // If we request anything under the amount remaining we get back the same buffer. + for (int i = 0; i <= 13; i++) + { + internalBuffer = accumulator.ensureBuffer(i, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(13)); + } + + // If we request over 13 then we get a new buffer. + internalBuffer = accumulator.ensureBuffer(14, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1024)); + + // Copy the rest of the content. + while (slice.hasRemaining()) + { + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testTakeBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(combinedBuffer, is(content)); + + // Return the buffer and make sure all is returned to bufferPool. + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testToByteArray() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + byte[] combinedBuffer = accumulator.toByteArray(); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(BufferUtil.toBuffer(combinedBuffer), is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyToBuffer() + { + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyTakeBuffer() + { + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteTo() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteToBufferTooSmall() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Writing to a buffer too small gives buffer overflow. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = BufferUtil.toBuffer(new byte[accumulator.getLength() - 1]); + BufferUtil.clear(combinedBuffer); + assertThrows(BufferOverflowException.class, () -> accumulator.writeTo(combinedBuffer)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testCopy() + { + int size = 1024 * 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + int tmpBufferSize = 1024; + ByteBuffer tmpBuffer = BufferUtil.toBuffer(new byte[tmpBufferSize]); + BufferUtil.clear(tmpBuffer); + while (slice.hasRemaining()) + { + writeInFlushMode(slice, tmpBuffer); + accumulator.copyBuffer(tmpBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + private ByteBuffer randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return BufferUtil.toBuffer(data); + } + + private int writeInFlushMode(ByteBuffer from, ByteBuffer to) + { + int pos = BufferUtil.flipToFill(to); + int written = BufferUtil.put(from, to); + BufferUtil.flipToFlush(to, pos); + return written; + } + + public static class CountingBufferPool extends LeakTrackingByteBufferPool + { + private final AtomicLong _leasedBuffers = new AtomicLong(0); + + public CountingBufferPool() + { + this(new MappedByteBufferPool()); + } + + public CountingBufferPool(ByteBufferPool delegate) + { + super(delegate); + } + + @Override + public ByteBuffer acquire(int size, boolean direct) + { + _leasedBuffers.incrementAndGet(); + return super.acquire(size, direct); + } + + @Override + public void release(ByteBuffer buffer) + { + if (buffer != null) + _leasedBuffers.decrementAndGet(); + super.release(buffer); + } + + public long getLeasedBuffers() + { + return _leasedBuffers.get(); + } + + public void verifyClosed() + { + assertThat(_leasedBuffers.get(), is(0L)); + assertThat(getLeakedAcquires(), is(0L)); + assertThat(getLeakedReleases(), is(0L)); + assertThat(getLeakedResources(), is(0L)); + assertThat(getLeakedRemoves(), is(0L)); + } + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index 08074de115c4..5aa41765e070 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -91,79 +91,4 @@ public void testCopyChunkNotEnoughSpace() MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length)); assertThat(e.getMessage(), containsString("too large for configured max")); } - - /* - @Test - public void testRecycle() - { - ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool()); - ByteBuffer out0 = ByteBuffer.allocate(200); - ByteBuffer out1 = ByteBuffer.allocate(200); - { - // 1 - ByteBuffer buf = accumulator.newByteBuffer(10); - byte[] hello = "Hello".getBytes(UTF_8); - buf.put(hello).flip(); - accumulator.copyChunk(buf); - - // 2 - buf = accumulator.newByteBuffer(10); - byte[] space = " ".getBytes(UTF_8); - buf.put(space).flip(); - accumulator.copyChunk(buf); - - // 3 - buf = accumulator.newByteBuffer(10); - byte[] world = "World".getBytes(UTF_8); - buf.put(world).flip(); - accumulator.copyChunk(buf); - - assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length)); - - accumulator.transferTo(out0); - - // reuse that byte[] - accumulator.recycle(); - } - - { - // 1 - ByteBuffer buf = accumulator.newByteBuffer(10); - byte[] olleh = "olleH".getBytes(UTF_8); - buf.put(olleh).flip(); - accumulator.copyChunk(buf); - - // 2 - buf = accumulator.newByteBuffer(10); - byte[] space = " ".getBytes(UTF_8); - buf.put(space).flip(); - accumulator.copyChunk(buf); - - // 3 - buf = accumulator.newByteBuffer(10); - byte[] dlrow = "dlroW".getBytes(UTF_8); - buf.put(dlrow).flip(); - accumulator.copyChunk(buf); - - // 4 - buf = accumulator.newByteBuffer(10); - byte[] done = " enoD".getBytes(UTF_8); - buf.put(done).flip(); - accumulator.copyChunk(buf); - - assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length)); - - accumulator.transferTo(out1); - - // reuse that byte[] - accumulator.recycle(); - } - - String result0 = BufferUtil.toUTF8String(out0); - assertThat("result", result0, is("Hello World")); - - String result1 = BufferUtil.toUTF8String(out1); - assertThat("result", result1, is("olleH dlroW enoD")); - } - */ }