Skip to content

Commit

Permalink
Transport: AdaptiveRecvByteBufAllocator does not correctly calculate … (
Browse files Browse the repository at this point in the history
#13882)

…initial buffer capacity in some cases

Motivation:

We did not correctly calculate the initial index sometimes which could
lead to the initial buffer capacity to exceed the configured maximum
size.

Modifications:

- Correctly calculate initialIndex
- Add unit tests

Result:

Fixes #13722
  • Loading branch information
normanmaurer committed Mar 2, 2024
1 parent 45a7543 commit 85cee9d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 8 deletions.
Expand Up @@ -94,16 +94,20 @@ private static int getSizeTableIndex(final int size) {
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private final int minCapacity;
private final int maxCapacity;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;

HandleImpl(int minIndex, int maxIndex, int initial) {
HandleImpl(int minIndex, int maxIndex, int initialIndex, int minCapacity, int maxCapacity) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;

index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
index = initialIndex;
nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity);
this.minCapacity = minCapacity;
this.maxCapacity = maxCapacity;
}

@Override
Expand All @@ -127,14 +131,14 @@ private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity);
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
nextReceiveBufferSize = min(SIZE_TABLE[index], maxCapacity);
decreaseNow = false;
}
}
Expand All @@ -147,7 +151,9 @@ public void readComplete() {

private final int minIndex;
private final int maxIndex;
private final int initial;
private final int initialIndex;
private final int minCapacity;
private final int maxCapacity;

/**
* Creates a new predictor with the default parameters. With the default
Expand Down Expand Up @@ -188,13 +194,20 @@ public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
this.maxIndex = maxIndex;
}

this.initial = initial;
int initialIndex = getSizeTableIndex(initial);
if (SIZE_TABLE[initialIndex] > initial) {
this.initialIndex = initialIndex - 1;
} else {
this.initialIndex = initialIndex;
}
this.minCapacity = minimum;
this.maxCapacity = maximum;
}

@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
return new HandleImpl(minIndex, maxIndex, initialIndex, minCapacity, maxCapacity);
}

@Override
Expand Down
Expand Up @@ -20,9 +20,11 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mock;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -98,6 +100,54 @@ public void lastPartialReadCanRampUp() {
allocReadExpected(handle, alloc, 131072);
}

@Test
public void doesNotExceedMaximum() {
AdaptiveRecvByteBufAllocator recvByteBufAllocator = new AdaptiveRecvByteBufAllocator(64, 9000, 9000);
RecvByteBufAllocator.ExtendedHandle handle =
(RecvByteBufAllocator.ExtendedHandle) recvByteBufAllocator.newHandle();
handle.reset(config);
allocReadExpected(handle, alloc, 8192);
}

@Test
public void doesSetCorrectMinBounds() {
AdaptiveRecvByteBufAllocator recvByteBufAllocator = new AdaptiveRecvByteBufAllocator(81, 95, 95);
RecvByteBufAllocator.ExtendedHandle handle =
(RecvByteBufAllocator.ExtendedHandle) recvByteBufAllocator.newHandle();
handle.reset(config);
allocReadExpected(handle, alloc, 81);
}

@Test
public void throwsIfInitialIsBiggerThenMaximum() {
assertThrows(IllegalArgumentException.class, new Executable() {
@Override
public void execute() {
new AdaptiveRecvByteBufAllocator(64, 4096 , 1024);
}
});
}

@Test
public void throwsIfInitialIsSmallerThenMinimum() {
assertThrows(IllegalArgumentException.class, new Executable() {
@Override
public void execute() {
new AdaptiveRecvByteBufAllocator(512, 64 , 1024);
}
});
}

@Test
public void throwsIfMinimumIsBiggerThenMaximum() {
assertThrows(IllegalArgumentException.class, new Executable() {
@Override
public void execute() {
new AdaptiveRecvByteBufAllocator(2048, 64 , 1024);
}
});
}

private static void allocReadExpected(RecvByteBufAllocator.ExtendedHandle handle,
ByteBufAllocator alloc,
int expectedSize) {
Expand Down

0 comments on commit 85cee9d

Please sign in to comment.