Skip to content

Commit

Permalink
Issue #6974 - improvements & fixes to ByteBufferPool implementations (#…
Browse files Browse the repository at this point in the history
…7017)

- WebSocket should user server ByteBufferPool if possible
- fix various bugs ByteBufferPool implementations
- add heuristic for maxHeapMemory and maxDirectMemory
- Add dump for ByteBufferPools
- add LogArrayByteBufferPool that does exponential scaling of bucket size.
- ByteBufferPools should default to use maxMemory heuristic
- Add module jetty-bytebufferpool-logarithmic

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
lachlan-roberts and sbordet committed Nov 24, 2021
1 parent b365b3c commit c19921e
Show file tree
Hide file tree
Showing 12 changed files with 635 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
Expand All @@ -27,16 +28,24 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
private final int _factor;
private final int _maxQueueLength;
private final long _maxHeapMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();

/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
}

protected int getCapacityFactor()
Expand All @@ -49,11 +58,13 @@ protected int getMaxQueueLength()
return _maxQueueLength;
}

@Deprecated
protected void decrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, false);
}

@Deprecated
protected void incrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, true);
Expand Down Expand Up @@ -90,12 +101,29 @@ public long getHeapMemory()
return getMemory(false);
}

@ManagedAttribute("The max num of bytes that can be retained from direct ByteBuffers")
public long getMaxDirectMemory()
{
return _maxDirectMemory;
}

@ManagedAttribute("The max num of bytes that can be retained from heap ByteBuffers")
public long getMaxHeapMemory()
{
return _maxHeapMemory;
}

public long getMemory(boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.get();
}

IntConsumer updateMemory(boolean direct)
{
return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet;
}

@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
Expand Down
131 changes: 91 additions & 40 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,13 +36,15 @@
* 2048, and so on.</p>
*/
@ManagedObject
public class ArrayByteBufferPool extends AbstractByteBufferPool
public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(MappedByteBufferPool.class);

private final int _maxCapacity;
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private boolean _detailedDump = false;

/**
* Creates a new ArrayByteBufferPool with a default configuration.
Expand All @@ -56,7 +63,7 @@ public ArrayByteBufferPool()
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
{
this(minCapacity, factor, maxCapacity, -1, -1, -1);
this(minCapacity, factor, maxCapacity, -1, 0, 0);
}

/**
Expand All @@ -69,7 +76,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1);
this(minCapacity, factor, maxCapacity, maxQueueLength, 0, 0);
}

/**
Expand All @@ -79,8 +86,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
Expand All @@ -93,24 +100,30 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_maxCapacity = maxCapacity;
_minCapacity = minCapacity;

int length = maxCapacity / factor;
// Initialize all buckets in constructor and never modify the array again.
int length = bucketFor(maxCapacity);
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
for (int i = 0; i < length; i++)
{
_direct[i] = newBucket(i + 1, true);
_indirect[i] = newBucket(i + 1, false);
}
}

@Override
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
int capacity = size < _minCapacity ? size : capacityFor(bucketFor(size));
ByteBufferPool.Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newByteBuffer(capacity, direct);
ByteBuffer buffer = bucket.acquire();
if (buffer == null)
return newByteBuffer(capacity, direct);
decrementMemory(buffer);
return buffer;
}

Expand All @@ -122,26 +135,29 @@ public void release(ByteBuffer buffer)

int capacity = buffer.capacity();
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
if (capacity != capacityFor(bucketFor(capacity)))
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}

// Don't release into the pool if greater than the maximum ByteBuffer capacity.
if (capacity > _maxCapacity)
return;

boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, this::newBucket);
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct);
if (bucket != null)
{
bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
releaseExcessMemory(direct, this::releaseMemory);
}
}

private Bucket newBucket(int key)
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(key * getCapacityFactor(), getMaxQueueLength());
return new Bucket(capacityFor(key), getMaxQueueLength(), updateMemory(direct));
}

@Override
Expand All @@ -150,26 +166,20 @@ public void clear()
super.clear();
for (int i = 0; i < _direct.length; ++i)
{
Bucket bucket = _direct[i];
if (bucket != null)
bucket.clear();
_direct[i] = null;
bucket = _indirect[i];
if (bucket != null)
bucket.clear();
_indirect[i] = null;
_direct[i].clear();
_indirect[i].clear();
}
}

private void clearOldestBucket(boolean direct)
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
Bucket[] buckets = bucketsFor(direct);
for (int i = 0; i < buckets.length; ++i)
{
Bucket bucket = buckets[i];
if (bucket == null)
if (bucket.isEmpty())
continue;
long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
Expand All @@ -181,31 +191,29 @@ private void clearOldestBucket(boolean direct)
if (index >= 0)
{
Bucket bucket = buckets[index];
buckets[index] = null;
// The same bucket may be concurrently
// removed, so we need this null guard.
if (bucket != null)
bucket.clear(this::decrementMemory);
bucket.clear();
}
}

private int bucketFor(int capacity)
protected int bucketFor(int capacity)
{
return (int)Math.ceil((double)capacity / getCapacityFactor());
}

protected int capacityFor(int bucket)
{
return (capacity - 1) / getCapacityFactor();
return bucket * getCapacityFactor();
}

private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket)
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int b = bucketFor(capacity);
if (b >= _direct.length)
int index = bucketFor(capacity) - 1;
if (index >= _direct.length)
return null;
Bucket[] buckets = bucketsFor(direct);
Bucket bucket = buckets[b];
if (bucket == null && newBucket != null)
buckets[b] = bucket = newBucket.apply(b + 1);
return bucket;
return buckets[index];
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
Expand Down Expand Up @@ -233,4 +241,47 @@ ByteBufferPool.Bucket[] bucketsFor(boolean direct)
{
return direct ? _direct : _indirect;
}

public boolean isDetailedDump()
{
return _detailedDump;
}

public void setDetailedDump(boolean detailedDump)
{
_detailedDump = detailedDump;
}

@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Object> dump = new ArrayList<>();
dump.add(String.format("HeapMemory: %d/%d", getHeapMemory(), getMaxHeapMemory()));
dump.add(String.format("DirectMemory: %d/%d", getDirectMemory(), getMaxDirectMemory()));

List<Bucket> indirect = Arrays.stream(_indirect).filter(b -> !b.isEmpty()).collect(Collectors.toList());
List<Bucket> direct = Arrays.stream(_direct).filter(b -> !b.isEmpty()).collect(Collectors.toList());
if (isDetailedDump())
{
dump.add(new DumpableCollection("Indirect Buckets", indirect));
dump.add(new DumpableCollection("Direct Buckets", direct));
}
else
{
dump.add("Indirect Buckets size=" + indirect.size());
dump.add("Direct Buckets size=" + direct.size());
}
Dumpable.dumpObjects(out, indent, this, dump);
}

@Override
public String toString()
{
return String.format("%s@%x{minBufferCapacity=%s, maxBufferCapacity=%s, maxQueueLength=%s, factor=%s}",
this.getClass().getSimpleName(), hashCode(),
_minCapacity,
_maxCapacity,
getMaxQueueLength(),
getCapacityFactor());
}
}

0 comments on commit c19921e

Please sign in to comment.