From 1f5b446462944a7a0d36053c09770774b640a9cc Mon Sep 17 00:00:00 2001 From: Baoyi Chen Date: Wed, 28 Oct 2020 12:39:12 +0800 Subject: [PATCH 01/19] Fix issue #5499 this PR let the ByteAccumulator recyclable. after invoke ByteAccumulator.transferTo method we can invoke ByteAccumulator.recycle method to reuse byte[] via ByteAccumulator.newByteBuffer method Signed-off-by: Baoyi Chen --- .../extensions/compress/ByteAccumulator.java | 76 +++++++++++++++++-- .../compress/CompressExtension.java | 53 +++++++++---- .../compress/DeflateFrameExtension.java | 9 ++- .../compress/PerMessageDeflateExtension.java | 9 ++- .../compress/ByteAccumulatorTest.java | 74 ++++++++++++++++++ 5 files changed, 193 insertions(+), 28 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 3b56753e7b8e..78687f3dbfa9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -22,18 +22,48 @@ import java.util.ArrayList; import java.util.List; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; public class ByteAccumulator { - private final List chunks = new ArrayList<>(); + private final List chunks = new ArrayList<>(); private final int maxSize; private int length = 0; + private final ByteBufferPool bufferPool; public ByteAccumulator(int maxOverallBufferSize) + { + this(maxOverallBufferSize, null); + } + + public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool bufferPool) { this.maxSize = maxOverallBufferSize; + this.bufferPool = bufferPool; + } + + public void copyChunk(ByteBuffer buf) + { + int length = buf.remaining(); + if (this.length + length > maxSize) + { + release(buf); + String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); + throw new MessageTooLargeException(err); + } + + if (buf.hasRemaining()) + { + chunks.add(buf); + this.length += length; + } + else + { + // release 0 length buffer directly + release(buf); + } } public void copyChunk(byte[] buf, int offset, int length) @@ -43,11 +73,7 @@ public void copyChunk(byte[] buf, int offset, int length) String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); throw new MessageTooLargeException(err); } - - byte[] copy = new byte[length - offset]; - System.arraycopy(buf, offset, copy, 0, length); - - chunks.add(copy); + chunks.add(ByteBuffer.wrap(buf, offset, length)); this.length += length; } @@ -56,6 +82,20 @@ public int getLength() return length; } + int getMaxSize() + { + return maxSize; + } + + ByteBuffer newByteBuffer(int size) + { + if (bufferPool == null) + { + return ByteBuffer.allocate(size); + } + return (ByteBuffer)bufferPool.acquire(size, false).clear(); + } + public void transferTo(ByteBuffer buffer) { if (buffer.remaining() < length) @@ -65,10 +105,30 @@ public void transferTo(ByteBuffer buffer) } int position = buffer.position(); - for (byte[] chunk : chunks) + for (ByteBuffer chunk : chunks) { - buffer.put(chunk, 0, chunk.length); + buffer.put(chunk); } BufferUtil.flipToFlush(buffer, position); } + + void recycle() + { + length = 0; + + for (ByteBuffer chunk : chunks) + { + release(chunk); + } + + chunks.clear(); + } + + void release(ByteBuffer buffer) + { + if (bufferPool != null) + { + bufferPool.release(buffer); + } + } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 6952ccb67ea0..512cce17ba94 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -46,7 +46,7 @@ public abstract class CompressExtension extends AbstractExtension protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); private static final Logger LOG = Log.getLogger(CompressExtension.class); - + /** * Never drop tail bytes 0000FFFF, from any frame type */ @@ -92,6 +92,7 @@ public abstract class CompressExtension extends AbstractExtension private InflaterPool inflaterPool; private Deflater deflaterImpl; private Inflater inflaterImpl; + protected ByteAccumulator accumulator; protected AtomicInteger decompressCount = new AtomicInteger(0); private int tailDrop = TAIL_DROP_NEVER; private int rsvUse = RSV_USE_ALWAYS; @@ -176,7 +177,37 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) protected ByteAccumulator newByteAccumulator() { int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize()); - return new ByteAccumulator(maxSize); + if (accumulator == null || accumulator.getMaxSize() != maxSize) + { + accumulator = new ByteAccumulator(maxSize, getBufferPool()); + } + return accumulator; + } + + int copyChunk(Inflater inflater, ByteAccumulator accumulator) throws DataFormatException + { + ByteBuffer buf = accumulator.newByteBuffer(DECOMPRESS_BUF_SIZE); + while (buf.hasRemaining()) + { + try + { + int read = inflater.inflate(buf.array(), buf.position(), buf.remaining()); + if (read <= 0) + { + accumulator.copyChunk((ByteBuffer)buf.flip()); + return read; + } + buf.position(buf.position() + read); + } + catch (DataFormatException e) + { + accumulator.release(buf); + throw e; + } + } + int position = buf.position(); + accumulator.copyChunk((ByteBuffer)buf.flip()); + return position; } protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException @@ -185,10 +216,10 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da { return; } - byte[] output = new byte[DECOMPRESS_BUF_SIZE]; + Inflater inflater = getInflater(); - + while (buf.hasRemaining() && inflater.needsInput()) { if (!supplyInput(inflater, buf)) @@ -198,22 +229,12 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da return; } - int read; - while ((read = inflater.inflate(output)) >= 0) + while (true) { - if (read == 0) + if (copyChunk(inflater, accumulator) <= 0) { - if (LOG.isDebugEnabled()) - LOG.debug("Decompress: read 0 {}", toDetail(inflater)); break; } - else - { - // do something with output - if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {} bytes: {}", read, toDetail(inflater)); - accumulator.copyChunk(output, 0, read); - } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index 0476c0fcc441..a81cba641cc7 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -47,7 +47,7 @@ int getTailDropMode() { return TAIL_DROP_ALWAYS; } - + @Override public void incomingFrame(Frame frame) { @@ -63,7 +63,7 @@ public void incomingFrame(Frame frame) try { - ByteAccumulator accumulator = newByteAccumulator(); + accumulator = newByteAccumulator(); decompress(accumulator, frame.getPayload()); decompress(accumulator, TAIL_BYTES_BUF.slice()); forwardIncoming(frame, accumulator); @@ -72,5 +72,10 @@ public void incomingFrame(Frame frame) { throw new BadPayloadException(e); } + finally + { + if (accumulator != null) + accumulator.recycle(); + } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 37482f8bd678..d338be0b1eac 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -78,10 +78,9 @@ public void incomingFrame(Frame frame) throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); } - ByteAccumulator accumulator = newByteAccumulator(); - try { + accumulator = newByteAccumulator(); ByteBuffer payload = frame.getPayload(); decompress(accumulator, payload); if (frame.isFin()) @@ -90,11 +89,17 @@ public void incomingFrame(Frame frame) } forwardIncoming(frame, accumulator); + } catch (DataFormatException e) { throw new BadPayloadException(e); } + finally + { + if (accumulator != null) + accumulator.recycle(); + } if (frame.isFin()) incomingCompressed = false; diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index 5aa41765e070..1c3dd1ffaead 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; +import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; import org.junit.jupiter.api.Test; @@ -91,4 +92,77 @@ public void testCopyChunkNotEnoughSpace() MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length)); assertThat(e.getMessage(), containsString("too large for configured max")); } + + @Test + public void testRecycle() + { + ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool()); + ByteBuffer out0 = ByteBuffer.allocate(200); + ByteBuffer out1 = ByteBuffer.allocate(200); + { + // 1 + ByteBuffer buf = accumulator.newByteBuffer(10); + byte[] hello = "Hello".getBytes(UTF_8); + buf.put(hello).flip(); + accumulator.copyChunk(buf); + + // 2 + buf = accumulator.newByteBuffer(10); + byte[] space = " ".getBytes(UTF_8); + buf.put(space).flip(); + accumulator.copyChunk(buf); + + // 3 + buf = accumulator.newByteBuffer(10); + byte[] world = "World".getBytes(UTF_8); + buf.put(world).flip(); + accumulator.copyChunk(buf); + + assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length)); + + accumulator.transferTo(out0); + + // reuse that byte[] + accumulator.recycle(); + } + + { + // 1 + ByteBuffer buf = accumulator.newByteBuffer(10); + byte[] olleh = "olleH".getBytes(UTF_8); + buf.put(olleh).flip(); + accumulator.copyChunk(buf); + + // 2 + buf = accumulator.newByteBuffer(10); + byte[] space = " ".getBytes(UTF_8); + buf.put(space).flip(); + accumulator.copyChunk(buf); + + // 3 + buf = accumulator.newByteBuffer(10); + byte[] dlrow = "dlroW".getBytes(UTF_8); + buf.put(dlrow).flip(); + accumulator.copyChunk(buf); + + // 4 + buf = accumulator.newByteBuffer(10); + byte[] done = " enoD".getBytes(UTF_8); + buf.put(done).flip(); + accumulator.copyChunk(buf); + + assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length)); + + accumulator.transferTo(out1); + + // reuse that byte[] + accumulator.recycle(); + } + + String result0 = BufferUtil.toUTF8String(out0); + assertThat("result", result0, is("Hello World")); + + String result1 = BufferUtil.toUTF8String(out1); + assertThat("result", result1, is("olleH dlroW enoD")); + } } From 05dafb89ab5eb6dc961d3bc0ae84682b560289cd Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 5 Nov 2020 13:42:47 +1100 Subject: [PATCH 02/19] Move work on ByteAccumulator to jetty-util Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 94 ++++++++++++ .../jetty/io/ByteBufferOutputStream2.java | 144 ++++++++++++++++++ .../eclipse/jetty/io/NullByteBufferPool.java | 41 +++++ .../extensions/WebSocketExtensionFactory.java | 59 +------ .../extensions/compress/ByteAccumulator.java | 76 +-------- .../compress/CompressExtension.java | 53 ++----- .../compress/DeflateFrameExtension.java | 9 +- .../compress/PerMessageDeflateExtension.java | 9 +- 8 files changed, 309 insertions(+), 176 deletions(-) create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java new file mode 100644 index 000000000000..f5a4977f85dd --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -0,0 +1,94 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.util.BufferUtil; + +public class ByteBufferAccumulator implements AutoCloseable +{ + private static final int MIN_SPACE = 3; + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final List _buffers = new ArrayList<>(); + private final ByteBufferPool _bufferPool; + + public ByteBufferAccumulator(ByteBufferPool bufferPool) + { + this._bufferPool = bufferPool; + } + + public int getLength() + { + int length = 0; + for (ByteBuffer buffer : _buffers) + length += buffer.remaining(); + return length; + } + + public ByteBuffer getBuffer() + { + return getBuffer(DEFAULT_BUFFER_SIZE); + } + + public ByteBuffer getBuffer(int minAllocationSize) + { + ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); + if (BufferUtil.space(buffer) <= MIN_SPACE) + { + buffer = _bufferPool.acquire(minAllocationSize, false); + _buffers.add(buffer); + } + + return buffer; + } + + public void writeTo(ByteBuffer buffer) + { + int pos = BufferUtil.flipToFill(buffer); + for (ByteBuffer bb : _buffers) + { + buffer.put(bb); + } + BufferUtil.flipToFlush(buffer, pos); + } + + public void writeTo(OutputStream out) throws IOException + { + for (ByteBuffer bb : _buffers) + { + BufferUtil.writeTo(bb, out); + } + } + + @Override + public void close() + { + for (ByteBuffer buffer : _buffers) + { + _bufferPool.release(buffer); + } + _buffers.clear(); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java new file mode 100644 index 000000000000..0d18d617d3c7 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; + +/** + * This class implements an output stream in which the data is written into a list of ByteBuffer, + * the buffer list automatically grows as data is written to it, the buffers are taken from the + * supplied {@link ByteBufferPool} or freshly allocated if one is not supplied. + * + * Designed to mimic {@link java.io.ByteArrayOutputStream} but with better memory usage, and less copying. + */ +public class ByteBufferOutputStream2 extends OutputStream +{ + private final ByteBufferAccumulator _accumulator; + private final ByteBufferPool _bufferPool; + private ByteBuffer _combinedByteBuffer; + private int _size = 0; + + public ByteBufferOutputStream2() + { + this(null); + } + + public ByteBufferOutputStream2(ByteBufferPool bufferPool) + { + _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; + _accumulator = new ByteBufferAccumulator(bufferPool); + } + + /** + * Get an aggregated content written to the OutputStream in a ByteBuffer. + * @return the content in a ByteBuffer. + */ + public ByteBuffer toByteBuffer() + { + int length = _accumulator.getLength(); + if (length == 0) + return BufferUtil.EMPTY_BUFFER; + + if (_combinedByteBuffer != null && length == _combinedByteBuffer.remaining()) + return _combinedByteBuffer; + + ByteBuffer buffer = _bufferPool.acquire(_size, false); + _accumulator.writeTo(buffer); + if (_combinedByteBuffer != null) + { + _bufferPool.release(_combinedByteBuffer); + _combinedByteBuffer = buffer; + } + + return buffer; + } + + /** + * Get an aggregated content written to the OutputStream in a byte array. + * @return the content in a byte array. + */ + public byte[] toByteArray() + { + int length = _accumulator.getLength(); + if (length == 0) + return new byte[0]; + + byte[] bytes = new byte[_size]; + ByteBuffer buffer = BufferUtil.toBuffer(bytes); + _accumulator.writeTo(buffer); + return bytes; + } + + public int size() + { + return _accumulator.getLength(); + } + + @Override + public void write(int b) + { + write(new byte[]{(byte)b}, 0, 1); + } + + @Override + public void write(byte[] b, int off, int len) + { + write(BufferUtil.toBuffer(b, off, len)); + } + + public void write(ByteBuffer buffer) + { + while (buffer.hasRemaining()) + { + ByteBuffer lastBuffer = _accumulator.getBuffer(buffer.remaining()); + int pos = BufferUtil.flipToFill(lastBuffer); + _size += BufferUtil.put(buffer, lastBuffer); + BufferUtil.flipToFlush(lastBuffer, pos); + } + } + + public void writeTo(OutputStream out) throws IOException + { + _accumulator.writeTo(out); + } + + @Override + public void close() + { + if (_combinedByteBuffer != null) + { + _bufferPool.release(_combinedByteBuffer); + _combinedByteBuffer = null; + } + + _accumulator.close(); + _size = 0; + } + + @Override + public synchronized String toString() + { + return String.format("%s@%x{size=%d, bufferPool=%s, byteAccumulator=%s}", getClass().getSimpleName(), + hashCode(), _size, _bufferPool, _accumulator); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java new file mode 100644 index 000000000000..a68d4a28ffb4 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java @@ -0,0 +1,41 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; + +public class NullByteBufferPool implements ByteBufferPool +{ + @Override + public ByteBuffer acquire(int size, boolean direct) + { + if (direct) + return BufferUtil.allocateDirect(size); + else + return BufferUtil.allocate(size); + } + + @Override + public void release(ByteBuffer buffer) + { + BufferUtil.clear(buffer); + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java index cf275487a64d..70205569d369 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java @@ -19,11 +19,6 @@ package org.eclipse.jetty.websocket.common.extensions; import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; import java.util.zip.Deflater; import org.eclipse.jetty.util.StringUtil; @@ -42,10 +37,8 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeCycle, Dumpable { - private ContainerLifeCycle containerLifeCycle; - private WebSocketContainerScope container; - private ServiceLoader extensionLoader = ServiceLoader.load(Extension.class); - private Map> availableExtensions; + private final ContainerLifeCycle containerLifeCycle; + private final WebSocketContainerScope container; private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true); private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true); @@ -59,42 +52,12 @@ public String toString() return String.format("%s@%x{%s}", WebSocketExtensionFactory.class.getSimpleName(), hashCode(), containerLifeCycle.getState()); } }; - availableExtensions = new HashMap<>(); - for (Extension ext : extensionLoader) - { - if (ext != null) - availableExtensions.put(ext.getName(), ext.getClass()); - } this.container = container; containerLifeCycle.addBean(inflaterPool); containerLifeCycle.addBean(deflaterPool); } - @Override - public Map> getAvailableExtensions() - { - return availableExtensions; - } - - @Override - public Class getExtension(String name) - { - return availableExtensions.get(name); - } - - @Override - public Set getExtensionNames() - { - return availableExtensions.keySet(); - } - - @Override - public boolean isAvailable(String name) - { - return availableExtensions.containsKey(name); - } - @Override public Extension newInstance(ExtensionConfig config) { @@ -139,24 +102,6 @@ public Extension newInstance(ExtensionConfig config) } } - @Override - public void register(String name, Class extension) - { - availableExtensions.put(name, extension); - } - - @Override - public void unregister(String name) - { - availableExtensions.remove(name); - } - - @Override - public Iterator> iterator() - { - return availableExtensions.values().iterator(); - } - /* --- All of the below ugliness due to not being able to break API compatibility with ExtensionFactory --- */ @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 78687f3dbfa9..3b56753e7b8e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -22,48 +22,18 @@ import java.util.ArrayList; import java.util.List; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; public class ByteAccumulator { - private final List chunks = new ArrayList<>(); + private final List chunks = new ArrayList<>(); private final int maxSize; private int length = 0; - private final ByteBufferPool bufferPool; public ByteAccumulator(int maxOverallBufferSize) - { - this(maxOverallBufferSize, null); - } - - public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool bufferPool) { this.maxSize = maxOverallBufferSize; - this.bufferPool = bufferPool; - } - - public void copyChunk(ByteBuffer buf) - { - int length = buf.remaining(); - if (this.length + length > maxSize) - { - release(buf); - String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); - throw new MessageTooLargeException(err); - } - - if (buf.hasRemaining()) - { - chunks.add(buf); - this.length += length; - } - else - { - // release 0 length buffer directly - release(buf); - } } public void copyChunk(byte[] buf, int offset, int length) @@ -73,7 +43,11 @@ public void copyChunk(byte[] buf, int offset, int length) String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); throw new MessageTooLargeException(err); } - chunks.add(ByteBuffer.wrap(buf, offset, length)); + + byte[] copy = new byte[length - offset]; + System.arraycopy(buf, offset, copy, 0, length); + + chunks.add(copy); this.length += length; } @@ -82,20 +56,6 @@ public int getLength() return length; } - int getMaxSize() - { - return maxSize; - } - - ByteBuffer newByteBuffer(int size) - { - if (bufferPool == null) - { - return ByteBuffer.allocate(size); - } - return (ByteBuffer)bufferPool.acquire(size, false).clear(); - } - public void transferTo(ByteBuffer buffer) { if (buffer.remaining() < length) @@ -105,30 +65,10 @@ public void transferTo(ByteBuffer buffer) } int position = buffer.position(); - for (ByteBuffer chunk : chunks) + for (byte[] chunk : chunks) { - buffer.put(chunk); + buffer.put(chunk, 0, chunk.length); } BufferUtil.flipToFlush(buffer, position); } - - void recycle() - { - length = 0; - - for (ByteBuffer chunk : chunks) - { - release(chunk); - } - - chunks.clear(); - } - - void release(ByteBuffer buffer) - { - if (bufferPool != null) - { - bufferPool.release(buffer); - } - } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 512cce17ba94..6952ccb67ea0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -46,7 +46,7 @@ public abstract class CompressExtension extends AbstractExtension protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); private static final Logger LOG = Log.getLogger(CompressExtension.class); - + /** * Never drop tail bytes 0000FFFF, from any frame type */ @@ -92,7 +92,6 @@ public abstract class CompressExtension extends AbstractExtension private InflaterPool inflaterPool; private Deflater deflaterImpl; private Inflater inflaterImpl; - protected ByteAccumulator accumulator; protected AtomicInteger decompressCount = new AtomicInteger(0); private int tailDrop = TAIL_DROP_NEVER; private int rsvUse = RSV_USE_ALWAYS; @@ -177,37 +176,7 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) protected ByteAccumulator newByteAccumulator() { int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize()); - if (accumulator == null || accumulator.getMaxSize() != maxSize) - { - accumulator = new ByteAccumulator(maxSize, getBufferPool()); - } - return accumulator; - } - - int copyChunk(Inflater inflater, ByteAccumulator accumulator) throws DataFormatException - { - ByteBuffer buf = accumulator.newByteBuffer(DECOMPRESS_BUF_SIZE); - while (buf.hasRemaining()) - { - try - { - int read = inflater.inflate(buf.array(), buf.position(), buf.remaining()); - if (read <= 0) - { - accumulator.copyChunk((ByteBuffer)buf.flip()); - return read; - } - buf.position(buf.position() + read); - } - catch (DataFormatException e) - { - accumulator.release(buf); - throw e; - } - } - int position = buf.position(); - accumulator.copyChunk((ByteBuffer)buf.flip()); - return position; + return new ByteAccumulator(maxSize); } protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException @@ -216,10 +185,10 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da { return; } - + byte[] output = new byte[DECOMPRESS_BUF_SIZE]; Inflater inflater = getInflater(); - + while (buf.hasRemaining() && inflater.needsInput()) { if (!supplyInput(inflater, buf)) @@ -229,12 +198,22 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da return; } - while (true) + int read; + while ((read = inflater.inflate(output)) >= 0) { - if (copyChunk(inflater, accumulator) <= 0) + if (read == 0) { + if (LOG.isDebugEnabled()) + LOG.debug("Decompress: read 0 {}", toDetail(inflater)); break; } + else + { + // do something with output + if (LOG.isDebugEnabled()) + LOG.debug("Decompressed {} bytes: {}", read, toDetail(inflater)); + accumulator.copyChunk(output, 0, read); + } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index a81cba641cc7..0476c0fcc441 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -47,7 +47,7 @@ int getTailDropMode() { return TAIL_DROP_ALWAYS; } - + @Override public void incomingFrame(Frame frame) { @@ -63,7 +63,7 @@ public void incomingFrame(Frame frame) try { - accumulator = newByteAccumulator(); + ByteAccumulator accumulator = newByteAccumulator(); decompress(accumulator, frame.getPayload()); decompress(accumulator, TAIL_BYTES_BUF.slice()); forwardIncoming(frame, accumulator); @@ -72,10 +72,5 @@ public void incomingFrame(Frame frame) { throw new BadPayloadException(e); } - finally - { - if (accumulator != null) - accumulator.recycle(); - } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index d338be0b1eac..37482f8bd678 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -78,9 +78,10 @@ public void incomingFrame(Frame frame) throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); } + ByteAccumulator accumulator = newByteAccumulator(); + try { - accumulator = newByteAccumulator(); ByteBuffer payload = frame.getPayload(); decompress(accumulator, payload); if (frame.isFin()) @@ -89,17 +90,11 @@ public void incomingFrame(Frame frame) } forwardIncoming(frame, accumulator); - } catch (DataFormatException e) { throw new BadPayloadException(e); } - finally - { - if (accumulator != null) - accumulator.recycle(); - } if (frame.isFin()) incomingCompressed = false; From a3c3e24cab543ab501ec1267015326fb2de63adb Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 5 Nov 2020 14:38:19 +1100 Subject: [PATCH 03/19] Use the ByteBufferPool in the ByteAccumulator Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 7 +- .../extensions/compress/ByteAccumulator.java | 66 ++++++++++++------- .../compress/CompressExtension.java | 3 +- .../compress/DeflateFrameExtension.java | 3 +- .../compress/PerMessageDeflateExtension.java | 4 +- .../compress/ByteAccumulatorTest.java | 3 +- 6 files changed, 52 insertions(+), 34 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index f5a4977f85dd..6de5baf33806 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -34,9 +34,14 @@ public class ByteBufferAccumulator implements AutoCloseable private final List _buffers = new ArrayList<>(); private final ByteBufferPool _bufferPool; + public ByteBufferAccumulator() + { + this(null); + } + public ByteBufferAccumulator(ByteBufferPool bufferPool) { - this._bufferPool = bufferPool; + _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; } public int getLength() diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 3b56753e7b8e..9863634b1ed1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -19,36 +19,27 @@ package org.eclipse.jetty.websocket.common.extensions.compress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import org.eclipse.jetty.io.ByteBufferAccumulator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; -public class ByteAccumulator +public class ByteAccumulator implements AutoCloseable { - private final List chunks = new ArrayList<>(); + private final ByteBufferAccumulator accumulator; private final int maxSize; private int length = 0; public ByteAccumulator(int maxOverallBufferSize) { - this.maxSize = maxOverallBufferSize; + this(maxOverallBufferSize, null); } - public void copyChunk(byte[] buf, int offset, int length) + public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool byteBufferPool) { - if (this.length + length > maxSize) - { - String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); - throw new MessageTooLargeException(err); - } - - byte[] copy = new byte[length - offset]; - System.arraycopy(buf, offset, copy, 0, length); - - chunks.add(copy); - this.length += length; + this.maxSize = maxOverallBufferSize; + this.accumulator = new ByteBufferAccumulator(byteBufferPool); } public int getLength() @@ -56,19 +47,44 @@ public int getLength() return length; } - public void transferTo(ByteBuffer buffer) + public void copyChunk(byte[] buf, int offset, int length) + { + copyChunk(BufferUtil.toBuffer(buf, offset, length)); + } + + public void copyChunk(ByteBuffer buffer) { - if (buffer.remaining() < length) + if (length + buffer.remaining() > maxSize) + { + String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", this.length + length, maxSize); + throw new MessageTooLargeException(err); + } + + while (buffer.hasRemaining()) { - throw new IllegalArgumentException(String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", - buffer.remaining(), length)); + ByteBuffer b = accumulator.getBuffer(buffer.remaining()); + int pos = BufferUtil.flipToFill(b); + this.length += BufferUtil.put(buffer, b); + BufferUtil.flipToFlush(b, pos); } + } - int position = buffer.position(); - for (byte[] chunk : chunks) + public void transferTo(ByteBuffer buffer) + { + if (BufferUtil.space(buffer) < length) { - buffer.put(chunk, 0, chunk.length); + String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", BufferUtil.space(buffer), length); + throw new IllegalArgumentException(err); } - BufferUtil.flipToFlush(buffer, position); + + accumulator.writeTo(buffer); + close(); + } + + @Override + public void close() + { + length = 0; + accumulator.close(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 6952ccb67ea0..a18a07f2098d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -162,7 +162,6 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false); try { - BufferUtil.flipToFill(buffer); accumulator.transferTo(buffer); newFrame.setPayload(buffer); nextIncomingFrame(newFrame); @@ -176,7 +175,7 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) protected ByteAccumulator newByteAccumulator() { int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize()); - return new ByteAccumulator(maxSize); + return new ByteAccumulator(maxSize, getBufferPool()); } protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index 0476c0fcc441..c511a9544fc4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -61,9 +61,8 @@ public void incomingFrame(Frame frame) return; } - try + try (ByteAccumulator accumulator = newByteAccumulator()) { - ByteAccumulator accumulator = newByteAccumulator(); decompress(accumulator, frame.getPayload()); decompress(accumulator, TAIL_BYTES_BUF.slice()); forwardIncoming(frame, accumulator); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 37482f8bd678..5bf0bee46449 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -78,9 +78,7 @@ public void incomingFrame(Frame frame) throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); } - ByteAccumulator accumulator = newByteAccumulator(); - - try + try (ByteAccumulator accumulator = newByteAccumulator()) { ByteBuffer payload = frame.getPayload(); decompress(accumulator, payload); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index 1c3dd1ffaead..08074de115c4 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; -import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; import org.junit.jupiter.api.Test; @@ -93,6 +92,7 @@ public void testCopyChunkNotEnoughSpace() assertThat(e.getMessage(), containsString("too large for configured max")); } + /* @Test public void testRecycle() { @@ -165,4 +165,5 @@ public void testRecycle() String result1 = BufferUtil.toUTF8String(out1); assertThat("result", result1, is("olleH dlroW enoD")); } + */ } From 7bcae9968b562926f1037b213ee5dbe42b858775 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 5 Nov 2020 17:19:05 +1100 Subject: [PATCH 04/19] allow writing directly into the ByteAccumulator Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 16 +++++++++++ .../jetty/io/ByteBufferOutputStream2.java | 24 ++++++++-------- .../eclipse/jetty/io/NullByteBufferPool.java | 24 ++++++++-------- .../extensions/compress/ByteAccumulator.java | 26 +++++++++-------- .../compress/CompressExtension.java | 28 ++++++------------- 5 files changed, 63 insertions(+), 55 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 6de5baf33806..1ee285f4d075 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -69,6 +69,22 @@ public ByteBuffer getBuffer(int minAllocationSize) return buffer; } + public void copyBytes(byte[] buf, int offset, int length) + { + copyBuffer(BufferUtil.toBuffer(buf, offset, length)); + } + + public void copyBuffer(ByteBuffer buffer) + { + while (buffer.hasRemaining()) + { + ByteBuffer b = getBuffer(buffer.remaining()); + int pos = BufferUtil.flipToFill(b); + BufferUtil.put(buffer, b); + BufferUtil.flipToFlush(b, pos); + } + } + public void writeTo(ByteBuffer buffer) { int pos = BufferUtil.flipToFill(buffer); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index 0d18d617d3c7..1addc9b1d09b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -1,19 +1,19 @@ // -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. // -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html // -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php // -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== // package org.eclipse.jetty.io; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java index a68d4a28ffb4..41938ae1af47 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NullByteBufferPool.java @@ -1,19 +1,19 @@ // -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. // -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html // -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php // -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== // package org.eclipse.jetty.io; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 9863634b1ed1..77dc5606a91c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -44,7 +44,12 @@ public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool byteBufferPool) public int getLength() { - return length; + return accumulator.getLength(); + } + + public ByteBuffer getBuffer(int minAllocationSize) + { + return accumulator.getBuffer(minAllocationSize); } public void copyChunk(byte[] buf, int offset, int length) @@ -54,26 +59,23 @@ public void copyChunk(byte[] buf, int offset, int length) public void copyChunk(ByteBuffer buffer) { - if (length + buffer.remaining() > maxSize) + int remaining = buffer.remaining(); + if (getLength() + remaining > maxSize) { - String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", this.length + length, maxSize); + String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length + remaining, maxSize); throw new MessageTooLargeException(err); } - while (buffer.hasRemaining()) - { - ByteBuffer b = accumulator.getBuffer(buffer.remaining()); - int pos = BufferUtil.flipToFill(b); - this.length += BufferUtil.put(buffer, b); - BufferUtil.flipToFlush(b, pos); - } + length += remaining; + accumulator.copyBuffer(buffer); } public void transferTo(ByteBuffer buffer) { - if (BufferUtil.space(buffer) < length) + int availableSpace = BufferUtil.space(buffer); + if (availableSpace < length) { - String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", BufferUtil.space(buffer), length); + String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", availableSpace, length); throw new IllegalArgumentException(err); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index a18a07f2098d..957d4e6477a1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -180,14 +180,10 @@ protected ByteAccumulator newByteAccumulator() protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException { - if ((buf == null) || (!buf.hasRemaining())) - { + if (BufferUtil.isEmpty(buf)) return; - } - byte[] output = new byte[DECOMPRESS_BUF_SIZE]; Inflater inflater = getInflater(); - while (buf.hasRemaining() && inflater.needsInput()) { if (!supplyInput(inflater, buf)) @@ -197,22 +193,16 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da return; } - int read; - while ((read = inflater.inflate(output)) >= 0) + while (true) { - if (read == 0) - { - if (LOG.isDebugEnabled()) - LOG.debug("Decompress: read 0 {}", toDetail(inflater)); + ByteBuffer buffer = accumulator.getBuffer(DECOMPRESS_BUF_SIZE); + int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit()); + buffer.limit(buffer.limit() + read); + if (LOG.isDebugEnabled()) + LOG.debug("Decompressed {} bytes into buffer {} from {}", read, BufferUtil.toDetailString(buffer), toDetail(inflater)); + + if (read <= 0) break; - } - else - { - // do something with output - if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {} bytes: {}", read, toDetail(inflater)); - accumulator.copyChunk(output, 0, read); - } } } From 6e9572215b30df8e00cb52238b3703f699819523 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 5 Nov 2020 19:11:34 +1100 Subject: [PATCH 05/19] ByteAccumulator transferTo expects buffer in fill mode. Signed-off-by: Lachlan Roberts --- .../websocket/common/extensions/compress/ByteAccumulator.java | 3 +++ .../common/extensions/compress/CompressExtension.java | 1 + 2 files changed, 4 insertions(+) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 77dc5606a91c..30efd2c592c7 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -72,6 +72,9 @@ public void copyChunk(ByteBuffer buffer) public void transferTo(ByteBuffer buffer) { + // For some reason this method expects the buffer in fill mode but returns a buffer in flush mode. + BufferUtil.flipToFlush(buffer, 0); + int availableSpace = BufferUtil.space(buffer); if (availableSpace < length) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 957d4e6477a1..426d3b00eeda 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -162,6 +162,7 @@ protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false); try { + BufferUtil.clearToFill(buffer); accumulator.transferTo(buffer); newFrame.setPayload(buffer); nextIncomingFrame(newFrame); From 3c44df0724150c1247702fadee1b9fc5045f9975 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 6 Nov 2020 14:09:13 +1100 Subject: [PATCH 06/19] changes from review Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 10 +++---- .../jetty/io/ByteBufferOutputStream2.java | 27 +++++++++++-------- .../extensions/compress/ByteAccumulator.java | 25 ++++++++++++----- .../compress/CompressExtension.java | 3 ++- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 1ee285f4d075..0bd007d1d59d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -52,17 +52,17 @@ public int getLength() return length; } - public ByteBuffer getBuffer() + public ByteBuffer ensureBuffer() { - return getBuffer(DEFAULT_BUFFER_SIZE); + return ensureBuffer(DEFAULT_BUFFER_SIZE); } - public ByteBuffer getBuffer(int minAllocationSize) + public ByteBuffer ensureBuffer(int minAllocationSize) { ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); if (BufferUtil.space(buffer) <= MIN_SPACE) { - buffer = _bufferPool.acquire(minAllocationSize, false); + buffer = _bufferPool.acquire(Math.max(DEFAULT_BUFFER_SIZE, minAllocationSize), false); _buffers.add(buffer); } @@ -78,7 +78,7 @@ public void copyBuffer(ByteBuffer buffer) { while (buffer.hasRemaining()) { - ByteBuffer b = getBuffer(buffer.remaining()); + ByteBuffer b = ensureBuffer(buffer.remaining()); int pos = BufferUtil.flipToFill(b); BufferUtil.put(buffer, b); BufferUtil.flipToFlush(b, pos); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index 1addc9b1d09b..83a63ecb494b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -91,7 +91,7 @@ public byte[] toByteArray() public int size() { - return _accumulator.getLength(); + return _size; } @Override @@ -103,18 +103,19 @@ public void write(int b) @Override public void write(byte[] b, int off, int len) { - write(BufferUtil.toBuffer(b, off, len)); + releaseAggregateBuffer(); + _accumulator.copyBytes(b, off, len); } public void write(ByteBuffer buffer) { - while (buffer.hasRemaining()) - { - ByteBuffer lastBuffer = _accumulator.getBuffer(buffer.remaining()); - int pos = BufferUtil.flipToFill(lastBuffer); - _size += BufferUtil.put(buffer, lastBuffer); - BufferUtil.flipToFlush(lastBuffer, pos); - } + releaseAggregateBuffer(); + _accumulator.copyBuffer(buffer); + } + + public void writeTo(ByteBuffer buffer) + { + _accumulator.writeTo(buffer); } public void writeTo(OutputStream out) throws IOException @@ -122,15 +123,19 @@ public void writeTo(OutputStream out) throws IOException _accumulator.writeTo(out); } - @Override - public void close() + private void releaseAggregateBuffer() { if (_combinedByteBuffer != null) { _bufferPool.release(_combinedByteBuffer); _combinedByteBuffer = null; } + } + @Override + public void close() + { + releaseAggregateBuffer(); _accumulator.close(); _size = 0; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 30efd2c592c7..2adc86c0fb16 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -25,6 +25,10 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.MessageTooLargeException; +/** + * @deprecated use {@link ByteBufferAccumulator} instead. + */ +@Deprecated public class ByteAccumulator implements AutoCloseable { private final ByteBufferAccumulator accumulator; @@ -47,9 +51,19 @@ public int getLength() return accumulator.getLength(); } - public ByteBuffer getBuffer(int minAllocationSize) + public ByteBuffer ensureBuffer(int minAllocationSize) { - return accumulator.getBuffer(minAllocationSize); + return accumulator.ensureBuffer(minAllocationSize); + } + + public void readBytes(int read) + { + length += read; + if (length > maxSize) + { + String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length, maxSize); + throw new MessageTooLargeException(err); + } } public void copyChunk(byte[] buf, int offset, int length) @@ -60,13 +74,12 @@ public void copyChunk(byte[] buf, int offset, int length) public void copyChunk(ByteBuffer buffer) { int remaining = buffer.remaining(); - if (getLength() + remaining > maxSize) + int length = getLength(); + if (length + remaining > maxSize) { String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length + remaining, maxSize); throw new MessageTooLargeException(err); } - - length += remaining; accumulator.copyBuffer(buffer); } @@ -76,6 +89,7 @@ public void transferTo(ByteBuffer buffer) BufferUtil.flipToFlush(buffer, 0); int availableSpace = BufferUtil.space(buffer); + int length = getLength(); if (availableSpace < length) { String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", availableSpace, length); @@ -89,7 +103,6 @@ public void transferTo(ByteBuffer buffer) @Override public void close() { - length = 0; accumulator.close(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 426d3b00eeda..3d5940f36bf8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -196,9 +196,10 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da while (true) { - ByteBuffer buffer = accumulator.getBuffer(DECOMPRESS_BUF_SIZE); + ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit()); buffer.limit(buffer.limit() + read); + accumulator.readBytes(read); if (LOG.isDebugEnabled()) LOG.debug("Decompressed {} bytes into buffer {} from {}", read, BufferUtil.toDetailString(buffer), toDetail(inflater)); From 8dc0d9932d7e73170b80daa6a3fa06ce9ab2e543 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 6 Nov 2020 18:52:11 +1100 Subject: [PATCH 07/19] adjust minimum space in ByteBufferAccumulator before buffer allocation Signed-off-by: Lachlan Roberts --- .../main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 0bd007d1d59d..f57a3045584e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -28,7 +28,7 @@ public class ByteBufferAccumulator implements AutoCloseable { - private static final int MIN_SPACE = 3; + private static final int MIN_SPACE = 8; private static final int DEFAULT_BUFFER_SIZE = 1024; private final List _buffers = new ArrayList<>(); From 595d4bfcc422c5965e55a8146d7bc87842dafe8e Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 11 Nov 2020 22:18:26 +1100 Subject: [PATCH 08/19] changes from review Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 55 +++++++++++++------ .../jetty/io/ByteBufferOutputStream2.java | 43 ++------------- .../extensions/compress/ByteAccumulator.java | 4 +- 3 files changed, 46 insertions(+), 56 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index f57a3045584e..071f75b78d3c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -28,9 +28,6 @@ public class ByteBufferAccumulator implements AutoCloseable { - private static final int MIN_SPACE = 8; - private static final int DEFAULT_BUFFER_SIZE = 1024; - private final List _buffers = new ArrayList<>(); private final ByteBufferPool _bufferPool; @@ -52,17 +49,12 @@ public int getLength() return length; } - public ByteBuffer ensureBuffer() - { - return ensureBuffer(DEFAULT_BUFFER_SIZE); - } - - public ByteBuffer ensureBuffer(int minAllocationSize) + public ByteBuffer ensureBuffer(int minSize, int minAllocationSize) { ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); - if (BufferUtil.space(buffer) <= MIN_SPACE) + if (BufferUtil.space(buffer) <= minSize) { - buffer = _bufferPool.acquire(Math.max(DEFAULT_BUFFER_SIZE, minAllocationSize), false); + buffer = _bufferPool.acquire(minAllocationSize, false); _buffers.add(buffer); } @@ -78,13 +70,47 @@ public void copyBuffer(ByteBuffer buffer) { while (buffer.hasRemaining()) { - ByteBuffer b = ensureBuffer(buffer.remaining()); + ByteBuffer b = ensureBuffer(0, buffer.remaining()); int pos = BufferUtil.flipToFill(b); BufferUtil.put(buffer, b); BufferUtil.flipToFlush(b, pos); } } + public ByteBuffer takeByteBuffer() + { + int length = getLength(); + ByteBuffer combinedBuffer = _bufferPool.acquire(length, false); + for (ByteBuffer buffer : _buffers) + { + combinedBuffer.put(buffer); + } + return combinedBuffer; + } + + public ByteBuffer toByteBuffer() + { + if (_buffers.size() == 1) + return _buffers.get(0); + + ByteBuffer combinedBuffer = takeByteBuffer(); + _buffers.forEach(_bufferPool::release); + _buffers.clear(); + _buffers.add(combinedBuffer); + return combinedBuffer; + } + + public byte[] toByteArray() + { + int length = getLength(); + if (length == 0) + return new byte[0]; + + byte[] bytes = new byte[length]; + writeTo(BufferUtil.toBuffer(bytes)); + return bytes; + } + public void writeTo(ByteBuffer buffer) { int pos = BufferUtil.flipToFill(buffer); @@ -106,10 +132,7 @@ public void writeTo(OutputStream out) throws IOException @Override public void close() { - for (ByteBuffer buffer : _buffers) - { - _bufferPool.release(buffer); - } + _buffers.forEach(_bufferPool::release); _buffers.clear(); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index 83a63ecb494b..cedd679b7376 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -22,8 +22,6 @@ import java.io.OutputStream; import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; - /** * This class implements an output stream in which the data is written into a list of ByteBuffer, * the buffer list automatically grows as data is written to it, the buffers are taken from the @@ -35,7 +33,6 @@ public class ByteBufferOutputStream2 extends OutputStream { private final ByteBufferAccumulator _accumulator; private final ByteBufferPool _bufferPool; - private ByteBuffer _combinedByteBuffer; private int _size = 0; public ByteBufferOutputStream2() @@ -55,22 +52,7 @@ public ByteBufferOutputStream2(ByteBufferPool bufferPool) */ public ByteBuffer toByteBuffer() { - int length = _accumulator.getLength(); - if (length == 0) - return BufferUtil.EMPTY_BUFFER; - - if (_combinedByteBuffer != null && length == _combinedByteBuffer.remaining()) - return _combinedByteBuffer; - - ByteBuffer buffer = _bufferPool.acquire(_size, false); - _accumulator.writeTo(buffer); - if (_combinedByteBuffer != null) - { - _bufferPool.release(_combinedByteBuffer); - _combinedByteBuffer = buffer; - } - - return buffer; + return _accumulator.toByteBuffer(); } /** @@ -79,14 +61,7 @@ public ByteBuffer toByteBuffer() */ public byte[] toByteArray() { - int length = _accumulator.getLength(); - if (length == 0) - return new byte[0]; - - byte[] bytes = new byte[_size]; - ByteBuffer buffer = BufferUtil.toBuffer(bytes); - _accumulator.writeTo(buffer); - return bytes; + return _accumulator.toByteArray(); } public int size() @@ -103,13 +78,13 @@ public void write(int b) @Override public void write(byte[] b, int off, int len) { - releaseAggregateBuffer(); + _size += len; _accumulator.copyBytes(b, off, len); } public void write(ByteBuffer buffer) { - releaseAggregateBuffer(); + _size += buffer.remaining(); _accumulator.copyBuffer(buffer); } @@ -123,19 +98,9 @@ public void writeTo(OutputStream out) throws IOException _accumulator.writeTo(out); } - private void releaseAggregateBuffer() - { - if (_combinedByteBuffer != null) - { - _bufferPool.release(_combinedByteBuffer); - _combinedByteBuffer = null; - } - } - @Override public void close() { - releaseAggregateBuffer(); _accumulator.close(); _size = 0; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 2adc86c0fb16..fce62acbe056 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -31,6 +31,8 @@ @Deprecated public class ByteAccumulator implements AutoCloseable { + private static final int MIN_SPACE = 8; + private final ByteBufferAccumulator accumulator; private final int maxSize; private int length = 0; @@ -53,7 +55,7 @@ public int getLength() public ByteBuffer ensureBuffer(int minAllocationSize) { - return accumulator.ensureBuffer(minAllocationSize); + return accumulator.ensureBuffer(MIN_SPACE, minAllocationSize); } public void readBytes(int read) From d75e6de1b27d72a6ddd8d75759a1deefa76b75b9 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 12 Nov 2020 08:48:50 +1100 Subject: [PATCH 09/19] add takeByteBuffer method to ByteBufferOutputStream2 Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/io/ByteBufferOutputStream2.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index cedd679b7376..7801063c196e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -32,7 +32,6 @@ public class ByteBufferOutputStream2 extends OutputStream { private final ByteBufferAccumulator _accumulator; - private final ByteBufferPool _bufferPool; private int _size = 0; public ByteBufferOutputStream2() @@ -42,8 +41,12 @@ public ByteBufferOutputStream2() public ByteBufferOutputStream2(ByteBufferPool bufferPool) { - _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; - _accumulator = new ByteBufferAccumulator(bufferPool); + _accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool); + } + + public ByteBuffer takeByteBuffer() + { + return _accumulator.takeByteBuffer(); } /** @@ -108,7 +111,7 @@ public void close() @Override public synchronized String toString() { - return String.format("%s@%x{size=%d, bufferPool=%s, byteAccumulator=%s}", getClass().getSimpleName(), - hashCode(), _size, _bufferPool, _accumulator); + return String.format("%s@%x{size=%d, byteAccumulator=%s}", getClass().getSimpleName(), + hashCode(), _size, _accumulator); } } From e0031e0585c4c464cc12d285c2c41f2d0e6ec9ea Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 13 Nov 2020 09:22:57 +1100 Subject: [PATCH 10/19] Issue #5499 - takeBuffer releases all the buffers in the list Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/io/ByteBufferAccumulator.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 071f75b78d3c..569ac9727bff 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -79,23 +79,28 @@ public void copyBuffer(ByteBuffer buffer) public ByteBuffer takeByteBuffer() { + ByteBuffer combinedBuffer; + if (_buffers.size() == 1) + { + combinedBuffer = _buffers.get(0); + _buffers.clear(); + return combinedBuffer; + } + int length = getLength(); - ByteBuffer combinedBuffer = _bufferPool.acquire(length, false); + combinedBuffer = _bufferPool.acquire(length, false); for (ByteBuffer buffer : _buffers) { combinedBuffer.put(buffer); + _bufferPool.release(buffer); } + _buffers.clear(); return combinedBuffer; } public ByteBuffer toByteBuffer() { - if (_buffers.size() == 1) - return _buffers.get(0); - ByteBuffer combinedBuffer = takeByteBuffer(); - _buffers.forEach(_bufferPool::release); - _buffers.clear(); _buffers.add(combinedBuffer); return combinedBuffer; } From e7bed39239e6e4b8309803a14fa0702380d65bd9 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 13 Nov 2020 09:56:32 +1100 Subject: [PATCH 11/19] Issue #5499 - add javadoc for ByteBufferAccumulator Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 37 +++++++++++++++++++ .../jetty/io/ByteBufferOutputStream2.java | 19 ++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 569ac9727bff..ab551e5e1f6c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -26,6 +26,15 @@ import org.eclipse.jetty.util.BufferUtil; +/** + * Accumulates data into a list of ByteBuffers which can then be combined into a single buffer or written to an OutputStream. + * The buffer list automatically grows as data is written to it, the buffers are taken from the + * supplied {@link ByteBufferPool} or freshly allocated if one is not supplied. + * + * The method {@link #ensureBuffer(int, int)} is used to write directly to the last buffer stored in the buffer list, + * if there is less than a certain amount of space available in that buffer then a new one will be allocated and returned instead. + * @see #ensureBuffer(int, int) + */ public class ByteBufferAccumulator implements AutoCloseable { private final List _buffers = new ArrayList<>(); @@ -49,6 +58,17 @@ public int getLength() return length; } + public ByteBufferPool getByteBufferPool() + { + return _bufferPool; + } + + /** + * Get the last buffer of the accumulator, this can be written to directly to avoid copying into the accumulator. + * @param minSize the smallest amount of remaining space before a new buffer is allocated. + * @param minAllocationSize new buffers will be allocated to have at least this size. + * @return a buffer with at least {@code minSize} space to write into. + */ public ByteBuffer ensureBuffer(int minSize, int minAllocationSize) { ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); @@ -77,6 +97,12 @@ public void copyBuffer(ByteBuffer buffer) } } + /** + * Take the combined buffer containing all content written to the accumulator. + * The caller is responsible for releasing this {@link ByteBuffer} back into the {@link ByteBufferPool}. + * @return a buffer containing all content written to the accumulator. + * @see #toByteBuffer() + */ public ByteBuffer takeByteBuffer() { ByteBuffer combinedBuffer; @@ -98,6 +124,14 @@ public ByteBuffer takeByteBuffer() return combinedBuffer; } + /** + * Take the combined buffer containing all content written to the accumulator. + * The returned buffer is still contained within the accumulator and will be released back to the {@link ByteBufferPool} + * when the accumulator is closed. + * @return a buffer containing all content written to the accumulator. + * @see #takeByteBuffer() + * @see #close() + */ public ByteBuffer toByteBuffer() { ByteBuffer combinedBuffer = takeByteBuffer(); @@ -105,6 +139,9 @@ public ByteBuffer toByteBuffer() return combinedBuffer; } + /** + * @return a newly allocated byte array containing all content written into the accumulator. + */ public byte[] toByteArray() { int length = getLength(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index 7801063c196e..2ce77fab6459 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -44,14 +44,26 @@ public ByteBufferOutputStream2(ByteBufferPool bufferPool) _accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool); } + public ByteBufferPool getByteBufferPool() + { + return _accumulator.getByteBufferPool(); + } + + /** + * Take the combined buffer containing all content written to the OutputStream. + * The caller is responsible for releasing this {@link ByteBuffer} back into the {@link ByteBufferPool}. + * @return a buffer containing all content written to the OutputStream. + */ public ByteBuffer takeByteBuffer() { return _accumulator.takeByteBuffer(); } /** - * Get an aggregated content written to the OutputStream in a ByteBuffer. - * @return the content in a ByteBuffer. + * Take the combined buffer containing all content written to the OutputStream. + * The returned buffer is still contained within the OutputStream and will be released back to the {@link ByteBufferPool} + * when the OutputStream is closed. + * @return a buffer containing all content written to the OutputStream. */ public ByteBuffer toByteBuffer() { @@ -59,8 +71,7 @@ public ByteBuffer toByteBuffer() } /** - * Get an aggregated content written to the OutputStream in a byte array. - * @return the content in a byte array. + * @return a newly allocated byte array containing all content written into the OutputStream. */ public byte[] toByteArray() { From a1aa5dcd14c77f7ccb15cd2dbe8ce21e18b5cb7f Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 16 Nov 2020 18:33:38 +1100 Subject: [PATCH 12/19] Issue #5499 - use ByteBufferAccumulator for websocket compression Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 2 + .../compress/CompressExtension.java | 69 +++++++++++-------- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index ab551e5e1f6c..7638bd30123e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -115,11 +115,13 @@ public ByteBuffer takeByteBuffer() int length = getLength(); combinedBuffer = _bufferPool.acquire(length, false); + BufferUtil.clearToFill(combinedBuffer); for (ByteBuffer buffer : _buffers) { combinedBuffer.put(buffer); _bufferPool.release(buffer); } + BufferUtil.flipToFlush(combinedBuffer, 0); _buffers.clear(); return combinedBuffer; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 3d5940f36bf8..ce819d2af52d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.websocket.common.extensions.compress; -import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; @@ -28,6 +27,8 @@ import java.util.zip.Inflater; import java.util.zip.ZipException; +import org.eclipse.jetty.io.ByteBufferAccumulator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.compression.DeflaterPool; @@ -474,7 +475,9 @@ private void compress(FrameEntry entry, boolean first) // Get a chunk of the payload to avoid to blow // the heap if the payload is a huge mapped file. Frame frame = entry.frame; + boolean fin = frame.isFin(); ByteBuffer data = frame.getPayload(); + Deflater deflater = getDeflater(); if (data == null) data = BufferUtil.EMPTY_BUFFER; @@ -484,40 +487,47 @@ private void compress(FrameEntry entry, boolean first) if (LOG.isDebugEnabled()) LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, outputLength); - boolean needsCompress = true; - - Deflater deflater = getDeflater(); - - if (deflater.needsInput() && !supplyInput(deflater, data)) + ByteBuffer payload = BufferUtil.EMPTY_BUFFER; + WriteCallback callback = this; + if (!deflater.needsInput() || supplyInput(deflater, data)) { - // no input supplied - needsCompress = false; - } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - byte[] output = new byte[outputLength]; - - boolean fin = frame.isFin(); + ByteBufferPool bufferPool = getBufferPool(); + try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator(bufferPool)) + { + while (true) + { + ByteBuffer buffer = accumulator.ensureBuffer(0, outputLength); + int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); + buffer.limit(buffer.limit() + compressed); - // Compress the data - while (needsCompress) - { - int compressed = deflater.deflate(output, 0, outputLength, Deflater.SYNC_FLUSH); + if (LOG.isDebugEnabled()) + LOG.debug("Wrote {} bytes to output buffer", accumulator); - // Append the output for the eventual frame. - if (LOG.isDebugEnabled()) - LOG.debug("Wrote {} bytes to output buffer", compressed); - out.write(output, 0, compressed); + if (compressed <= 0) + break; + } - if (compressed < outputLength) - { - needsCompress = false; + ByteBuffer buffer = accumulator.takeByteBuffer(); + payload = buffer; + callback = new WriteCallback() + { + @Override + public void writeFailed(Throwable x) + { + bufferPool.release(buffer); + Flusher.this.writeFailed(x); + } + + @Override + public void writeSuccess() + { + bufferPool.release(buffer); + Flusher.this.writeSuccess(); + } + }; } } - ByteBuffer payload = ByteBuffer.wrap(out.toByteArray()); - if (payload.remaining() > 0) { // Handle tail bytes generated by SYNC_FLUSH. @@ -566,8 +576,7 @@ else if (fin) } chunk.setPayload(payload); chunk.setFin(fin); - - nextOutgoingFrame(chunk, this, entry.batchMode); + nextOutgoingFrame(chunk, callback, entry.batchMode); } @Override From 5788fe609dae59ca82dfc21489f41e5bba3add40 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 16 Nov 2020 18:43:22 +1100 Subject: [PATCH 13/19] Fix ByteBufferAccumulator minSize Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/io/ByteBufferAccumulator.java | 14 ++++++++++++-- .../extensions/compress/CompressExtension.java | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 7638bd30123e..cf6d2dd9c66e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -63,6 +63,16 @@ public ByteBufferPool getByteBufferPool() return _bufferPool; } + /** + * Get the last buffer of the accumulator, this can be written to directly to avoid copying into the accumulator. + * @param minAllocationSize new buffers will be allocated to have at least this size. + * @return a buffer with at least {@code minSize} space to write into. + */ + public ByteBuffer ensureBuffer(int minAllocationSize) + { + return ensureBuffer(1, minAllocationSize); + } + /** * Get the last buffer of the accumulator, this can be written to directly to avoid copying into the accumulator. * @param minSize the smallest amount of remaining space before a new buffer is allocated. @@ -72,7 +82,7 @@ public ByteBufferPool getByteBufferPool() public ByteBuffer ensureBuffer(int minSize, int minAllocationSize) { ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); - if (BufferUtil.space(buffer) <= minSize) + if (BufferUtil.space(buffer) < minSize) { buffer = _bufferPool.acquire(minAllocationSize, false); _buffers.add(buffer); @@ -90,7 +100,7 @@ public void copyBuffer(ByteBuffer buffer) { while (buffer.hasRemaining()) { - ByteBuffer b = ensureBuffer(0, buffer.remaining()); + ByteBuffer b = ensureBuffer(buffer.remaining()); int pos = BufferUtil.flipToFill(b); BufferUtil.put(buffer, b); BufferUtil.flipToFlush(b, pos); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index ce819d2af52d..8dd90067c1be 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -496,7 +496,7 @@ private void compress(FrameEntry entry, boolean first) { while (true) { - ByteBuffer buffer = accumulator.ensureBuffer(0, outputLength); + ByteBuffer buffer = accumulator.ensureBuffer(8, outputLength); int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); buffer.limit(buffer.limit() + compressed); From 7c46d96fcea0a1f6c328f472916a39875786b89b Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 17 Nov 2020 13:59:49 +1100 Subject: [PATCH 14/19] Issue #5499 - add tests for ByteBufferAccumulator Signed-off-by: Lachlan Roberts --- .../jetty/io/ByteBufferAccumulator.java | 8 +- .../jetty/io/ByteBufferAccumulatorTest.java | 333 ++++++++++++++++++ .../compress/ByteAccumulatorTest.java | 75 ---- 3 files changed, 338 insertions(+), 78 deletions(-) create mode 100644 jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index cf6d2dd9c66e..97f009fd9049 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -161,7 +161,9 @@ public byte[] toByteArray() return new byte[0]; byte[] bytes = new byte[length]; - writeTo(BufferUtil.toBuffer(bytes)); + ByteBuffer buffer = BufferUtil.toBuffer(bytes); + BufferUtil.clear(buffer); + writeTo(buffer); return bytes; } @@ -170,7 +172,7 @@ public void writeTo(ByteBuffer buffer) int pos = BufferUtil.flipToFill(buffer); for (ByteBuffer bb : _buffers) { - buffer.put(bb); + buffer.put(bb.slice()); } BufferUtil.flipToFlush(buffer, pos); } @@ -179,7 +181,7 @@ public void writeTo(OutputStream out) throws IOException { for (ByteBuffer bb : _buffers) { - BufferUtil.writeTo(bb, out); + BufferUtil.writeTo(bb.slice(), out); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java new file mode 100644 index 000000000000..68f72d465593 --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java @@ -0,0 +1,333 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ByteBufferAccumulatorTest +{ + private CountingBufferPool byteBufferPool; + private ByteBufferAccumulator accumulator; + + @BeforeEach + public void before() + { + byteBufferPool = new CountingBufferPool(); + accumulator = new ByteBufferAccumulator(byteBufferPool); + } + + @Test + public void testToBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // We completely fill up the internal buffer with the first write. + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + writeInFlushMode(slice, internalBuffer); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we ask for min size of 0 we get the same buffer which is full. + internalBuffer = accumulator.ensureBuffer(0, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(0)); + + // If we need at least 1 minSpace we must allocate a new buffer. + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThan(0)); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize)); + + // Write 13 bytes from the end of the internal buffer. + int bytesToWrite = BufferUtil.space(internalBuffer) - 13; + ByteBuffer buffer = BufferUtil.toBuffer(new byte[bytesToWrite]); + BufferUtil.clear(buffer); + assertThat(writeInFlushMode(slice, buffer), is(bytesToWrite)); + assertThat(writeInFlushMode(buffer, internalBuffer), is(bytesToWrite)); + assertThat(BufferUtil.space(internalBuffer), is(13)); + + // If we request anything under the amount remaining we get back the same buffer. + for (int i = 0; i <= 13; i++) + { + internalBuffer = accumulator.ensureBuffer(i, allocationSize); + assertThat(BufferUtil.space(internalBuffer), is(13)); + } + + // If we request over 13 then we get a new buffer. + internalBuffer = accumulator.ensureBuffer(14, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1024)); + + // Copy the rest of the content. + while (slice.hasRemaining()) + { + internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testTakeBuffer() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1)); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(accumulator.getLength(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + assertThat(combinedBuffer, is(content)); + + // Return the buffer and make sure all is returned to bufferPool. + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testToByteArray() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(accumulator.getLength(), is(size)); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + byte[] combinedBuffer = accumulator.toByteArray(); + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + assertThat(accumulator.getLength(), is(size)); + assertThat(BufferUtil.toBuffer(combinedBuffer), is(content)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyToBuffer() + { + ByteBuffer combinedBuffer = accumulator.toByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testEmptyTakeBuffer() + { + ByteBuffer combinedBuffer = accumulator.takeByteBuffer(); + assertThat(combinedBuffer.remaining(), is(0)); + accumulator.close(); + assertThat(byteBufferPool.getLeasedBuffers(), is(1L)); + byteBufferPool.release(combinedBuffer); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteTo() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testWriteToBufferTooSmall() + { + int size = 1024 * 1024; + int allocationSize = 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + while (slice.hasRemaining()) + { + ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize); + writeInFlushMode(slice, internalBuffer); + } + + // Writing to a buffer too small gives buffer overflow. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = BufferUtil.toBuffer(new byte[accumulator.getLength() - 1]); + BufferUtil.clear(combinedBuffer); + assertThrows(BufferOverflowException.class, () -> accumulator.writeTo(combinedBuffer)); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + @Test + public void testCopy() + { + int size = 1024 * 1024; + ByteBuffer content = randomBytes(size); + ByteBuffer slice = content.slice(); + + // Copy the content. + int tmpBufferSize = 1024; + ByteBuffer tmpBuffer = BufferUtil.toBuffer(new byte[tmpBufferSize]); + BufferUtil.clear(tmpBuffer); + while (slice.hasRemaining()) + { + writeInFlushMode(slice, tmpBuffer); + accumulator.copyBuffer(tmpBuffer); + } + + // Check we have the same content as the original buffer. + assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L)); + ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false); + accumulator.writeTo(combinedBuffer); + assertThat(accumulator.getLength(), is(size)); + assertThat(combinedBuffer, is(content)); + byteBufferPool.release(combinedBuffer); + + // Close the accumulator and make sure all is returned to bufferPool. + accumulator.close(); + byteBufferPool.verifyClosed(); + } + + private ByteBuffer randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return BufferUtil.toBuffer(data); + } + + private int writeInFlushMode(ByteBuffer from, ByteBuffer to) + { + int pos = BufferUtil.flipToFill(to); + int written = BufferUtil.put(from, to); + BufferUtil.flipToFlush(to, pos); + return written; + } + + public static class CountingBufferPool extends LeakTrackingByteBufferPool + { + private final AtomicLong _leasedBuffers = new AtomicLong(0); + + public CountingBufferPool() + { + this(new MappedByteBufferPool()); + } + + public CountingBufferPool(ByteBufferPool delegate) + { + super(delegate); + } + + @Override + public ByteBuffer acquire(int size, boolean direct) + { + _leasedBuffers.incrementAndGet(); + return super.acquire(size, direct); + } + + @Override + public void release(ByteBuffer buffer) + { + if (buffer != null) + _leasedBuffers.decrementAndGet(); + super.release(buffer); + } + + public long getLeasedBuffers() + { + return _leasedBuffers.get(); + } + + public void verifyClosed() + { + assertThat(_leasedBuffers.get(), is(0L)); + assertThat(getLeakedAcquires(), is(0L)); + assertThat(getLeakedReleases(), is(0L)); + assertThat(getLeakedResources(), is(0L)); + assertThat(getLeakedRemoves(), is(0L)); + } + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java index 08074de115c4..5aa41765e070 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulatorTest.java @@ -91,79 +91,4 @@ public void testCopyChunkNotEnoughSpace() MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length)); assertThat(e.getMessage(), containsString("too large for configured max")); } - - /* - @Test - public void testRecycle() - { - ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool()); - ByteBuffer out0 = ByteBuffer.allocate(200); - ByteBuffer out1 = ByteBuffer.allocate(200); - { - // 1 - ByteBuffer buf = accumulator.newByteBuffer(10); - byte[] hello = "Hello".getBytes(UTF_8); - buf.put(hello).flip(); - accumulator.copyChunk(buf); - - // 2 - buf = accumulator.newByteBuffer(10); - byte[] space = " ".getBytes(UTF_8); - buf.put(space).flip(); - accumulator.copyChunk(buf); - - // 3 - buf = accumulator.newByteBuffer(10); - byte[] world = "World".getBytes(UTF_8); - buf.put(world).flip(); - accumulator.copyChunk(buf); - - assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length)); - - accumulator.transferTo(out0); - - // reuse that byte[] - accumulator.recycle(); - } - - { - // 1 - ByteBuffer buf = accumulator.newByteBuffer(10); - byte[] olleh = "olleH".getBytes(UTF_8); - buf.put(olleh).flip(); - accumulator.copyChunk(buf); - - // 2 - buf = accumulator.newByteBuffer(10); - byte[] space = " ".getBytes(UTF_8); - buf.put(space).flip(); - accumulator.copyChunk(buf); - - // 3 - buf = accumulator.newByteBuffer(10); - byte[] dlrow = "dlroW".getBytes(UTF_8); - buf.put(dlrow).flip(); - accumulator.copyChunk(buf); - - // 4 - buf = accumulator.newByteBuffer(10); - byte[] done = " enoD".getBytes(UTF_8); - buf.put(done).flip(); - accumulator.copyChunk(buf); - - assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length)); - - accumulator.transferTo(out1); - - // reuse that byte[] - accumulator.recycle(); - } - - String result0 = BufferUtil.toUTF8String(out0); - assertThat("result", result0, is("Hello World")); - - String result1 = BufferUtil.toUTF8String(out1); - assertThat("result", result1, is("olleH dlroW enoD")); - } - */ } From f63a741b0f0131300fb5d8e1e040e1c74a73c059 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 17 Nov 2020 21:16:38 +1100 Subject: [PATCH 15/19] use local length field for ByteAccumulator.getLength() Signed-off-by: Lachlan Roberts --- .../java/org/eclipse/jetty/io/ByteBufferAccumulator.java | 5 +++++ .../common/extensions/compress/ByteAccumulator.java | 4 ++-- .../common/extensions/compress/CompressExtension.java | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 97f009fd9049..c352bcb5b821 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -50,6 +50,11 @@ public ByteBufferAccumulator(ByteBufferPool bufferPool) _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; } + /** + * Get the amount of bytes which have been accumulated. + * This will add up the remaining of each buffer in the accumulator. + * @return the total length of the content in the accumulator. + */ public int getLength() { int length = 0; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index fce62acbe056..6632b76afa23 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -50,7 +50,7 @@ public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool byteBufferPool) public int getLength() { - return accumulator.getLength(); + return length; } public ByteBuffer ensureBuffer(int minAllocationSize) @@ -58,7 +58,7 @@ public ByteBuffer ensureBuffer(int minAllocationSize) return accumulator.ensureBuffer(MIN_SPACE, minAllocationSize); } - public void readBytes(int read) + public void addLength(int read) { length += read; if (length > maxSize) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 8dd90067c1be..f7a3995a3d00 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -200,7 +200,7 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit()); buffer.limit(buffer.limit() + read); - accumulator.readBytes(read); + accumulator.addLength(read); if (LOG.isDebugEnabled()) LOG.debug("Decompressed {} bytes into buffer {} from {}", read, BufferUtil.toDetailString(buffer), toDetail(inflater)); From 2629845f173a8e14f287a393d17192685a229eb6 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 17 Nov 2020 22:16:35 +1100 Subject: [PATCH 16/19] update ByteAccumulator length on copies Signed-off-by: Lachlan Roberts --- .../websocket/common/extensions/compress/ByteAccumulator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 6632b76afa23..02e8092841cf 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -76,12 +76,13 @@ public void copyChunk(byte[] buf, int offset, int length) public void copyChunk(ByteBuffer buffer) { int remaining = buffer.remaining(); - int length = getLength(); if (length + remaining > maxSize) { String err = String.format("Resulting message size [%d] is too large for configured max of [%d]", length + remaining, maxSize); throw new MessageTooLargeException(err); } + + length += remaining; accumulator.copyBuffer(buffer); } @@ -91,7 +92,6 @@ public void transferTo(ByteBuffer buffer) BufferUtil.flipToFlush(buffer, 0); int availableSpace = BufferUtil.space(buffer); - int length = getLength(); if (availableSpace < length) { String err = String.format("Not enough space in ByteBuffer remaining [%d] for accumulated buffers length [%d]", availableSpace, length); From 602cd7e5c0b613b17bfa7ca4a8a18874a07a2e6b Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 1 Dec 2020 09:51:09 +1100 Subject: [PATCH 17/19] throw ArithmeticException on integer overflow from size Signed-off-by: Lachlan Roberts --- .../main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index c352bcb5b821..c6bc659690f3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -59,7 +59,7 @@ public int getLength() { int length = 0; for (ByteBuffer buffer : _buffers) - length += buffer.remaining(); + length = Math.addExact(length, buffer.remaining()); return length; } From 6dce1cbffd0768bd172108aba2523f0a17f548f0 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 1 Dec 2020 10:01:31 +1100 Subject: [PATCH 18/19] Make ByteBufferAccumulator direct configurable Signed-off-by: Lachlan Roberts --- .../java/org/eclipse/jetty/io/ByteBufferAccumulator.java | 8 +++++--- .../org/eclipse/jetty/io/ByteBufferOutputStream2.java | 6 +++--- .../org/eclipse/jetty/io/ByteBufferAccumulatorTest.java | 2 +- .../common/extensions/compress/ByteAccumulator.java | 2 +- .../common/extensions/compress/CompressExtension.java | 2 +- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index c6bc659690f3..462a018325ec 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -39,15 +39,17 @@ public class ByteBufferAccumulator implements AutoCloseable { private final List _buffers = new ArrayList<>(); private final ByteBufferPool _bufferPool; + private final boolean _direct; public ByteBufferAccumulator() { - this(null); + this(null, false); } - public ByteBufferAccumulator(ByteBufferPool bufferPool) + public ByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct) { _bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool; + _direct = direct; } /** @@ -129,7 +131,7 @@ public ByteBuffer takeByteBuffer() } int length = getLength(); - combinedBuffer = _bufferPool.acquire(length, false); + combinedBuffer = _bufferPool.acquire(length, _direct); BufferUtil.clearToFill(combinedBuffer); for (ByteBuffer buffer : _buffers) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java index 2ce77fab6459..ed11c126875f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java @@ -36,12 +36,12 @@ public class ByteBufferOutputStream2 extends OutputStream public ByteBufferOutputStream2() { - this(null); + this(null, false); } - public ByteBufferOutputStream2(ByteBufferPool bufferPool) + public ByteBufferOutputStream2(ByteBufferPool bufferPool, boolean direct) { - _accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool); + _accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool, direct); } public ByteBufferPool getByteBufferPool() diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java index 68f72d465593..22e628be9f9e 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAccumulatorTest.java @@ -42,7 +42,7 @@ public class ByteBufferAccumulatorTest public void before() { byteBufferPool = new CountingBufferPool(); - accumulator = new ByteBufferAccumulator(byteBufferPool); + accumulator = new ByteBufferAccumulator(byteBufferPool, false); } @Test diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 02e8092841cf..00a8db41fa5f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -45,7 +45,7 @@ public ByteAccumulator(int maxOverallBufferSize) public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool byteBufferPool) { this.maxSize = maxOverallBufferSize; - this.accumulator = new ByteBufferAccumulator(byteBufferPool); + this.accumulator = new ByteBufferAccumulator(byteBufferPool, false); } public int getLength() diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index f7a3995a3d00..a70f7c914758 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -492,7 +492,7 @@ private void compress(FrameEntry entry, boolean first) if (!deflater.needsInput() || supplyInput(deflater, data)) { ByteBufferPool bufferPool = getBufferPool(); - try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator(bufferPool)) + try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator(bufferPool, false)) { while (true) { From 8aedc50048fa114f964903d61b8b30aeece83463 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 2 Dec 2020 00:07:38 +1100 Subject: [PATCH 19/19] fix missing usage of the new _direct field in ByteBufferAccumulator Signed-off-by: Lachlan Roberts --- .../main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java index 462a018325ec..66bba546aec8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java @@ -91,7 +91,7 @@ public ByteBuffer ensureBuffer(int minSize, int minAllocationSize) ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1); if (BufferUtil.space(buffer) < minSize) { - buffer = _bufferPool.acquire(minAllocationSize, false); + buffer = _bufferPool.acquire(minAllocationSize, _direct); _buffers.add(buffer); }