Skip to content

Commit

Permalink
Improve #6322 extensible ArrayRetainableByteBufferPool (#6538)
Browse files Browse the repository at this point in the history
Add a exponential bucket size impl to test ArrayRetainableByteBufferPool extensibility

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Jul 26, 2021
1 parent 4148166 commit d781ec3
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 46 deletions.
Expand Up @@ -13,36 +13,41 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);

private final Pool<RetainableByteBuffer>[] _direct;
private final Pool<RetainableByteBuffer>[] _indirect;
private final int _factor;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function<Integer, Integer> _bucketIndexFor;

public ArrayRetainableByteBufferPool()
{
this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L);
this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L);
}

public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
Expand All @@ -52,48 +57,72 @@ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacit

public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
this._maxHeapMemory = maxHeapMemory;
this._maxDirectMemory = maxDirectMemory;
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
}

protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity)
{
if (minCapacity <= 0)
minCapacity = 0;
_minCapacity = minCapacity;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity)

int f = factor <= 0 ? 1024 : factor;
if ((maxCapacity % f) != 0 || f >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");

int length = maxCapacity / _factor;
if (bucketIndexFor == null)
bucketIndexFor = c -> (c - 1) / f;
if (bucketCapacity == null)
bucketCapacity = i -> (i + 1) * f;

@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] directArray = new Pool[length];
@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] indirectArray = new Pool[length];
int length = bucketIndexFor.apply(maxCapacity) + 1;
Bucket[] directArray = new Bucket[length];
Bucket[] indirectArray = new Bucket[length];
for (int i = 0; i < directArray.length; i++)
{
directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
int capacity = Math.min(bucketCapacity.apply(i), maxCapacity);
directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
}

_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
_direct = directArray;
_indirect = indirectArray;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_bucketIndexFor = bucketIndexFor;
}

@ManagedAttribute("The minimum pooled buffer capacity")
public int getMinCapacity()
{
return _minCapacity;
}

@ManagedAttribute("The maximum pooled buffer capacity")
public int getMaxCapacity()
{
return _maxCapacity;
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
int capacity = (bucketIndexFor(size) + 1) * _factor;
Pool<RetainableByteBuffer> bucket = bucketFor(size, direct);
Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Pool<RetainableByteBuffer>.Entry entry = bucket.acquire();
Bucket.Entry entry = bucket.acquire();

RetainableByteBuffer buffer;
if (entry == null)
{
Pool<RetainableByteBuffer>.Entry reservedEntry = bucket.reserve();
Bucket.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(capacity, direct, byteBuffer ->
buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer ->
{
BufferUtil.clear(byteBuffer);
reservedEntry.release();
Expand Down Expand Up @@ -127,22 +156,17 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc
return retainableByteBuffer;
}

private Pool<RetainableByteBuffer> bucketFor(int capacity, boolean direct)
private Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = bucketIndexFor(capacity);
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
int idx = _bucketIndexFor.apply(capacity);
Bucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
}

private int bucketIndexFor(int capacity)
{
return (capacity - 1) / _factor;
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
public long getDirectByteBufferCount()
{
Expand All @@ -157,8 +181,8 @@ public long getHeapByteBufferCount()

private long getByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Pool::size).sum();
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Bucket::size).sum();
}

@ManagedAttribute("The number of pooled direct ByteBuffers that are available")
Expand All @@ -175,8 +199,8 @@ public long getAvailableHeapByteBufferCount()

private long getAvailableByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum();
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum();
}

@ManagedAttribute("The bytes retained by direct ByteBuffers")
Expand Down Expand Up @@ -213,12 +237,11 @@ public long getAvailableHeapMemory()

private long getAvailableMemory(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
Bucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (int i = 0; i < buckets.length; i++)
for (Bucket bucket : buckets)
{
Pool<RetainableByteBuffer> bucket = buckets[i];
long capacity = (i + 1L) * _factor;
int capacity = bucket._capacity;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
}
return total;
Expand All @@ -231,11 +254,11 @@ public void clear()
clearArray(_indirect, _currentHeapMemory);
}

