Skip to content

Commit

Permalink
Fix buffer composition where middle buffers have no readable bytes (#…
Browse files Browse the repository at this point in the history
…12460)

Motivation:
Composite buffers are picky with how their internal buffers are structured.
To cope, the buffer components are sanitised and trimmed to make internal offset calculations easier.
We had an issue where buffer components without any readable bytes, were allowed to sit between components that did have readable bytes.
This could cause an exception to be thrown, if a multi-byte access was striding over an empty buffer.
Effectively, we were always expected such accesses to be able to reach at least one byte.

Modification:
Do more thorough component sanitation, such that we prevent buffer components with no readable bytes, from being placed in between components with readable bytes.
This allows the turn accesses to continue to work as before.

Result:
No more exceptions from torn accesses that straddle across an empty buffer component.
  • Loading branch information
chrisvest committed Jun 13, 2022
1 parent 5ca3e82 commit 63e4f36
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 116 deletions.
236 changes: 146 additions & 90 deletions buffer/src/main/java/io/netty5/buffer/api/DefaultCompositeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static io.netty5.buffer.api.internal.Statics.MAX_BUFFER_SIZE;
import static io.netty5.buffer.api.internal.Statics.bufferIsClosed;
Expand Down Expand Up @@ -150,7 +150,7 @@ public static CompositeBuffer compose(BufferAllocator allocator, Iterable<Send<B
if (receiveException != null) {
throw receiveException;
}
return new DefaultCompositeBuffer(allocator, filterExternalBufs(bufs.stream()), COMPOSITE_DROP);
return new DefaultCompositeBuffer(allocator, filterExternalBufs(bufs), COMPOSITE_DROP);
}

