Skip to content

Commit

Permalink
AdaptivePoolingAllocator EventLoop Magazine's affinity (#14047)
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed May 14, 2024
1 parent 0b10894 commit c77c02e
Showing 1 changed file with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@
import io.netty5.buffer.internal.CleanerDrop;
import io.netty5.buffer.internal.InternalBufferUtils;
import io.netty5.util.NettyRuntime;
import io.netty5.util.concurrent.EventExecutor;
import io.netty5.util.concurrent.FastThreadLocal;
import io.netty5.util.concurrent.FastThreadLocalThread;
import io.netty5.util.internal.PlatformDependent;
import io.netty5.util.internal.SystemPropertyUtil;
import io.netty5.util.internal.ThreadExecutorMap;
import org.jetbrains.annotations.NotNull;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

Expand Down Expand Up @@ -69,6 +75,9 @@
* The {@link #createSharedChunkQueue()} method can be overridden to customize this queue.
*/
public class AdaptivePoolingAllocator implements BufferAllocator {

private static final int EXPANSION_ATTEMPTS = 3;
private static final int INITIAL_MAGAZINES = 4;
private static final int RETIRE_CAPACITY = 4 * 1024;
private static final int MIN_CHUNK_SIZE = 128 * 1024;
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
Expand All @@ -93,29 +102,61 @@ public class AdaptivePoolingAllocator implements BufferAllocator {
protected static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
"io.netty5.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());

private static final Object NO_MAGAZINE = Boolean.TRUE;

private final AllocationType allocationType;
private final MemoryManager manager;
private final Queue<Buffer> centralQueue;
private final AllocatorControl allocatorControl;
private final StampedLock magazineExpandLock;
private volatile Magazine[] magazines;
private volatile boolean closed;
private final FastThreadLocal<Object> threadLocalMagazine;
private final Set<Magazine> liveCachedMagazines;

/**
 * Creates an allocator by delegating to {@link #AdaptivePoolingAllocator(boolean)}, passing the value
 * returned by {@link PlatformDependent#directBufferPreferred()} as the {@code direct} argument.
 */
public AdaptivePoolingAllocator() {
this(PlatformDependent.directBufferPreferred());
}

public AdaptivePoolingAllocator(boolean direct) {
this(MemoryManager.instance(), direct);
this(MemoryManager.instance(), direct, true);
}

public AdaptivePoolingAllocator(MemoryManager manager, boolean direct) {
AdaptivePoolingAllocator(MemoryManager manager, boolean direct, boolean eventExecutorMagazines) {
allocationType = direct ? StandardAllocationTypes.OFF_HEAP : StandardAllocationTypes.ON_HEAP;
this.manager = manager;
centralQueue = requireNonNull(createSharedChunkQueue());
allocatorControl = new SimpleAllocatorControl(this);
magazineExpandLock = new StampedLock();
Magazine[] mags = new Magazine[4];
if (eventExecutorMagazines) {
final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<>();
threadLocalMagazine = new FastThreadLocal<>() {
/**
 * Produces the initial thread-local value for the calling thread.
 *
 * @return a new {@link Magazine} bound to the executor reported by
 *         {@link ThreadExecutorMap#currentExecutor()} (also registered in the live-magazine set),
 *         or the {@code NO_MAGAZINE} sentinel when no current executor is reported.
 */
@Override
protected Object initialValue() {
    final EventExecutor currentExecutor = ThreadExecutorMap.currentExecutor();
    if (currentExecutor != null) {
        // Track the magazine so it can later be found and closed.
        final Magazine ownedMagazine = new Magazine(AdaptivePoolingAllocator.this, currentExecutor);
        liveMagazines.add(ownedMagazine);
        return ownedMagazine;
    }
    return NO_MAGAZINE;
}

/**
 * Handles removal of this thread-local's value.
 * <p>
 * The {@code NO_MAGAZINE} sentinel is ignored. Any other value is closed as a {@link Magazine},
 * but only if it was still present in the live-magazine set at the time of removal.
 *
 * @param value the thread-local value being removed.
 */
@Override
protected void onRemoval(final Object value) {
    if (value == NO_MAGAZINE) {
        return;
    }
    // Only close when this call is the one that actually removed the magazine from the live set.
    final boolean wasLive = liveMagazines.remove(value);
    if (wasLive) {
        final Magazine removed = (Magazine) value;
        removed.close();
    }
}
};
liveCachedMagazines = liveMagazines;
} else {
threadLocalMagazine = null;
liveCachedMagazines = null;
}
Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
for (int i = 0; i < mags.length; i++) {
mags[i] = new Magazine(this);
}
Expand Down Expand Up @@ -165,7 +206,15 @@ public Buffer allocate(int size) {
InternalBufferUtils.assertValidBufferSize(size);
if (size <= MAX_CHUNK_SIZE) {
int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
long threadId = threadId(Thread.currentThread());
FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
Thread currentThread = Thread.currentThread();
if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
Object mag = threadLocalMagazine.get();
if (mag != NO_MAGAZINE) {
return ((Magazine) mag).allocate(size, sizeBucket);
}
}
long threadId = threadId(currentThread);
int expansions = 0;
Magazine[] mags;
do {
Expand All @@ -184,7 +233,7 @@ public Buffer allocate(int size) {
}
}
expansions++;
} while (expansions <= 3 && tryExpandMagazines(mags.length));
} while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
}
// The magazines failed us, or the buffer is too big to be pooled. Allocate unpooled buffer.
return manager.allocateShared(allocatorControl, size, standardDrop(manager), allocationType);
Expand Down Expand Up @@ -241,6 +290,20 @@ public void close() {
} finally {
magazineExpandLock.unlockWrite(magsExpandWriteLock);
}
if (liveCachedMagazines != null) {
liveCachedMagazines.forEach(mag -> {
try {
mag.ownerEventExecutor.execute(() -> {
threadLocalMagazine.remove();
drainCloseCentralQueue();
});
} catch (Throwable ignore) {
// If we've been rejected here, it's likely the event executor will take care of removing it itself;
// but if the event executor was running on a FastThreadLocalThread that doesn't remove it,
// we're in trouble :"(
}
});
}
drainCloseCentralQueue();
}

Expand Down Expand Up @@ -273,6 +336,7 @@ private static class AllocationStatistics extends StampedLock {
private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;

protected final AdaptivePoolingAllocator parent;
protected final EventExecutor ownerEventExecutor;
private final short[][] histos = {
new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
Expand All @@ -286,8 +350,9 @@ private static class AllocationStatistics extends StampedLock {
private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;

private AllocationStatistics(AdaptivePoolingAllocator parent) {
private AllocationStatistics(AdaptivePoolingAllocator parent, EventExecutor ownerEventExecutor) {
this.parent = parent;
this.ownerEventExecutor = ownerEventExecutor;
}

protected void recordAllocationSize(int bucket) {
Expand Down Expand Up @@ -326,8 +391,10 @@ private void rotateHistograms() {
int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
localPrefChunkSize = prefChunkSize;
for (Magazine mag : parent.magazines) {
prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
if (ownerEventExecutor == null) {
for (Magazine mag : parent.magazines) {
prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
}
}
if (sharedPrefChunkSize != prefChunkSize) {
// Preferred chunk size changed. Increase check frequency.
Expand Down Expand Up @@ -373,7 +440,11 @@ private static final class Magazine extends AllocationStatistics {
private volatile Buffer nextInLine;

Magazine(AdaptivePoolingAllocator parent) {
super(parent);
super(parent, null);
}

/**
 * Creates a magazine associated with the given event executor.
 * Both arguments are forwarded unchanged to the superclass constructor.
 *
 * @param parent the allocator this magazine belongs to.
 * @param ownerEventExecutor the executor recorded as this magazine's owner.
 */
Magazine(AdaptivePoolingAllocator parent, EventExecutor ownerEventExecutor) {
super(parent, ownerEventExecutor);
}

public Buffer allocate(int size, int sizeBucket) {
Expand Down

0 comments on commit c77c02e

Please sign in to comment.