Skip to content

Commit

Permalink
Improve #6322 log buckets in RetainableByteBufferPool
Browse files Browse the repository at this point in the history
Add a Log2 bucket size imple for ArrayRetainableByteBufferPool

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Jul 12, 2021
1 parent 410d402 commit f2b545c
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 19 deletions.
Expand Up @@ -17,9 +17,11 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
Expand All @@ -31,14 +33,17 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);

private final Pool<RetainableByteBuffer>[] _direct;
private final Pool<RetainableByteBuffer>[] _indirect;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final int _factor;
private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function<Integer, Integer> _bucketIndexFor;
private final Function<Integer, Integer> _bucketCapacity;

public ArrayRetainableByteBufferPool()
{
Expand All @@ -51,38 +56,59 @@ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacit
}

public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory,
c -> (c - 1) / factor,
i -> (i + 1) * factor);
}

protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity)
{
_factor = factor <= 0 ? 1024 : factor;
this._maxHeapMemory = maxHeapMemory;
this._maxDirectMemory = maxDirectMemory;
if (minCapacity <= 0)
minCapacity = 0;
_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");

int length = maxCapacity / _factor;
int length = bucketIndexFor.apply(maxCapacity) + 1;

@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] directArray = new Pool[length];
Bucket[] directArray = new Bucket[length];
@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] indirectArray = new Pool[length];
Bucket[] indirectArray = new Bucket[length];
for (int i = 0; i < directArray.length; i++)
{
directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
int capacity = bucketCapacity.apply(i);
directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
}
_direct = directArray;
_indirect = indirectArray;
_bucketIndexFor = bucketIndexFor;
_bucketCapacity = bucketCapacity;
}

public int getMinCapacity()
{
return _minCapacity;
}

public int getMaxCapacity()
{
return _maxCapacity;
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
int capacity = (bucketIndexFor(size) + 1) * _factor;
Pool<RetainableByteBuffer> bucket = bucketFor(size, direct);
Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Pool<RetainableByteBuffer>.Entry entry = bucket.acquire();
Expand All @@ -93,7 +119,7 @@ public RetainableByteBuffer acquire(int size, boolean direct)
Pool<RetainableByteBuffer>.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(capacity, direct, byteBuffer ->
buffer = newRetainableByteBuffer(bucket.capacity, direct, byteBuffer ->
{
BufferUtil.clear(byteBuffer);
reservedEntry.release();
Expand Down Expand Up @@ -127,22 +153,17 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc
return retainableByteBuffer;
}

private Pool<RetainableByteBuffer> bucketFor(int capacity, boolean direct)
private Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = bucketIndexFor(capacity);
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
int idx = _bucketIndexFor.apply(capacity);
Bucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
}

private int bucketIndexFor(int capacity)
{
return (capacity - 1) / _factor;
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
public long getDirectByteBufferCount()
{
Expand Down Expand Up @@ -311,4 +332,40 @@ private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<Retainab
}
return oldestEntry;
}

private static class Bucket extends Pool<RetainableByteBuffer>
{
private final int capacity;

Bucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
this.capacity = capacity;
}
}

public static class LogBuckets extends ArrayRetainableByteBufferPool
{
public LogBuckets()
{
this(0, 65536, Integer.MAX_VALUE);
}

public LogBuckets(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}

public LogBuckets(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
TypeUtil.nextPowerOf2(maxCapacity) / 2,
TypeUtil.nextPowerOf2(maxCapacity),
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> TypeUtil.log2NextPowerOf2(c),
i -> 1 << i);
}
}
}
Expand Up @@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.List;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -313,4 +314,31 @@ public void testAcquireRelease()
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getHeapMemory(), is(0L));
}

@Test
public void testLogBuckets()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool.LogBuckets();
assertThat(pool.acquire(1, false).capacity(), is(1));
assertThat(pool.acquire(2, false).capacity(), is(2));
assertThat(pool.acquire(3, false).capacity(), is(4));
assertThat(pool.acquire(4, false).capacity(), is(4));

