Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Content.Chunk a RetainableByteBuffer #11598

Closed
wants to merge 80 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
bc02230
Make chunk a RBB
gregw Mar 31, 2024
078f080
Make chunk a RBB
gregw Mar 31, 2024
e2818d4
revert EventsHandler change
gregw Mar 31, 2024
e86ab37
WIP
gregw Mar 31, 2024
76db844
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 2, 2024
c878ddc
updates from review
gregw Apr 2, 2024
df38486
updates from review
gregw Apr 2, 2024
59d2d41
updates from review
gregw Apr 3, 2024
2ef1be9
more tests
gregw Apr 3, 2024
21451a6
inline asReadOnly
gregw Apr 3, 2024
7e61866
Added Appendable as a replacement for both Accumulator and Aggregator
gregw Apr 10, 2024
2553d72
Better slice implementation
gregw Apr 10, 2024
b69fc2a
protect from modification if retained
gregw Apr 10, 2024
1b05042
avoid flip-flopping in append loops
gregw Apr 10, 2024
4f9e5a3
long methods
gregw Apr 10, 2024
cc494bb
use some appends
gregw Apr 10, 2024
a2311ff
javadoc
gregw Apr 10, 2024
ce310ba
Fixed test
gregw Apr 11, 2024
48b0e4c
appendable wrap
gregw Apr 11, 2024
7d92f48
detailString
gregw Apr 11, 2024
2bf7d6c
Revert NON_POOLING change
gregw Apr 11, 2024
059719c
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 12, 2024
6bc618d
Testing Fixed
gregw Apr 12, 2024
fe175ef
Limit usage to buffer size requests
gregw Apr 12, 2024
22aadc0
Simplified hierarchy and naming
gregw Apr 12, 2024
d2f7ddf
Simplified hierarchy and naming
gregw Apr 12, 2024
31b24b4
more testing
gregw Apr 12, 2024
3ac2df7
improved test
gregw Apr 12, 2024
a514d68
disable leaky tests
gregw Apr 12, 2024
068a1aa
removed redundant NetworkBuffer class
gregw Apr 13, 2024
a0facb7
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 13, 2024
608a107
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 17, 2024
7294f0f
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 18, 2024
fe4beb3
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12/…
gregw Apr 22, 2024
d4c08b8
Better heuristic for retaining buffers
gregw Apr 22, 2024
c3e1932
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 24, 2024
182e8cf
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 25, 2024
55029dc
Updated BufferedResponseHandler
gregw Apr 25, 2024
1ce874b
Use RBB in BAEP
gregw Apr 25, 2024
791b41a
added takeRetainableByteBuffer
gregw Apr 25, 2024
0e7f7d5
Reworked ChunkAccumulator
gregw Apr 25, 2024
34a3390
Removed ByteBufferAggregator usage
gregw Apr 26, 2024
52fffa0
Improved BufferedContentSink
gregw Apr 26, 2024
9848775
Improved BufferedContentSink
gregw Apr 26, 2024
1242113
Improved BufferedContentSink
gregw Apr 27, 2024
8201d9e
Deprecate various external extensions of RBB
gregw Apr 28, 2024
b52ffb5
Fixes for retain during debugging.
gregw Apr 29, 2024
55bd55b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 1, 2024
c99778c
Revert tags
gregw May 1, 2024
d132dcc
Avoid flaky tests
gregw May 1, 2024
902874c
Made NonPooled wrapping explicit
gregw May 1, 2024
20f5d0b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 1, 2024
924ab56
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 5, 2024
c1bfecf
javadoc
gregw May 6, 2024
4ec6b9b
Avoid bizarre wait on buffer init
gregw May 6, 2024
2693f67
added add and put methods
gregw May 8, 2024
f0196c0
Renamed Appendable to Mutable
gregw May 10, 2024
f331973
WIP on HTTP2
gregw May 10, 2024
3879f67
WIP on HTTP2
gregw May 10, 2024
13c9517
WIP on HTTP2
gregw May 12, 2024
f75c8b2
WIP
gregw May 12, 2024
35dd389
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12/…
gregw May 12, 2024
ba7963d
WIP
gregw May 13, 2024
e2bc67b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 13, 2024
36b677b
reverted HTTP2 WIP
gregw May 13, 2024
91940ff
Changed add to throw OverFlow
gregw May 13, 2024
9742cfe
WIP on HTTP2
gregw May 13, 2024
20b63b2
Fixed HTTP2ServerTest
gregw May 14, 2024
4967343
WIP on HTTP2
gregw May 14, 2024
41af493
WIP on HTTP2
gregw May 14, 2024
ce73c3c
WIP on HTTP2
gregw May 15, 2024
73685ae
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 15, 2024
00733a4
WIP on HTTP2
gregw May 15, 2024
e529bde
Added toDetailString
gregw May 15, 2024
6c5ae50
cleanup
gregw May 15, 2024
7516400
fixed clear usage
gregw May 15, 2024
49a1605
fixed clear usage
gregw May 15, 2024
37f8239
fixed ws message buffer
gregw May 15, 2024
f5b770b
fixed ws message buffer
gregw May 15, 2024
22d880d
Marked test as flaky
gregw May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -109,7 +109,7 @@ public RetainableByteBuffer decode(ByteBuffer compressed)
RetainableByteBuffer result = acquire(length);
for (RetainableByteBuffer buffer : _inflateds)
{
BufferUtil.append(result.getByteBuffer(), buffer.getByteBuffer());
buffer.appendTo(result);
buffer.release();
}
_inflateds.clear();
Expand Down
Expand Up @@ -1114,7 +1114,7 @@ else if (type != HttpTokens.Type.SPACE && type != HttpTokens.Type.HTAB)
if (state == State.EPILOGUE)
notifyComplete();
else
throw new EOFException("unexpected EOF");
throw new EOFException("unexpected EOF in " + state);
}
}
catch (Throwable x)
Expand Down
Expand Up @@ -478,6 +478,12 @@ public boolean canRetain()
return retainable.canRetain();
}