private void clearArray(Pool<RetainableByteBuffer>[] poolArray, AtomicLong memoryCounter)
private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter)
{
for (Pool<RetainableByteBuffer> retainableByteBufferPool : poolArray)
for (Bucket pool : poolArray)
{
for (Pool<RetainableByteBuffer>.Entry entry : retainableByteBufferPool.values())
for (Bucket.Entry entry : pool.values())
{
entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity());
Expand Down Expand Up @@ -266,13 +289,13 @@ private void evict(boolean direct, long excess)
long now = System.nanoTime();
long totalClearedCapacity = 0L;

Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
Bucket[] buckets = direct ? _direct : _indirect;

while (totalClearedCapacity < excess)
{
for (Pool<RetainableByteBuffer> bucket : buckets)
for (Bucket bucket : buckets)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = findOldestEntry(now, bucket);
Bucket.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null)
continue;

Expand All @@ -293,10 +316,32 @@ private void evict(boolean direct, long excess)
LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap"));
}

@Override
public String toString()
{
return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}",
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
_currentHeapMemory.get(), _maxHeapMemory,
_currentDirectMemory.get(), _maxDirectMemory);
}

@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(
out,
indent,
this,
DumpableCollection.fromArray("direct", _direct),
DumpableCollection.fromArray("indirect", _indirect));
}

private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = null;
for (Pool<RetainableByteBuffer>.Entry entry : bucket.values())
Bucket.Entry oldestEntry = null;
for (Bucket.Entry entry : bucket.values())
{
if (oldestEntry != null)
{
Expand All @@ -311,4 +356,34 @@ private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<Retainab
}
return oldestEntry;
}

private static class Bucket extends Pool<RetainableByteBuffer>
{
private final int _capacity;

Bucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
_capacity = capacity;
}

@Override
public String toString()
{
int entries = 0;
int inUse = 0;
for (Entry entry : values())
{
entries++;
if (entry.isInUse())
inUse++;
}

return String.format("%s{capacity=%d,inuse=%d(%d%%)}",
super.toString(),
_capacity,
inUse,
entries > 0 ? (inUse * 100) / entries : 0);
}
}
}
Expand Up @@ -13,12 +13,15 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -313,4 +316,67 @@ public void testAcquireRelease()
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getHeapMemory(), is(0L));
}

@Test
public void testExponentialPool() throws IOException
{
ArrayRetainableByteBufferPool pool = new ExponentialPool();
assertThat(pool.acquire(1, false).capacity(), is(1));
assertThat(pool.acquire(2, false).capacity(), is(2));
RetainableByteBuffer b3 = pool.acquire(3, false);
assertThat(b3.capacity(), is(4));
RetainableByteBuffer b4 = pool.acquire(4, false);
assertThat(b4.capacity(), is(4));

int capacity = 4;
while (true)
{
RetainableByteBuffer b = pool.acquire(capacity - 1, false);
assertThat(b.capacity(), Matchers.is(capacity));
b = pool.acquire(capacity, false);
assertThat(b.capacity(), Matchers.is(capacity));

if (capacity >= pool.getMaxCapacity())
break;

b = pool.acquire(capacity + 1, false);
assertThat(b.capacity(), Matchers.is(capacity * 2));

capacity = capacity * 2;
}

b3.release();
b4.getBuffer().limit(b4.getBuffer().capacity() - 2);
assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)"));
}

/**
* A variant of the {@link ArrayRetainableByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
*/
public static class ExponentialPool extends ArrayRetainableByteBufferPool
{
public ExponentialPool()
{
this(0, -1, Integer.MAX_VALUE);
}

public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}

public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i);
}
}
}

0 comments on commit d781ec3

Please sign in to comment.