/**
Expand All @@ -160,53 +160,165 @@ public static CompositeBuffer compose(BufferAllocator allocator) {
return new DefaultCompositeBuffer(allocator, EMPTY_BUFFER_ARRAY, COMPOSITE_DROP);
}

private static Buffer[] filterExternalBufs(Stream<Buffer> refs) {
private static Buffer[] filterExternalBufs(Iterable<Buffer> externals) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
// we make sure the number of composite buffers will never become greater than the number of bytes in
// the composite buffer.
// We also filter out middle buffers that don't contribute any readable bytes, due to their trimming.
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
Buffer[] bufs = refs
.filter(DefaultCompositeBuffer::discardEmpty)
.flatMap(DefaultCompositeBuffer::flattenBuffer)
.toArray(Buffer[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buffer> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
IllegalArgumentException iae = new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
for (Buffer buf : bufs) {
Collector collector = new Collector(externals);
collector.collect(externals);
return collector.toArray();
}

private static final class Collector {
private Buffer[] array;
private int index;

Collector(Iterable<Buffer> externals) {
final Map<Buffer, Buffer> dupeCheck;
if (externals instanceof Collection) {
dupeCheck = new IdentityHashMap<>(((Collection<?>) externals).size());
} else {
dupeCheck = new IdentityHashMap<>();
}
int size = 0;
for (Buffer buf : externals) {
if (dupeCheck.put(buf, buf) != null) {
// Throw if there are duplicates among the buffers.
// We can do this before we decompose composites, because the components are owned,
// and no two composite buffers can have the same components, unless they violate
// their API contract.
closeAllAndThrowDupeException(externals);
}
size += buf.countComponents();
}
array = new Buffer[size];
}

private static void closeAllAndThrowDupeException(Iterable<Buffer> externals) {
IllegalArgumentException iae = new IllegalArgumentException("Cannot compose duplicate buffers.");
for (Buffer toClose : externals) {
try {
buf.close();
toClose.close();
} catch (Exception closeExc) {
iae.addSuppressed(closeExc);
}
}
throw iae;
}
return bufs;

void add(Buffer buffer) {
if (index == array.length) {
array = Arrays.copyOf(array, array.length * 2);
}
array[index] = buffer;
index++;
}

void collect(Iterable<Buffer> externals) {
for (Buffer buf : externals) {
if (buf.capacity() == 0) {
buf.close();
} else if (CompositeBuffer.isComposite(buf)) {
CompositeBuffer cbuf = (CompositeBuffer) buf;
collect(Arrays.asList(cbuf.decomposeBuffer()));
} else {
add(buf);
}
}
}

Buffer[] toArray() {
int firstReadable = -1;
int lastReadable = -1;
for (int i = 0; i < index; i++) {
if (array[i].readableBytes() != 0) {
if (firstReadable == -1) {
firstReadable = i;
}
lastReadable = i;
}
}
// Bytes already read, in components after the first readable section, must be trimmed off.
// Likewise, writable bytes prior to the last readable section must be trimmed off.
if (firstReadable != -1) {
// Remove middle buffers entirely that have no readable bytes.
for (int i = firstReadable + 1; i < lastReadable; i++) {
Buffer buf = array[i];
if (buf.readableBytes() == 0) {
buf.close();
if (i <= index - 2) {
System.arraycopy(array, i + 1, array, i, index - i - 1);
}
i--;
index--;
lastReadable--;
}
}
// Remove already-read bytes from middle and end buffers.
for (int i = firstReadable + 1; i < index; i++) {
Buffer buf = array[i];
if (buf.readerOffset() > 0) {
buf.readSplit(0).close();
}
}
// Remove writable-bytes from front and middle buffers.
for (int i = 0; i < lastReadable; i++) {
Buffer buf = array[i];
if (buf.writableBytes() > 0) {
array[i] = buf.split();
buf.close();
}
}
}
return array.length == index? array : Arrays.copyOf(array, index);
}
}

private static boolean discardEmpty(Buffer buf) {
if (buf.capacity() > 0) {
return true;
} else {
// If we filter a buffer out, then we must make sure to close it since it's ownership was sent to us.
buf.close();
return false;
private static final class ConcatIterable<T> implements Iterable<T> {
private final Iterable<T> first;
private final Iterable<T> second;

ConcatIterable(Iterable<T> first, Iterable<T> second) {
this.first = first;
this.second = second;
}

@Override
public Iterator<T> iterator() {
return new ConcatIterator(first.iterator(), second.iterator());
}
}

private static Stream<Buffer> flattenBuffer(Buffer buf) {
if (CompositeBuffer.isComposite(buf)) {
// Extract components so composite buffers always have non-composite components.
var composite = (CompositeBuffer) buf;
return Stream.of(composite.decomposeBuffer());
private static final class ConcatIterator<T> implements Iterator<T> {
private Iterator<T> current;
private Iterator<T> next;

ConcatIterator(Iterator<T> first, Iterator<T> second) {
current = first;
next = second;
}

@Override
public boolean hasNext() {
return current != null && current.hasNext() || next != null && next.hasNext();
}

@Override
public T next() {
while (current != null) {
if (current.hasNext()) {
return current.next();
}
current = next;
next = null;
}
throw new NoSuchElementException();
}
return Stream.of(buf);
}

private DefaultCompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<DefaultCompositeBuffer> drop) {
Expand Down Expand Up @@ -240,35 +352,6 @@ private DefaultCompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<De
}

private void computeBufferOffsets() {
int firstReadable = -1;
int lastReadable = -1;
int len = bufs.length;
for (int i = 0; i < len; i++) {
if (bufs[i].readableBytes() != 0) {
if (firstReadable == -1) {
firstReadable = i;
}
lastReadable = i;
}
}
// Bytes already read, in components after the first readable section, must be trimmed off.
// Likewise, writable bytes prior to the last readable section must be trimmed off.
if (firstReadable != -1) {
for (int i = firstReadable + 1; i < len; i++) {
Buffer buf = bufs[i];
if (buf.readerOffset() > 0) {
buf.readSplit(0).close();
}
}
for (int i = 0; i < lastReadable; i++) {
Buffer buf = bufs[i];
if (buf.writableBytes() > 0) {
bufs[i] = buf.split();
buf.close();
}
}
}

int woff = 0;
int roff = 0;
if (bufs.length > 0) {
Expand Down Expand Up @@ -824,29 +907,8 @@ public CompositeBuffer extendWith(Send<Buffer> extension) {

Buffer[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
if (CompositeBuffer.isComposite(buffer)) {
// If the extension is itself a composite buffer, then extend this one by all the constituent
// component buffers.
CompositeBuffer compositeExtension = (CompositeBuffer) buffer;
Buffer[] addedBuffers = compositeExtension.decomposeBuffer();
Set<Buffer> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
duplicatesCheck.addAll(Arrays.asList(addedBuffers));
if (duplicatesCheck.size() < bufs.length + addedBuffers.length) {
throw extensionDuplicatesException();
}
int extendAtIndex = bufs.length;
bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length);
System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length);
computeBufferOffsets();
} else {
for (Buffer buf : restoreTemp) {
if (buf == buffer) {
throw extensionDuplicatesException();
}
}
unsafeExtendWith(buffer);
}
bufs = filterExternalBufs(new ConcatIterable<>(Arrays.asList(bufs), List.of(buffer)));
computeBufferOffsets();
if (restoreTemp.length == 0) {
readOnly = buffer.readOnly();
}
Expand All @@ -857,12 +919,6 @@ public CompositeBuffer extendWith(Send<Buffer> extension) {
return this;
}

private static IllegalArgumentException extensionDuplicatesException() {
return new IllegalArgumentException(
"The composite buffer cannot be extended with the given extension," +
" as it would cause the buffer to have duplicate constituent buffers.");
}

private void unsafeExtendWith(Buffer extension) {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = extension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;

import static io.netty5.buffer.api.internal.Statics.acquire;
import static io.netty5.buffer.api.internal.Statics.isOwned;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -375,6 +377,42 @@ public void whenExtendingCompositeBufferWithReadOffsetLessThanCapacityThenReadab
}
}

@Test
public void composeMustIgnoreMiddleBuffersWithNoReadableBytes() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
Buffer a = allocator.allocate(8).writerOffset(8).readerOffset(4);
Buffer b = allocator.allocate(8).writerOffset(4).readerOffset(4);
Buffer c = allocator.allocate(8).writerOffset(4).readerOffset(0);
CompositeBuffer composite = allocator.compose(List.of(a.send(), b.send(), c.send()))) {
assertThat(composite.countComponents()).isEqualTo(2);
}
}

@Test
public void extendWithMustIgnoreMiddleBuffersWithNoReadableBytes() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
Buffer a = allocator.allocate(8).writerOffset(8).readerOffset(4);
Buffer b = allocator.allocate(8).writerOffset(4).readerOffset(4);
Buffer c = allocator.allocate(8).writerOffset(4).readerOffset(0);
CompositeBuffer composite = allocator.compose()) {
composite.extendWith(a.send());
composite.extendWith(b.send());
composite.extendWith(c.send());
assertThat(composite.countComponents()).isEqualTo(2);
}
}

@Test
public void extendWithMustFlattenCompositeBuffers() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
Buffer a = allocator.copyOf(new byte[1]);
Buffer b = allocator.copyOf(new byte[1]);
CompositeBuffer composite = allocator.compose()) {
composite.extendWith(allocator.compose(List.of(a.send(), b.send())).send());
assertThat(composite.countComponents()).isEqualTo(2);
}
}

@Test
public void composingReadOnlyBuffersMustCreateReadOnlyCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
Expand Down Expand Up @@ -416,25 +454,19 @@ public void composingReadOnlyAndWritableBuffersMustThrow() {

@Test
public void compositeWritableBufferCannotBeExtendedWithReadOnlyBuffer() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = allocator.compose(a.send());
}
try (composite; Buffer b = allocator.allocate(8).makeReadOnly()) {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
CompositeBuffer composite = allocator.compose(allocator.allocate(8).send())) {
try (Buffer b = allocator.allocate(8).makeReadOnly()) {
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b.send()));
}
}
}

@Test
public void compositeReadOnlyBufferCannotBeExtendedWithWritableBuffer() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8).makeReadOnly()) {
composite = allocator.compose(a.send());
}
try (composite; Buffer b = allocator.allocate(8)) {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled();
CompositeBuffer composite = allocator.compose(allocator.allocate(8).makeReadOnly().send())) {
try (Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b.send()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,11 +630,6 @@ public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
Buffer tmp = cumulation.copy();
cumulation.close();
cumulation = tmp;
} else if (cumulation.writableBytes() > 0) {
// Prevent writer-offset gaps from an initial cumulation.
Buffer tmp = cumulation.split();
cumulation.close();
cumulation = tmp;
}
if (CompositeBuffer.isComposite(cumulation)) {
CompositeBuffer composite = (CompositeBuffer) cumulation;
Expand Down

0 comments on commit 63e4f36

Please sign in to comment.