@Override
public boolean isRetained()
{
return retainable.isRetained();
}

@Override
public void retain()
{
Expand Down
Expand Up @@ -438,7 +438,7 @@ public default void onClosed(Stream stream)
/**
* <p>A {@link Retainable} wrapper of a {@link DataFrame}.</p>
*/
public abstract static class Data implements Retainable
abstract class Data implements Retainable
{
public static Data eof(int streamId)
{
Expand Down
Expand Up @@ -445,6 +445,12 @@ public boolean canRetain()
return retainable.canRetain();
}

@Override
public boolean isRetained()
{
return retainable.isRetained();
}

@Override
public void retain()
{
Expand Down
Expand Up @@ -384,7 +384,7 @@ public default void onFailure(Stream.Server stream, long error, Throwable failur
*
* @see Stream#readData()
*/
public abstract static class Data implements Retainable
abstract class Data implements Retainable
{
public static final Data EOF = new EOFData();

Expand Down
Expand Up @@ -666,9 +666,19 @@ public Tracking(int minCapacity, int maxCapacity, int maxBucketSize)
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, 0L, 0L);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

public Tracking(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
this(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

@Override
Expand Down
Expand Up @@ -27,7 +27,9 @@
/**
* An accumulator of {@link Content.Chunk}s used to facilitate minimal copy
* aggregation of multiple chunks.
* @deprecated use {@link Content.Source#asRetainableByteBuffer(Content.Source, ByteBufferPool, boolean, int)} instead.
*/
@Deprecated
public class ChunkAccumulator
{
private static final ByteBufferPool NON_POOLING = new ByteBufferPool.NonPooling();
gregw marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -109,7 +111,7 @@ public RetainableByteBuffer take(ByteBufferPool pool, boolean direct)
for (Chunk chunk : _chunks)
{
offset += chunk.remaining();
BufferUtil.append(buffer.getByteBuffer(), chunk.getByteBuffer());
chunk.appendTo(buffer);
chunk.release();
}
assert offset == _length;
Expand Down
125 changes: 67 additions & 58 deletions jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -33,6 +34,7 @@
import org.eclipse.jetty.io.internal.ContentCopier;
import org.eclipse.jetty.io.internal.ContentSourceByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
import org.eclipse.jetty.io.internal.ContentSourceRetainableByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceString;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
Expand Down Expand Up @@ -192,7 +194,7 @@ static ByteBuffer asByteBuffer(Source source) throws IOException
*/
static CompletableFuture<byte[]> asByteArrayAsync(Source source, int maxSize)
{
return new ChunkAccumulator().readAll(source, maxSize);
return asRetainableByteBuffer(source, null, false, maxSize).thenApply(rbb -> rbb.getByteBuffer().array());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mental note: asRetainableByteBuffer() should have been called toRetainableByteBuffer(): as prefix for modifying the presentation (wrap/unwrap) and to prefix for anything implying havier work like mem copies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've always thought that as prefix should be used when returning a different view of the same object. The to prefix should be used when making a new object from the current one.

So this could be thought of as a to as it is creates a whole new object.... but that object also mutates the original object (by consuming all its input), so it is kind of a view onto the original object.... if the resulting RBB delays reading the source until it knows how it is going to be used, then it is really is a view onto the source. So as works as well.

I'm enough on the fence not to disrupt things by changing the name at this point.

}

/**
Expand Down Expand Up @@ -230,7 +232,31 @@ static CompletableFuture<ByteBuffer> asByteBufferAsync(Source source, int maxSiz
*/
static CompletableFuture<RetainableByteBuffer> asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize)
{
return new ChunkAccumulator().readAll(source, pool, direct, maxSize);
Promise.Completable<RetainableByteBuffer> promise = new Promise.Completable<>()
{
@Override
public void succeeded(RetainableByteBuffer result)
{
result.retain();
super.succeeded(result);
}
};
asRetainableByteBuffer(source, pool, direct, maxSize, promise);
return promise;
}

/**
* <p>Reads, non-blocking, the whole content source into a {@link RetainableByteBuffer}.</p>
*
* @param source the source to read
* @param pool The {@link ByteBufferPool} to acquire the buffer from, or null for a non {@link Retainable} buffer
* @param direct True if the buffer should be direct.
* @param maxSize The maximum size to read, or -1 for no limit
* @param promise the promise to notify when the whole content has been read into a RetainableByteBuffer.
*/
static void asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize, Promise<RetainableByteBuffer> promise)
{
new ContentSourceRetainableByteBuffer(source, pool, direct, maxSize, promise).run();
}

/**
Expand Down Expand Up @@ -470,6 +496,43 @@ default boolean rewind()
*/
public interface Sink
{
/**
* <p>Wraps the given {@link OutputStream} as a {@link Sink}.
* @param out The stream to wrap
* @return A sink wrapping the stream
*/
static Sink from(OutputStream out)
{
return new Sink()
{
boolean closed;

@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
if (closed)
{
callback.failed(new EOFException());
return;
}
try
{
BufferUtil.writeTo(byteBuffer, out);
if (last)
{
closed = true;
out.close();
}
callback.succeeded();
}
catch (Throwable t)
{
callback.failed(t);
}
}
};
}

/**
* <p>Wraps the given content sink with a buffering sink.</p>
*
Expand Down Expand Up @@ -561,7 +624,7 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
* to release the {@code ByteBuffer} back into a pool), or the
* {@link #release()} method overridden.</p>
*/
public interface Chunk extends Retainable
public interface Chunk extends RetainableByteBuffer
{
/**
* <p>An empty, non-last, chunk.</p>
Expand Down Expand Up @@ -804,11 +867,6 @@ static boolean isFailure(Chunk chunk, boolean last)
return chunk != null && chunk.getFailure() != null && chunk.isLast() == last;
}

/**
* @return the ByteBuffer of this Chunk
*/
ByteBuffer getByteBuffer();

/**
* Get a failure (which may be from a {@link Source#fail(Throwable) failure} or
* a {@link Source#fail(Throwable, boolean) warning}), if any, associated with the chunk.
Expand All @@ -831,59 +889,10 @@ default Throwable getFailure()
*/
boolean isLast();

/**
* @return the number of bytes remaining in this Chunk
*/
default int remaining()
{
return getByteBuffer().remaining();
}

/**
* @return whether this Chunk has remaining bytes
*/
default boolean hasRemaining()
{
return getByteBuffer().hasRemaining();
}

/**
* <p>Copies the bytes from this Chunk to the given byte array.</p>
*
* @param bytes the byte array to copy the bytes into
* @param offset the offset within the byte array
* @param length the maximum number of bytes to copy
* @return the number of bytes actually copied
*/
default int get(byte[] bytes, int offset, int length)
{
ByteBuffer b = getByteBuffer();
if (b == null || !b.hasRemaining())
return 0;
length = Math.min(length, b.remaining());
b.get(bytes, offset, length);
return length;
}

/**
* <p>Skips, advancing the ByteBuffer position, the given number of bytes.</p>
*
* @param length the maximum number of bytes to skip
* @return the number of bytes actually skipped
*/
default int skip(int length)
{
if (length == 0)
return 0;
ByteBuffer byteBuffer = getByteBuffer();
length = Math.min(byteBuffer.remaining(), length);
byteBuffer.position(byteBuffer.position() + length);
return length;
}

/**
* @return an immutable version of this Chunk
*/
@Deprecated(forRemoval = true, since = "12.0.9")
default Chunk asReadOnly()
{
if (getByteBuffer().isReadOnly())
Expand Down
Expand Up @@ -62,6 +62,15 @@ default boolean canRetain()
return false;
}

/**
* @return whether this instance is retained
* @see ReferenceCounter#isRetained()
*/
default boolean isRetained()
{
return false;
}

/**
* <p>Retains this resource, potentially incrementing a reference count if there are resources that will be released.</p>
*/
Expand Down Expand Up @@ -103,6 +112,12 @@ public boolean canRetain()
return getWrapped().canRetain();
}

@Override
public boolean isRetained()
{
return getWrapped().isRetained();
}

@Override
public void retain()
{
Expand Down Expand Up @@ -141,7 +156,7 @@ public ReferenceCounter()
this(1);
}

protected ReferenceCounter(int initialCount)
public ReferenceCounter(int initialCount)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
references = new AtomicInteger(initialCount);
}
Expand Down Expand Up @@ -195,11 +210,7 @@ public boolean release()
return ref == 0;
}

/**
* <p>Returns whether {@link #retain()} has been called at least one more time than {@link #release()}.</p>
*
* @return whether this buffer is retained
*/
@Override
gregw marked this conversation as resolved.
Show resolved Hide resolved
public boolean isRetained()
{
return references.get() > 1;
Expand Down