diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java new file mode 100644 index 000000000000..66bba546aec8 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -0,0 +1,201 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.util.BufferUtil; + +/** + * Accumulates data into a list of ByteBuffers which can then be combined into a single buffer or written to an OutputStream. + * The buffer list automatically grows as data is written to it, the buffers are taken from the + * supplied {@link ByteBufferPool} or freshly allocated if one is not supplied. + * + * The method {@link #ensureBuffer(int, int)} is used to write directly to the last buffer stored in the buffer list, + * if there is less than a certain amount of space available in that buffer then a new one will be allocated and returned instead. + * @see #ensureBuffer(int, int) + */ +public class ByteBufferAccumulator implements AutoCloseable +{ + private final List _buffers = new ArrayList<>(); + private final ByteBufferPool _bufferPool; + private final boolean _direct; + + public ByteBufferAccumulator() + { + this(null, false); + } + + public ByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct) + { + _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; + _direct = direct; + } + + /** + * Get the amount of bytes which have been accumulated. + * This will add up the remaining of each buffer in the accumulator. + * @return the total length of the content in the accumulator. + */ + public int getLength() + { + int length = 0; + for (ByteBuffer buffer : _buffers) + length = Math.addExact(length, buffer.remaining()); + return length; + } + + public ByteBufferPool getByteBufferPool() + { + return _bufferPool; + } + + /** + * Get the last buffer of the accumulator, this can be written to directly to avoid copying into the accumulator. + * @param minAllocationSize new buffers will be allocated to have at least this size. + * @return a buffer with at least {@code minSize} space to write into. + */ + public ByteBuffer ensureBuffer(int minAllocationSize) + { + return ensureBuffer(1, minAllocationSize); + } + + /** + * Get the last buffer of the accumulator, this can be written to directly to avoid copying into the accumulator. + * @param minSize the smallest amount of remaining space before a new buffer is allocated. + * @param minAllocationSize new buffers will be allocated to have at least this size. + * @return a buffer with at least {@code minSize} space to write into. + */ + public ByteBuffer ensureBuffer(int minSize, int minAllocationSize) + { + ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); + if (BufferUtil.space(buffer) < minSize) + { + buffer = _bufferPool.acquire(minAllocationSize, _direct); + _buffers.add(buffer); + } + + return buffer; + } + + public void copyBytes(byte[] buf, int offset, int length) + { + copyBuffer(BufferUtil.toBuffer(buf, offset, length)); + } + + public void copyBuffer(ByteBuffer buffer) + { + while (buffer.hasRemaining()) + { + ByteBuffer b = ensureBuffer(buffer.remaining()); + int pos = BufferUtil.flipToFill(b); + BufferUtil.put(buffer, b); + BufferUtil.flipToFlush(b, pos); + } + } + + /** + * Take the combined buffer containing all content written to the accumulator. + * The caller is responsible for releasing this {@link ByteBuffer} back into the {@link ByteBufferPool}. + * @return a buffer containing all content written to the accumulator. + * @see #toByteBuffer() + */ + public ByteBuffer takeByteBuffer() + { + ByteBuffer combinedBuffer; + if (_buffers.size() == 1) + { + combinedBuffer = _buffers.get(0); + _buffers.clear(); + return combinedBuffer; + } + + int length = getLength(); + combinedBuffer = _bufferPool.acquire(length, _direct); + BufferUtil.clearToFill(combinedBuffer); + for (ByteBuffer buffer : _buffers) + { + combinedBuffer.put(buffer); + _bufferPool.release(buffer); + } + BufferUtil.flipToFlush(combinedBuffer, 0); + _buffers.clear(); + return combinedBuffer; + } + + /** + * Take the combined buffer containing all content written to the accumulator. + * The returned buffer is still contained within the accumulator and will be released back to the {@link ByteBufferPool} + * when the accumulator is closed. + * @return a buffer containing all content written to the accumulator. + * @see #takeByteBuffer() + * @see #close() + */ + public ByteBuffer toByteBuffer() + { + ByteBuffer combinedBuffer = takeByteBuffer(); + _buffers.add(combinedBuffer); + return combinedBuffer; + } + + /** + * @return a newly allocated byte array containing all content written into the accumulator. + */ + public byte[] toByteArray() + { + int length = getLength(); + if (length == 0) + return new byte[0]; + + byte[] bytes = new byte[length]; + ByteBuffer buffer = BufferUtil.toBuffer(bytes); + BufferUtil.clear(buffer); + writeTo(buffer); + return bytes; + } + + public void writeTo(ByteBuffer buffer) + { + int pos = BufferUtil.flipToFill(buffer); + for (ByteBuffer bb : _buffers) + { + buffer.put(bb.slice()); + } + BufferUtil.flipToFlush(buffer, pos); + } + + public void writeTo(OutputStream out) throws IOException + { + for (ByteBuffer bb : _buffers) + { + BufferUtil.writeTo(bb.slice(), out); + } + } + + @Override + public void close() + { + _buffers.forEach(_bufferPool::release); + _buffers.clear(); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java new file mode 100644 index 000000000000..ed11c126875f --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -0,0 +1,128 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * This class implements an output stream in which the data is written into a list of ByteBuffer, + * the buffer list automatically grows as data is written to it, the buffers are taken from the + * supplied {@link ByteBufferPool} or freshly allocated if one is not supplied. + * + * Designed to mimic {@link java.io.ByteArrayOutputStream} but with better memory usage, and less copying. + */ +public class ByteBufferOutputStream2 extends OutputStream +{ + private final ByteBufferAccumulator _accumulator; + private int _size = 0; + + public ByteBufferOutputStream2() + { + this(null, false); + } + + public ByteBufferOutputStream2(ByteBufferPool bufferPool, boolean direct) + { + _accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool, direct); + } + + public ByteBufferPool getByteBufferPool() + { + return _accumulator.getByteBufferPool(); + } + + /** + * Take the combined buffer containing all content written to the OutputStream. + * The caller is responsible for releasing this {@link ByteBuffer} back into the {@link ByteBufferPool}. + * @return a buffer containing all content written to the OutputStream. + */ + public ByteBuffer takeByteBuffer() + { + return _accumulator.takeByteBuffer(); + } + + /** + * Take the combined buffer containing all content written to the OutputStream. + * The returned buffer is still contained within the OutputStream and will be released back to the {@link ByteBufferPool} + * when the OutputStream is closed. + * @return a buffer containing all content written to the OutputStream. + */ + public ByteBuffer toByteBuffer() + { + return _accumulator.toByteBuffer(); + } + + /** + * @return a newly allocated byte array containing all content written into the OutputStream. + */ + public byte[] toByteArray() + { + return _accumulator.toByteArray(); + } + + public int size() + { + return _size; + } + + @Override + public void write(int b) + { + write(new byte[]{(byte)b}, 0, 1); + } + + @Override + public void write(byte[] b, int off, int len) + { + _size += len; + _accumulator.copyBytes(b, off, len); + } + + public void write(ByteBuffer buffer) + { + _size += buffer.remaining(); + _accumulator.copyBuffer(buffer); + } + + public void writeTo(ByteBuffer buffer) + { + _accumulator.writeTo(buffer); + } + + public void writeTo(OutputStream out) throws IOException + { + _accumulator.writeTo(out); + } + + @Override + public void close() + { + _accumulator.close(); + _size = 0; + } + + @Override + public synchronized String toString() + { + return String.format("%s@%x{size=%d, byteAccumulator=%s}", getClass().getSimpleName(), + hashCode(), _size, _accumulator); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java new file mode 100644 index 000000000000..41938ae1af47 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java @@ -0,0 +1,41 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; + +public class NullByteBufferPool implements ByteBufferPool +{ + @Override + public ByteBuffer acquire(int size, boolean direct) + { + if (direct) + return BufferUtil.allocateDirect(size); + else + return BufferUtil.allocate(size); + } + + @Override + public void release(ByteBuffer buffer) + { + BufferUtil.clear(buffer); + } +} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java new file mode 100644 index 000000000000..22e628be9f9e --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java @@ -0,0 +1,333 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ByteBufferAccumulatorTest +{ + private CountingBufferPool byteBufferPool; + private ByteBufferAccumulator accumulator; + + @BeforeEach + public void before() + { + byteBufferPool = new CountingBufferPool(); + accumulator = new ByteBufferAccumulator(byteBufferPool, false); + } + + @Test + public void testToBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // We completely fill up the internal buffer with the first write. + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + writeInFlushMode(slice, internalBuffer); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we ask for min size of 0 we get the same buffer which is full. + internalBuffer = accumulator.ensureBuffer(0, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we need at least 1 minSpace we must allocate a new buffer. + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThan(0)); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + + // Write 13 bytes from the end of the internal buffer. + int bytesToWrite = BufferUtil.space(internalBuffer) - 13; + ByteBuffer buffer = BufferUtil.toBuffer(new byte[bytesToWrite]); + BufferUtil.clear(buffer); + assertThat(writeInFlushMode(slice, buffer), is(bytesToWrite)); + assertThat(writeInFlushMode(buffer, internalBuffer), is(bytesToWrite)); + assertThat(BufferUtil.space(internalBuffer), is(13)); + + // If we request anything under the amount remaining we get back the same buffer. + for (int i = 0; i <= 13; i++) + { + internalBuffer = accumulator.ensureBuffer(i, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(13)); + } + + // If we request over 13 then we get a new buffer. + internalBuffer = accumulator.ensureBuffer(14, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1024)); + + // Copy the rest of the content. + while (slice.hasRemaining()) + { + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testTakeBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(combinedBuffer, is(content)); + + // Return the buffer and make sure all is returned to bufferPool. + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testToByteArray() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + byte[] combinedBuffer = accumulator.toByteArray(); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(BufferUtil.toBuffer(combinedBuffer), is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyToBuffer() + { + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyTakeBuffer() + { + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteTo() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteToBufferTooSmall() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Writing to a buffer too small gives buffer overflow. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = BufferUtil.toBuffer(new byte[accumulator.getLength() - 1]); + BufferUtil.clear(combinedBuffer); + assertThrows(BufferOverflowException.class, () -> accumulator.writeTo(combinedBuffer)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testCopy() + { + int size = 1024 * 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + int tmpBufferSize = 1024; + ByteBuffer tmpBuffer = BufferUtil.toBuffer(new byte[tmpBufferSize]); + BufferUtil.clear(tmpBuffer); + while (slice.hasRemaining()) + { + writeInFlushMode(slice, tmpBuffer); + accumulator.copyBuffer(tmpBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + private ByteBuffer randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return BufferUtil.toBuffer(data); + } + + private int writeInFlushMode(ByteBuffer from, ByteBuffer to) + { + int pos = BufferUtil.flipToFill(to); + int written = BufferUtil.put(from, to); + BufferUtil.flipToFlush(to, pos); + return written; + } + + public static class CountingBufferPool extends LeakTrackingByteBufferPool + { + private final AtomicLong _leasedBuffers = new AtomicLong(0); + + public CountingBufferPool() + { + this(new MappedByteBufferPool()); + } + + public CountingBufferPool(ByteBufferPool delegate) + { + super(delegate); + } + + @Override + public ByteBuffer acquire(int size, boolean direct) + { + _leasedBuffers.incrementAndGet(); + return super.acquire(size, direct); + } + + @Override + public void release(ByteBuffer buffer) + { + if (buffer != null) + _leasedBuffers.decrementAndGet(); + super.release(buffer); + } + + public long getLeasedBuffers() + { + return _leasedBuffers.get(); + } + + public void verifyClosed() + { + assertThat(_leasedBuffers.get(), is(0L)); + assertThat(getLeakedAcquires(), is(0L)); + assertThat(getLeakedReleases(), is(0L)); + assertThat(getLeakedResources(), is(0L)); + assertThat(getLeakedRemoves(), is(0L)); + } + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 3b56753e7b8e..00a8db41fa5f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -19,56 +19,92 @@ package org.eclipse.jetty.websocket.common.extensions.compress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import org.eclipse.jetty.io.ByteBufferAccumulator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; -public class ByteAccumulator +/** + * @deprecated use {@link ByteBufferAccumulator} instead. + */ +@Deprecated +public class ByteAccumulator implements AutoCloseable { - private final List chunks = new ArrayList<>(); + private static final int MIN_SPACE = 8; + + private final ByteBufferAccumulator accumulator; private final int maxSize; private int length = 0; public ByteAccumulator(int maxOverallBufferSize) + { + this(maxOverallBufferSize, null); + } + + public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool byteBufferPool) { this.maxSize = maxOverallBufferSize; + this.accumulator = new ByteBufferAccumulator(byteBufferPool, false); } - public void copyChunk(byte[] buf, int offset, int length) + public int getLength() { - if (this.length + length > maxSize) + return length; + } + + public ByteBuffer ensureBuffer(int minAllocationSize) + { + return accumulator.ensureBuffer(MIN_SPACE, minAllocationSize); + } + + public void addLength(int read) + { + length += read; + if (length > maxSize) { - String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); + String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length, maxSize); throw new MessageTooLargeException(err); } - - byte[] copy = new byte[length - offset]; - System.arraycopy(buf, offset, copy, 0, length); - - chunks.add(copy); - this.length += length; } - public int getLength() + public void copyChunk(byte[] buf, int offset, int length) { - return length; + copyChunk(BufferUtil.toBuffer(buf, offset, length)); } - public void transferTo(ByteBuffer buffer) + public void copyChunk(ByteBuffer buffer) { - if (buffer.remaining() < length) + int remaining = buffer.remaining(); + if (length + remaining > maxSize) { - throw new IllegalArgumentException(String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", - buffer.remaining(), length)); + String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length + remaining, maxSize); + throw new MessageTooLargeException(err); } - int position = buffer.position(); - for (byte[] chunk : chunks) + length += remaining; + accumulator.copyBuffer(buffer); + } + + public void transferTo(ByteBuffer buffer) + { + // For some reason this method expects the buffer in fill mode but returns a buffer in flush mode. + BufferUtil.flipToFlush(buffer, 0); + + int availableSpace = BufferUtil.space(buffer); + if (availableSpace < length) { - buffer.put(chunk, 0, chunk.length); + String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", availableSpace, length); + throw new IllegalArgumentException(err); } - BufferUtil.flipToFlush(buffer, position); + + accumulator.writeTo(buffer); + close(); + } + + @Override + public void close() + { + accumulator.close(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 6952ccb67ea0..a70f7c914758 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.websocket.common.extensions.compress; -import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; @@ -28,6 +27,8 @@ import java.util.zip.Inflater; import java.util.zip.ZipException; +import org.eclipse.jetty.io.ByteBufferAccumulator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.compression.DeflaterPool; @@ -162,7 +163,7 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false); try { - BufferUtil.flipToFill(buffer); + BufferUtil.clearToFill(buffer); accumulator.transferTo(buffer); newFrame.setPayload(buffer); nextIncomingFrame(newFrame); @@ -176,19 +177,15 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) protected ByteAccumulator newByteAccumulator() { int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize()); - return new ByteAccumulator(maxSize); + return new ByteAccumulator(maxSize, getBufferPool()); } protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException { - if ((buf == null) || (!buf.hasRemaining())) - { + if (BufferUtil.isEmpty(buf)) return; - } - byte[] output = new byte[DECOMPRESS_BUF_SIZE]; Inflater inflater = getInflater(); - while (buf.hasRemaining() && inflater.needsInput()) { if (!supplyInput(inflater, buf)) @@ -198,22 +195,17 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da return; } - int read; - while ((read = inflater.inflate(output)) >= 0) + while (true) { - if (read == 0) - { - if (LOG.isDebugEnabled()) - LOG.debug("Decompress: read 0 {}", toDetail(inflater)); + ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); + int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit()); + buffer.limit(buffer.limit() + read); + accumulator.addLength(read); + if (LOG.isDebugEnabled()) + LOG.debug("Decompressed {} bytes into buffer {} from {}", read, BufferUtil.toDetailString(buffer), toDetail(inflater)); + + if (read <= 0) break; - } - else - { - // do something with output - if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {} bytes: {}", read, toDetail(inflater)); - accumulator.copyChunk(output, 0, read); - } } } @@ -483,7 +475,9 @@ private void compress(FrameEntry entry, boolean first) // Get a chunk of the payload to avoid to blow // the heap if the payload is a huge mapped file. Frame frame = entry.frame; + boolean fin = frame.isFin(); ByteBuffer data = frame.getPayload(); + Deflater deflater = getDeflater(); if (data == null) data = BufferUtil.EMPTY_BUFFER; @@ -493,40 +487,47 @@ private void compress(FrameEntry entry, boolean first) if (LOG.isDebugEnabled()) LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, outputLength); - boolean needsCompress = true; - - Deflater deflater = getDeflater(); - - if (deflater.needsInput() && !supplyInput(deflater, data)) + ByteBuffer payload = BufferUtil.EMPTY_BUFFER; + WriteCallback callback = this; + if (!deflater.needsInput() || supplyInput(deflater, data)) { - // no input supplied - needsCompress = false; - } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - byte[] output = new byte[outputLength]; + ByteBufferPool bufferPool = getBufferPool(); + try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator(bufferPool, false)) + { + while (true) + { + ByteBuffer buffer = accumulator.ensureBuffer(8, outputLength); + int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); + buffer.limit(buffer.limit() + compressed); - boolean fin = frame.isFin(); + if (LOG.isDebugEnabled()) + LOG.debug("Wrote {} bytes to output buffer", accumulator); - // Compress the data - while (needsCompress) - { - int compressed = deflater.deflate(output, 0, outputLength, Deflater.SYNC_FLUSH); - - // Append the output for the eventual frame. - if (LOG.isDebugEnabled()) - LOG.debug("Wrote {} bytes to output buffer", compressed); - out.write(output, 0, compressed); + if (compressed <= 0) + break; + } - if (compressed < outputLength) - { - needsCompress = false; + ByteBuffer buffer = accumulator.takeByteBuffer(); + payload = buffer; + callback = new WriteCallback() + { + @Override + public void writeFailed(Throwable x) + { + bufferPool.release(buffer); + Flusher.this.writeFailed(x); + } + + @Override + public void writeSuccess() + { + bufferPool.release(buffer); + Flusher.this.writeSuccess(); + } + }; } } - ByteBuffer payload = ByteBuffer.wrap(out.toByteArray()); - if (payload.remaining() > 0) { // Handle tail bytes generated by SYNC_FLUSH. @@ -575,8 +576,7 @@ else if (fin) } chunk.setPayload(payload); chunk.setFin(fin); - - nextOutgoingFrame(chunk, this, entry.batchMode); + nextOutgoingFrame(chunk, callback, entry.batchMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index 0476c0fcc441..c511a9544fc4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -61,9 +61,8 @@ public void incomingFrame(Frame frame) return; } - try + try (ByteAccumulator accumulator = newByteAccumulator()) { - ByteAccumulator accumulator = newByteAccumulator(); decompress(accumulator, frame.getPayload()); decompress(accumulator, TAIL_BYTES_BUF.slice()); forwardIncoming(frame, accumulator); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 37482f8bd678..5bf0bee46449 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -78,9 +78,7 @@ public void incomingFrame(Frame frame) throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); } - ByteAccumulator accumulator = newByteAccumulator(); - - try + try (ByteAccumulator accumulator = newByteAccumulator()) { ByteBuffer payload = frame.getPayload(); decompress(accumulator, payload);