int capacity = 4;
while (true)
{
RetainableByteBuffer b = pool.acquire(capacity - 1, false);
assertThat(b.capacity(), Matchers.is(capacity));
b = pool.acquire(capacity, false);
assertThat(b.capacity(), Matchers.is(capacity));

if (capacity >= pool.getMaxCapacity())
break;

b = pool.acquire(capacity + 1, false);
assertThat(b.capacity(), Matchers.is(capacity * 2));

capacity = capacity * 2;
}
}
}
38 changes: 38 additions & 0 deletions jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java
Expand Up @@ -800,4 +800,42 @@ public static <T> Stream<ServiceLoader.Provider<T>> serviceProviderStream(Servic
{
return StreamSupport.stream(new ServiceLoaderSpliterator<>(serviceLoader), false);
}

/**
* Round up to the next power of 2.
* @param v An integer &gt; 0 and &lt;= half of {@link Integer#MAX_VALUE}
* @return The next power of two that is equal too or larger than the passed integer.
*/
public static int nextPowerOf2(int v)
{
if (v < 0 || v > (Integer.MAX_VALUE / 2))
throw new IllegalArgumentException(Integer.toString(v));

// This algorithm is from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
// and gives good performance on most architectures.
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v++;
return v;
}

private static final int[] MultiplyDeBruijnBitPosition2 =
{
0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8, 31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
};

/**
* The log2 of the value rounded up to the next power of 2.
* @param v An integer &gt; 0 and &lt;= half of {@link Integer#MAX_VALUE}
* @return The log2 of next power of two that is equal too or larger than the passed integer.
*/
public static int log2NextPowerOf2(int v)
{
// This algorithm is from https://graphics.stanford.edu/~seander/bithacks.html#IntegerLogDeBruijn
return MultiplyDeBruijnBitPosition2[(int)((0xFFFFFFFFL & (nextPowerOf2(v) * 0x077CB531L)) >> 27)];
}
}
Expand Up @@ -19,12 +19,12 @@
import org.eclipse.jetty.util.resource.Resource;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.JRE;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

Expand Down Expand Up @@ -188,4 +188,57 @@ public void testGetLocationJavaLangThreadDeathJPMS()
String expectedJavaBase = "/java.base";
assertThat(TypeUtil.getLocationOfClass(java.lang.ThreadDeath.class).toASCIIString(), containsString(expectedJavaBase));
}

@Test
public void testNextPowerOf2()
{
assertThat(TypeUtil.nextPowerOf2(0), Matchers.is(0));
assertThat(TypeUtil.nextPowerOf2(1), Matchers.is(1));
assertThat(TypeUtil.nextPowerOf2(2), Matchers.is(2));
assertThat(TypeUtil.nextPowerOf2(3), Matchers.is(4));
assertThat(TypeUtil.nextPowerOf2(4), Matchers.is(4));

int value = 4;
while (value < Integer.MAX_VALUE / 2)
{
assertThat(TypeUtil.nextPowerOf2(value - 1), Matchers.is(value));
assertThat(TypeUtil.nextPowerOf2(value), Matchers.is(value));
assertThat(TypeUtil.nextPowerOf2(value + 1), Matchers.is(value * 2));
value = value * 2;
}

assertThat(TypeUtil.nextPowerOf2(Integer.MAX_VALUE / 2), Matchers.is(0x40000000));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.nextPowerOf2((Integer.MAX_VALUE / 2) + 1));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.nextPowerOf2(Integer.MAX_VALUE));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.nextPowerOf2(-1));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.nextPowerOf2(Integer.MIN_VALUE));
}

@Test
public void testLogNextPowerOf2()
{
assertThat(TypeUtil.log2NextPowerOf2(0), Matchers.is(0));
assertThat(TypeUtil.log2NextPowerOf2(1), Matchers.is(0));
assertThat(TypeUtil.log2NextPowerOf2(2), Matchers.is(1));
assertThat(TypeUtil.log2NextPowerOf2(3), Matchers.is(2));
assertThat(TypeUtil.log2NextPowerOf2(4), Matchers.is(2));

int value = 4;
int power = 2;
while (value < Integer.MAX_VALUE / 2)
{
System.err.printf("v=%d p=%d%n", value, power);
assertThat(TypeUtil.log2NextPowerOf2(value - 1), Matchers.is(power));
assertThat(TypeUtil.log2NextPowerOf2(value), Matchers.is(power));
assertThat(TypeUtil.log2NextPowerOf2(value + 1), Matchers.is(power + 1));
value = value * 2;
power = power + 1;
}

assertThat(TypeUtil.log2NextPowerOf2(Integer.MAX_VALUE / 2), Matchers.is(30));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.log2NextPowerOf2((Integer.MAX_VALUE / 2) + 1));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.log2NextPowerOf2(Integer.MAX_VALUE));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.log2NextPowerOf2(-1));
assertThrows(IllegalArgumentException.class, () -> TypeUtil.log2NextPowerOf2(Integer.MIN_VALUE));
}
}

0 comments on commit f2b545c

Please sign in to comment.