Skip to content

Commit

Permalink
Issue #6974 - add heuristic for maxHeapMemory and maxDirectMemory
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Oct 20, 2021
1 parent f847a61 commit 40114e8
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,20 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();

/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().totalMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().totalMemory() / 4;
}

protected int getCapacityFactor()
Expand Down Expand Up @@ -97,6 +105,18 @@ public long getHeapMemory()
return getMemory(false);
}

@ManagedAttribute("The max num of bytes that can be retained from direct ByteBuffers")
public long getMaxDirectMemory()
{
return _maxDirectMemory;
}

@ManagedAttribute("The max num of bytes that can be retained from heap ByteBuffers")
public long getMaxHeapMemory()
{
return _maxHeapMemory;
}

public long getMemory(boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand All @@ -39,9 +45,11 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
{
private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class);

private final int _maxCapacity;
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private boolean _detailedDump = false;

/**
* Creates a new ArrayByteBufferPool with a default configuration.
Expand Down Expand Up @@ -83,8 +91,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
Expand All @@ -97,17 +105,17 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_maxCapacity = maxCapacity;
_minCapacity = minCapacity;

int length = maxCapacity / factor;
int length = getBucketNumber(maxCapacity);
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
}

@Override
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
int capacity = size < _minCapacity ? size : getBucketCapacity(getBucketNumber(size));
ByteBufferPool.Bucket bucket = bucketFor(size, direct, false);
if (bucket == null)
return newByteBuffer(capacity, direct);
Expand All @@ -125,25 +133,29 @@ public void release(ByteBuffer buffer)

int capacity = buffer.capacity();
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
if (capacity != getBucketCapacity(getBucketNumber(capacity)))
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}

// Don't release into the pool if greater than the maximum ByteBuffer capacity.
if (capacity > _maxCapacity)
return;

boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, true);
if (bucket != null)
{
bucket.release(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
releaseExcessMemory(direct, this::releaseMemory);
}
}

private Bucket newBucket(int key, boolean direct)
{
Bucket bucket = new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
Bucket bucket = new Bucket(this, getBucketCapacity(key), getMaxQueueLength());
bucket.setPoolSizeAtomic(getSizeAtomic(direct));
return bucket;
}
Expand All @@ -165,7 +177,7 @@ public void clear()
}
}

private void clearOldestBucket(boolean direct)
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
Expand All @@ -191,28 +203,33 @@ private void clearOldestBucket(boolean direct)
}
}

private int bucketFor(int capacity)
protected int getBucketNumber(int capacity)
{
return (int)Math.ceil((double)capacity / getCapacityFactor());
}

protected int getBucketCapacity(int bucket)
{
return (capacity - 1) / getCapacityFactor();
return bucket * getCapacityFactor();
}

private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, boolean create)
{
if (capacity < _minCapacity)
return null;
int b = bucketFor(capacity);
if (b >= _direct.length)
int index = getBucketNumber(capacity) - 1;
if (index >= _direct.length)
return null;
Bucket[] buckets = bucketsFor(direct);
Bucket bucket = buckets[b];
Bucket bucket = buckets[index];
if (bucket != null)
return bucket;

synchronized (this)
{
bucket = buckets[b];
bucket = buckets[index];
if (bucket == null && create)
buckets[b] = bucket = newBucket(b + 1, direct);
buckets[index] = bucket = newBucket(index + 1, direct);
return bucket;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public ByteBuffer acquire(boolean direct)

public void release(ByteBuffer buffer)
{
_lastUpdate.lazySet(System.nanoTime());
resetUpdateTime();
BufferUtil.clear(buffer);
if (_size == null)
queueOffer(buffer);
Expand All @@ -212,6 +212,11 @@ else if (_size.incrementAndGet() <= _maxSize)
_size.decrementAndGet();
}

void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}

public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -28,6 +31,8 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand All @@ -45,6 +50,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>();
private final Function<Integer, Bucket> _newBucket;
private boolean _detailedDump = false;

/**
* Creates a new MappedByteBufferPool with a default configuration.
Expand Down Expand Up @@ -93,8 +99,8 @@ public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bu
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
{
Expand All @@ -112,8 +118,8 @@ private Bucket newBucket(int key, boolean direct)
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int b = bucketFor(size);
int capacity = b * getCapacityFactor();
int b = getBucketNumber(size);
int capacity = getBucketCapacity(b);
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
Bucket bucket = buffers.get(b);
if (bucket == null)
Expand All @@ -131,20 +137,20 @@ public void release(ByteBuffer buffer)
return; // nothing to do

int capacity = buffer.capacity();
int b = getBucketNumber(capacity);
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
if (capacity != getBucketCapacity(b))
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}

int b = bucketFor(capacity);
boolean direct = buffer.isDirect();
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
Bucket bucket = buckets.computeIfAbsent(b, i -> newBucket(i, direct));
bucket.release(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
releaseExcessMemory(direct, this::releaseMemory);
}

@Override
Expand All @@ -157,14 +163,17 @@ public void clear()
_heapBuffers.clear();
}

private void clearOldestBucket(boolean direct)
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
for (Map.Entry<Integer, Bucket> entry : buckets.entrySet())
{
Bucket bucket = entry.getValue();
if (bucket.isEmpty())
continue;

long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
{
Expand All @@ -181,13 +190,14 @@ private void clearOldestBucket(boolean direct)
}
}

private int bucketFor(int size)
protected int getBucketNumber(int capacity)
{
int factor = getCapacityFactor();
int bucket = size / factor;
if (bucket * factor != size)
++bucket;
return bucket;
return (int)Math.ceil((double)capacity / getCapacityFactor());
}

protected int getBucketCapacity(int bucket)
{
return bucket * getCapacityFactor();
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public void test() throws Exception
_client.start();

int numThreads = 100;
int maxMessageSize = 1024 * 64;
for (int msgSize = 1024; msgSize < maxMessageSize; msgSize += 512)
int maxMessageSize = 1024 * 1024;
for (int msgSize = 1024; msgSize < maxMessageSize; msgSize += 1024)
{
ContentResponse get = httpClient.GET("http://localhost:8080/setCount?numThreads=" + numThreads);
assertThat(get.getStatus(), is(200));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
Expand Down Expand Up @@ -58,9 +57,11 @@ public void before() throws Exception
{
_server = new Server();
// _server.addBean(new LogArrayByteBufferPool(512, -1, -1, maxMemory, maxMemory));
int maxMemory = 1024 * 8;
ByteBufferPool bufferPool = new ArrayByteBufferPool(-1, -1, -1, -1, maxMemory, maxMemory);
// ByteBufferPool bufferPool = new MappedByteBufferPool(-1, -1, null, maxMemory, maxMemory);
int maxMemory = 0;//-1;//1024 * 8;
// ByteBufferPool bufferPool = new ArrayByteBufferPool(-1, -1, -1, -1, maxMemory, maxMemory);
// ByteBufferPool bufferPool = new LogArrayByteBufferPool(-1, -1, -1, maxMemory, maxMemory);
MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1, -1, null, maxMemory, maxMemory);
bufferPool.setDetailedDump(true);
// ByteBufferPool bufferPool = new NullByteBufferPool();
_server.addBean(bufferPool);
ServerConnector _connector = new ServerConnector(_server);
Expand Down

0 comments on commit 40114e8

Please sign in to comment.