Skip to content

Commit

Permalink
Issue #6974 - fix various bugs ByteBufferPool implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Oct 19, 2021
1 parent 5a89e2a commit 6666cd8
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
private final int _factor;
private final int _maxQueueLength;
private final long _maxHeapMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();

protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
Expand All @@ -54,11 +54,13 @@ protected int getMaxQueueLength()
return _maxQueueLength;
}

@Deprecated
protected void decrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, false);
}

@Deprecated
protected void incrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, true);
Expand Down Expand Up @@ -101,6 +103,11 @@ public long getMemory(boolean direct)
return memory.get();
}

public AtomicLong getSizeAtomic(boolean direct)
{
return (direct) ? _directMemory : _heapMemory;
}

@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.IntFunction;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand Down Expand Up @@ -109,13 +108,12 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
ByteBufferPool.Bucket bucket = bucketFor(size, direct, false);
if (bucket == null)
return newByteBuffer(capacity, direct);
ByteBuffer buffer = bucket.acquire();
if (buffer == null)
return newByteBuffer(capacity, direct);
decrementMemory(buffer);
return buffer;
}

Expand All @@ -135,18 +133,19 @@ public void release(ByteBuffer buffer)
}

boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, this::newBucket);
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, true);
if (bucket != null)
{
bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
}
}

private Bucket newBucket(int key)
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
Bucket bucket = new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
bucket.setPoolSizeAtomic(getSizeAtomic(direct));
return bucket;
}

@Override
Expand Down Expand Up @@ -174,7 +173,7 @@ private void clearOldestBucket(boolean direct)
for (int i = 0; i < buckets.length; ++i)
{
Bucket bucket = buckets[i];
if (bucket == null)
if (bucket == null || bucket.isEmpty())
continue;
long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
Expand All @@ -186,11 +185,9 @@ private void clearOldestBucket(boolean direct)
if (index >= 0)
{
Bucket bucket = buckets[index];
buckets[index] = null;
// The same bucket may be concurrently
// removed, so we need this null guard.
// Null guard in case this.clear() is called concurrently.
if (bucket != null)
bucket.clear(this::decrementMemory);
bucket.clear();
}
}

Expand All @@ -199,7 +196,7 @@ private int bucketFor(int capacity)
return (capacity - 1) / getCapacityFactor();
}

private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket)
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, boolean create)
{
if (capacity < _minCapacity)
return null;
Expand All @@ -208,9 +205,16 @@ private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunctio
return null;
Bucket[] buckets = bucketsFor(direct);
Bucket bucket = buckets[b];
if (bucket == null && newBucket != null)
buckets[b] = bucket = newBucket.apply(b + 1);
return bucket;
if (bucket != null)
return bucket;

synchronized (this)
{
bucket = buckets[b];
if (bucket == null && create)
buckets[b] = bucket = newBucket(b + 1, direct);
return bucket;
}
}

@ManagedAttribute("The number of pooled direct ByteBuffers")
Expand Down
21 changes: 12 additions & 9 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.eclipse.jetty.util.BufferUtil;

Expand Down Expand Up @@ -160,6 +159,7 @@ class Bucket
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private AtomicLong _poolSize;

public Bucket(ByteBufferPool pool, int capacity, int maxSize)
{
Expand All @@ -169,6 +169,11 @@ public Bucket(ByteBufferPool pool, int capacity, int maxSize)
_size = maxSize > 0 ? new AtomicInteger() : null;
}

void setPoolSizeAtomic(AtomicLong poolSize)
{
_poolSize = poolSize;
}

public ByteBuffer acquire()
{
ByteBuffer buffer = queuePoll();
Expand Down Expand Up @@ -208,20 +213,13 @@ else if (_size.incrementAndGet() <= _maxSize)
}

public void clear()
{
clear(null);
}

void clear(Consumer<ByteBuffer> memoryFn)
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = queuePoll();
if (buffer == null)
break;
if (memoryFn != null)
memoryFn.accept(buffer);
if (_size != null)
{
_size.decrementAndGet();
Expand All @@ -233,11 +231,16 @@ void clear(Consumer<ByteBuffer> memoryFn)
private void queueOffer(ByteBuffer buffer)
{
_queue.offer(buffer);
if (_poolSize != null)
_poolSize.addAndGet(buffer.capacity());
}

private ByteBuffer queuePoll()
{
return _queue.poll();
ByteBuffer buffer = _queue.poll();
if (buffer != null && _poolSize != null)
_poolSize.addAndGet(-buffer.capacity());
return buffer;
}

boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bu
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
_newBucket = newBucket != null ? newBucket : this::newBucket;
_newBucket = newBucket;
}

private Bucket newBucket(int key)
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
Bucket bucket = (_newBucket != null) ? _newBucket.apply(key) : new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
bucket.setPoolSizeAtomic(getSizeAtomic(direct));
return bucket;
}

@Override
Expand All @@ -119,7 +121,6 @@ public ByteBuffer acquire(int size, boolean direct)
ByteBuffer buffer = bucket.acquire();
if (buffer == null)
return newByteBuffer(capacity, direct);
decrementMemory(buffer);
return buffer;
}

Expand All @@ -141,9 +142,8 @@ public void release(ByteBuffer buffer)
int b = bucketFor(capacity);
boolean direct = buffer.isDirect();
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
Bucket bucket = buckets.computeIfAbsent(b, i -> newBucket(i, direct));
bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
}

Expand Down Expand Up @@ -174,11 +174,10 @@ private void clearOldestBucket(boolean direct)
}
if (index >= 0)
{
Bucket bucket = buckets.remove(index);
// The same bucket may be concurrently
// removed, so we need this null guard.
Bucket bucket = buckets.get(index);
// Null guard in case this.clear() is called concurrently.
if (bucket != null)
bucket.clear(this::decrementMemory);
bucket.clear();
}
}

Expand Down

0 comments on commit 6666cd8

Please sign in to comment.