diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java index ff358e8f0f81..a33159a49489 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java @@ -13,36 +13,41 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Function; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ManagedObject -public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool +public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable { private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class); - private final Pool[] _direct; - private final Pool[] _indirect; - private final int _factor; + private final Bucket[] _direct; + private final Bucket[] _indirect; private final int _minCapacity; + private final int _maxCapacity; private final long _maxHeapMemory; private final long _maxDirectMemory; private final AtomicLong _currentHeapMemory = new AtomicLong(); private final AtomicLong _currentDirectMemory = new AtomicLong(); + private final Function _bucketIndexFor; public ArrayRetainableByteBufferPool() { - this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L); + this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L); } public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize) @@ -52,48 +57,72 @@ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacit public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) { - _factor = factor <= 0 ? 1024 : factor; - this._maxHeapMemory = maxHeapMemory; - this._maxDirectMemory = maxDirectMemory; + this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null); + } + + protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, + Function bucketIndexFor, Function bucketCapacity) + { if (minCapacity <= 0) minCapacity = 0; - _minCapacity = minCapacity; if (maxCapacity <= 0) maxCapacity = 64 * 1024; - if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity) + + int f = factor <= 0 ? 1024 : factor; + if ((maxCapacity % f) != 0 || f >= maxCapacity) throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); - int length = maxCapacity / _factor; + if (bucketIndexFor == null) + bucketIndexFor = c -> (c - 1) / f; + if (bucketCapacity == null) + bucketCapacity = i -> (i + 1) * f; - @SuppressWarnings("unchecked") - Pool[] directArray = new Pool[length]; - @SuppressWarnings("unchecked") - Pool[] indirectArray = new Pool[length]; + int length = bucketIndexFor.apply(maxCapacity) + 1; + Bucket[] directArray = new Bucket[length]; + Bucket[] indirectArray = new Bucket[length]; for (int i = 0; i < directArray.length; i++) { - directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); - indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); + int capacity = Math.min(bucketCapacity.apply(i), maxCapacity); + directArray[i] = new Bucket(capacity, maxBucketSize); + indirectArray[i] = new Bucket(capacity, maxBucketSize); } + + _minCapacity = minCapacity; + _maxCapacity = maxCapacity; _direct = directArray; _indirect = indirectArray; + _maxHeapMemory = maxHeapMemory; + _maxDirectMemory = maxDirectMemory; + _bucketIndexFor = bucketIndexFor; + } + + @ManagedAttribute("The minimum pooled buffer capacity") + public int getMinCapacity() + { + return _minCapacity; + } + + @ManagedAttribute("The maximum pooled buffer capacity") + public int getMaxCapacity() + { + return _maxCapacity; } @Override public RetainableByteBuffer acquire(int size, boolean direct) { - int capacity = (bucketIndexFor(size) + 1) * _factor; - Pool bucket = bucketFor(size, direct); + Bucket bucket = bucketFor(size, direct); if (bucket == null) return newRetainableByteBuffer(size, direct, byteBuffer -> {}); - Pool.Entry entry = bucket.acquire(); + Bucket.Entry entry = bucket.acquire(); RetainableByteBuffer buffer; if (entry == null) { - Pool.Entry reservedEntry = bucket.reserve(); + Bucket.Entry reservedEntry = bucket.reserve(); if (reservedEntry != null) { - buffer = newRetainableByteBuffer(capacity, direct, byteBuffer -> + buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer -> { BufferUtil.clear(byteBuffer); reservedEntry.release(); @@ -127,22 +156,17 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc return retainableByteBuffer; } - private Pool bucketFor(int capacity, boolean direct) + private Bucket bucketFor(int capacity, boolean direct) { if (capacity < _minCapacity) return null; - int idx = bucketIndexFor(capacity); - Pool[] buckets = direct ? _direct : _indirect; + int idx = _bucketIndexFor.apply(capacity); + Bucket[] buckets = direct ? _direct : _indirect; if (idx >= buckets.length) return null; return buckets[idx]; } - private int bucketIndexFor(int capacity) - { - return (capacity - 1) / _factor; - } - @ManagedAttribute("The number of pooled direct ByteBuffers") public long getDirectByteBufferCount() { @@ -157,8 +181,8 @@ public long getHeapByteBufferCount() private long getByteBufferCount(boolean direct) { - Pool[] buckets = direct ? _direct : _indirect; - return Arrays.stream(buckets).mapToLong(Pool::size).sum(); + Bucket[] buckets = direct ? _direct : _indirect; + return Arrays.stream(buckets).mapToLong(Bucket::size).sum(); } @ManagedAttribute("The number of pooled direct ByteBuffers that are available") @@ -175,8 +199,8 @@ public long getAvailableHeapByteBufferCount() private long getAvailableByteBufferCount(boolean direct) { - Pool[] buckets = direct ? _direct : _indirect; - return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum(); + Bucket[] buckets = direct ? _direct : _indirect; + return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum(); } @ManagedAttribute("The bytes retained by direct ByteBuffers") @@ -213,12 +237,11 @@ public long getAvailableHeapMemory() private long getAvailableMemory(boolean direct) { - Pool[] buckets = direct ? _direct : _indirect; + Bucket[] buckets = direct ? _direct : _indirect; long total = 0L; - for (int i = 0; i < buckets.length; i++) + for (Bucket bucket : buckets) { - Pool bucket = buckets[i]; - long capacity = (i + 1L) * _factor; + int capacity = bucket._capacity; total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity; } return total; @@ -231,11 +254,11 @@ public void clear() clearArray(_indirect, _currentHeapMemory); } - private void clearArray(Pool[] poolArray, AtomicLong memoryCounter) + private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter) { - for (Pool retainableByteBufferPool : poolArray) + for (Bucket pool : poolArray) { - for (Pool.Entry entry : retainableByteBufferPool.values()) + for (Bucket.Entry entry : pool.values()) { entry.remove(); memoryCounter.addAndGet(-entry.getPooled().capacity()); @@ -266,13 +289,13 @@ private void evict(boolean direct, long excess) long now = System.nanoTime(); long totalClearedCapacity = 0L; - Pool[] buckets = direct ? _direct : _indirect; + Bucket[] buckets = direct ? _direct : _indirect; while (totalClearedCapacity < excess) { - for (Pool bucket : buckets) + for (Bucket bucket : buckets) { - Pool.Entry oldestEntry = findOldestEntry(now, bucket); + Bucket.Entry oldestEntry = findOldestEntry(now, bucket); if (oldestEntry == null) continue; @@ -293,10 +316,32 @@ private void evict(boolean direct, long excess) LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap")); } + @Override + public String toString() + { + return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}", + super.toString(), + _minCapacity, _maxCapacity, + _direct.length, + _currentHeapMemory.get(), _maxHeapMemory, + _currentDirectMemory.get(), _maxDirectMemory); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + Dumpable.dumpObjects( + out, + indent, + this, + DumpableCollection.fromArray("direct", _direct), + DumpableCollection.fromArray("indirect", _indirect)); + } + private Pool.Entry findOldestEntry(long now, Pool bucket) { - Pool.Entry oldestEntry = null; - for (Pool.Entry entry : bucket.values()) + Bucket.Entry oldestEntry = null; + for (Bucket.Entry entry : bucket.values()) { if (oldestEntry != null) { @@ -311,4 +356,34 @@ private Pool.Entry findOldestEntry(long now, Pool + { + private final int _capacity; + + Bucket(int capacity, int size) + { + super(Pool.StrategyType.THREAD_ID, size, true); + _capacity = capacity; + } + + @Override + public String toString() + { + int entries = 0; + int inUse = 0; + for (Entry entry : values()) + { + entries++; + if (entry.isInUse()) + inUse++; + } + + return String.format("%s{capacity=%d,inuse=%d(%d%%)}", + super.toString(), + _capacity, + inUse, + entries > 0 ? (inUse * 100) / entries : 0); + } + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java index 9dba1740f57d..f0b7201588d4 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java @@ -13,12 +13,15 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -313,4 +316,67 @@ public void testAcquireRelease() assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getHeapMemory(), is(0L)); } + + @Test + public void testExponentialPool() throws IOException + { + ArrayRetainableByteBufferPool pool = new ExponentialPool(); + assertThat(pool.acquire(1, false).capacity(), is(1)); + assertThat(pool.acquire(2, false).capacity(), is(2)); + RetainableByteBuffer b3 = pool.acquire(3, false); + assertThat(b3.capacity(), is(4)); + RetainableByteBuffer b4 = pool.acquire(4, false); + assertThat(b4.capacity(), is(4)); + + int capacity = 4; + while (true) + { + RetainableByteBuffer b = pool.acquire(capacity - 1, false); + assertThat(b.capacity(), Matchers.is(capacity)); + b = pool.acquire(capacity, false); + assertThat(b.capacity(), Matchers.is(capacity)); + + if (capacity >= pool.getMaxCapacity()) + break; + + b = pool.acquire(capacity + 1, false); + assertThat(b.capacity(), Matchers.is(capacity * 2)); + + capacity = capacity * 2; + } + + b3.release(); + b4.getBuffer().limit(b4.getBuffer().capacity() - 2); + assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)")); + } + + /** + * A variant of the {@link ArrayRetainableByteBufferPool} that + * uses buckets of buffers that increase in size by a power of + * 2 (eg 1k, 2k, 4k, 8k, etc.). + */ + public static class ExponentialPool extends ArrayRetainableByteBufferPool + { + public ExponentialPool() + { + this(0, -1, Integer.MAX_VALUE); + } + + public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize) + { + this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L); + } + + public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) + { + super(minCapacity, + -1, + maxCapacity, + maxBucketSize, + maxHeapMemory, + maxDirectMemory, + c -> 32 - Integer.numberOfLeadingZeros(c - 1), + i -> 1 << i); + } + } }