Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve #6322 extensible ArrayRetainableByteBufferPool #6538

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,36 +13,41 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);

private final Pool<RetainableByteBuffer>[] _direct;
private final Pool<RetainableByteBuffer>[] _indirect;
private final int _factor;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function<Integer, Integer> _bucketIndexFor;

public ArrayRetainableByteBufferPool()
{
this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L);
this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L);
}

public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
Expand All @@ -52,48 +57,72 @@ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacit

public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
this._maxHeapMemory = maxHeapMemory;
this._maxDirectMemory = maxDirectMemory;
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
}

protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity)
{
if (minCapacity <= 0)
minCapacity = 0;
_minCapacity = minCapacity;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity)

int f = factor <= 0 ? 1024 : factor;
if ((maxCapacity % f) != 0 || f >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");

int length = maxCapacity / _factor;
if (bucketIndexFor == null)
bucketIndexFor = c -> (c - 1) / f;
if (bucketCapacity == null)
bucketCapacity = i -> (i + 1) * f;

@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] directArray = new Pool[length];
@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] indirectArray = new Pool[length];
int length = bucketIndexFor.apply(maxCapacity) + 1;
Bucket[] directArray = new Bucket[length];
Bucket[] indirectArray = new Bucket[length];
for (int i = 0; i < directArray.length; i++)
{
directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
int capacity = Math.min(bucketCapacity.apply(i), maxCapacity);
directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
}

_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
_direct = directArray;
_indirect = indirectArray;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_bucketIndexFor = bucketIndexFor;
}

@ManagedAttribute("The minimum pooled buffer capacity")
public int getMinCapacity()
{
return _minCapacity;
}

@ManagedAttribute("The maximum pooled buffer capacity")
public int getMaxCapacity()
{
return _maxCapacity;
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
int capacity = (bucketIndexFor(size) + 1) * _factor;
Pool<RetainableByteBuffer> bucket = bucketFor(size, direct);
Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Pool<RetainableByteBuffer>.Entry entry = bucket.acquire();
Bucket.Entry entry = bucket.acquire();

RetainableByteBuffer buffer;
if (entry == null)
{
Pool<RetainableByteBuffer>.Entry reservedEntry = bucket.reserve();
Bucket.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(capacity, direct, byteBuffer ->
buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer ->
{
BufferUtil.clear(byteBuffer);
reservedEntry.release();
Expand Down Expand Up @@ -127,22 +156,17 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc
return retainableByteBuffer;
}

private Pool<RetainableByteBuffer> bucketFor(int capacity, boolean direct)
private Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = bucketIndexFor(capacity);
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
int idx = _bucketIndexFor.apply(capacity);
Bucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
}

private int bucketIndexFor(int capacity)
{
return (capacity - 1) / _factor;
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
public long getDirectByteBufferCount()
{
Expand All @@ -157,8 +181,8 @@ public long getHeapByteBufferCount()

private long getByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Pool::size).sum();
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Bucket::size).sum();
}

@ManagedAttribute("The number of pooled direct ByteBuffers that are available")
Expand All @@ -175,8 +199,8 @@ public long getAvailableHeapByteBufferCount()

private long getAvailableByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum();
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum();
}

@ManagedAttribute("The bytes retained by direct ByteBuffers")
Expand Down Expand Up @@ -213,12 +237,11 @@ public long getAvailableHeapMemory()

private long getAvailableMemory(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
Bucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (int i = 0; i < buckets.length; i++)
for (Bucket bucket : buckets)
{
Pool<RetainableByteBuffer> bucket = buckets[i];
long capacity = (i + 1L) * _factor;
int capacity = bucket._capacity;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
}
return total;
Expand All @@ -231,11 +254,11 @@ public void clear()
clearArray(_indirect, _currentHeapMemory);
}

private void clearArray(Pool<RetainableByteBuffer>[] poolArray, AtomicLong memoryCounter)
private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter)
{
for (Pool<RetainableByteBuffer> retainableByteBufferPool : poolArray)
for (Bucket pool : poolArray)
{
for (Pool<RetainableByteBuffer>.Entry entry : retainableByteBufferPool.values())
for (Bucket.Entry entry : pool.values())
{
entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity());
Expand Down Expand Up @@ -266,13 +289,13 @@ private void evict(boolean direct, long excess)
long now = System.nanoTime();
long totalClearedCapacity = 0L;

Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
Bucket[] buckets = direct ? _direct : _indirect;

while (totalClearedCapacity < excess)
{
for (Pool<RetainableByteBuffer> bucket : buckets)
for (Bucket bucket : buckets)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = findOldestEntry(now, bucket);
Bucket.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null)
continue;

Expand All @@ -293,10 +316,32 @@ private void evict(boolean direct, long excess)
LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap"));
}

@Override
public String toString()
{
return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}",
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
_currentHeapMemory.get(), _maxHeapMemory,
_currentDirectMemory.get(), _maxDirectMemory);
}

@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(
out,
indent,
this,
DumpableCollection.fromArray("direct", _direct),
DumpableCollection.fromArray("indirect", _indirect));
}

private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = null;
for (Pool<RetainableByteBuffer>.Entry entry : bucket.values())
Bucket.Entry oldestEntry = null;
for (Bucket.Entry entry : bucket.values())
{
if (oldestEntry != null)
{
Expand All @@ -311,4 +356,34 @@ private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<Retainab
}
return oldestEntry;
}

private static class Bucket extends Pool<RetainableByteBuffer>
{
private final int _capacity;

Bucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
_capacity = capacity;
}

@Override
public String toString()
{
int entries = 0;
int inUse = 0;
for (Entry entry : values())
{
entries++;
if (entry.isInUse())
inUse++;
}

return String.format("%s{capacity=%d,inuse=%d(%d%%)}",
super.toString(),
_capacity,
inUse,
entries > 0 ? (inUse * 100) / entries : 0);
}
}
}
Expand Up @@ -13,12 +13,15 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -313,4 +316,67 @@ public void testAcquireRelease()
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getHeapMemory(), is(0L));
}

@Test
public void testExponentialPool() throws IOException
{
ArrayRetainableByteBufferPool pool = new ExponentialPool();
assertThat(pool.acquire(1, false).capacity(), is(1));
assertThat(pool.acquire(2, false).capacity(), is(2));
RetainableByteBuffer b3 = pool.acquire(3, false);
assertThat(b3.capacity(), is(4));
RetainableByteBuffer b4 = pool.acquire(4, false);
assertThat(b4.capacity(), is(4));

int capacity = 4;
while (true)
{
RetainableByteBuffer b = pool.acquire(capacity - 1, false);
assertThat(b.capacity(), Matchers.is(capacity));
b = pool.acquire(capacity, false);
assertThat(b.capacity(), Matchers.is(capacity));

if (capacity >= pool.getMaxCapacity())
break;

b = pool.acquire(capacity + 1, false);
assertThat(b.capacity(), Matchers.is(capacity * 2));

capacity = capacity * 2;
}

b3.release();
b4.getBuffer().limit(b4.getBuffer().capacity() - 2);
assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)"));
}

/**
* A variant of the {@link ArrayRetainableByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
*/
public static class ExponentialPool extends ArrayRetainableByteBufferPool
{
public ExponentialPool()
{
this(0, -1, Integer.MAX_VALUE);
}

public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}

public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i);
}
}
}