Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Content.Chunk a RetainableByteBuffer #11598

Closed
wants to merge 80 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
bc02230
Make chunk a RBB
gregw Mar 31, 2024
078f080
Make chunk a RBB
gregw Mar 31, 2024
e2818d4
revert EventsHandler change
gregw Mar 31, 2024
e86ab37
WIP
gregw Mar 31, 2024
76db844
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 2, 2024
c878ddc
updates from review
gregw Apr 2, 2024
df38486
updates from review
gregw Apr 2, 2024
59d2d41
updates from review
gregw Apr 3, 2024
2ef1be9
more tests
gregw Apr 3, 2024
21451a6
inline asReadOnly
gregw Apr 3, 2024
7e61866
Added Appendable as a replacement for both Accumulator and Aggregator
gregw Apr 10, 2024
2553d72
Better slice implementation
gregw Apr 10, 2024
b69fc2a
protect from modification if retained
gregw Apr 10, 2024
1b05042
avoid flip-flopping in append loops
gregw Apr 10, 2024
4f9e5a3
long methods
gregw Apr 10, 2024
cc494bb
use some appends
gregw Apr 10, 2024
a2311ff
javadoc
gregw Apr 10, 2024
ce310ba
Fixed test
gregw Apr 11, 2024
48b0e4c
appendable wrap
gregw Apr 11, 2024
7d92f48
detailString
gregw Apr 11, 2024
2bf7d6c
Revert NON_POOLING change
gregw Apr 11, 2024
059719c
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 12, 2024
6bc618d
Testing Fixed
gregw Apr 12, 2024
fe175ef
Limit usage to buffer size requests
gregw Apr 12, 2024
22aadc0
Simplified hierarchy and naming
gregw Apr 12, 2024
d2f7ddf
Simplified hierarchy and naming
gregw Apr 12, 2024
31b24b4
more testing
gregw Apr 12, 2024
3ac2df7
improved test
gregw Apr 12, 2024
a514d68
disable leaky tests
gregw Apr 12, 2024
068a1aa
removed redundant NetworkBuffer class
gregw Apr 13, 2024
a0facb7
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 13, 2024
608a107
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 17, 2024
7294f0f
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 18, 2024
fe4beb3
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12/…
gregw Apr 22, 2024
d4c08b8
Better heuristic for retaining buffers
gregw Apr 22, 2024
c3e1932
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 24, 2024
182e8cf
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw Apr 25, 2024
55029dc
Updated BufferedResponseHandler
gregw Apr 25, 2024
1ce874b
Use RBB in BAEP
gregw Apr 25, 2024
791b41a
added takeRetainableByteBuffer
gregw Apr 25, 2024
0e7f7d5
Reworked ChunkAccumulator
gregw Apr 25, 2024
34a3390
Removed ByteBufferAggregator usage
gregw Apr 26, 2024
52fffa0
Improved BufferedContentSink
gregw Apr 26, 2024
9848775
Improved BufferedContentSink
gregw Apr 26, 2024
1242113
Improved BufferedContentSink
gregw Apr 27, 2024
8201d9e
Deprecate various external extensions of RBB
gregw Apr 28, 2024
b52ffb5
Fixes for retain during debugging.
gregw Apr 29, 2024
55bd55b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 1, 2024
c99778c
Revert tags
gregw May 1, 2024
d132dcc
Avoid flaky tests
gregw May 1, 2024
902874c
Made NonPooled wrapping explicit
gregw May 1, 2024
20f5d0b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 1, 2024
924ab56
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 5, 2024
c1bfecf
javadoc
gregw May 6, 2024
4ec6b9b
Avoid bizarre wait on buffer init
gregw May 6, 2024
2693f67
added add and put methods
gregw May 8, 2024
f0196c0
Renamed Appendable to Mutable
gregw May 10, 2024
f331973
WIP on HTTP2
gregw May 10, 2024
3879f67
WIP on HTTP2
gregw May 10, 2024
13c9517
WIP on HTTP2
gregw May 12, 2024
f75c8b2
WIP
gregw May 12, 2024
35dd389
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12/…
gregw May 12, 2024
ba7963d
WIP
gregw May 13, 2024
e2bc67b
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 13, 2024
36b677b
reverted HTTP2 WIP
gregw May 13, 2024
91940ff
Changed add to throw OverFlow
gregw May 13, 2024
9742cfe
WIP on HTTP2
gregw May 13, 2024
20b63b2
Fixed HTTP2ServerTest
gregw May 14, 2024
4967343
WIP on HTTP2
gregw May 14, 2024
41af493
WIP on HTTP2
gregw May 14, 2024
ce73c3c
WIP on HTTP2
gregw May 15, 2024
73685ae
Merge branch 'jetty-12.0.x' into fix/jetty-12/10541/byteBufferAccumul…
gregw May 15, 2024
00733a4
WIP on HTTP2
gregw May 15, 2024
e529bde
Added toDetailString
gregw May 15, 2024
6c5ae50
cleanup
gregw May 15, 2024
7516400
fixed clear usage
gregw May 15, 2024
49a1605
fixed clear usage
gregw May 15, 2024
37f8239
fixed ws message buffer
gregw May 15, 2024
f5b770b
fixed ws message buffer
gregw May 15, 2024
22d880d
Marked test as flaky
gregw May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -245,7 +245,7 @@ private boolean parseAndFill()
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
LOG.debug("Parsing {} in {}", networkBuffer, this);
// Always parse even empty buffers to advance the parser.
if (parse())
{
Expand Down Expand Up @@ -347,7 +347,7 @@ private boolean parse()
if (getHttpChannel().isTunnel(method, status))
return true;

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;

if (!HttpStatus.isInformational(status))
Expand All @@ -359,7 +359,7 @@ private boolean parse()
return false;
}

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;
}
}
Expand Down
Expand Up @@ -52,8 +52,10 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.eclipse.jetty.io.Content.Source.asByteBuffer;
import static org.eclipse.jetty.toolchain.test.StackUtils.supply;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.eclipse.jetty.util.BufferUtil.toBuffer;
import static org.eclipse.jetty.util.BufferUtil.toHexString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -169,7 +171,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
MultiPart.Part part = parts.iterator().next();
assertEquals(name, part.getName());
assertEquals("text/plain", part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -222,7 +224,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals(contentType, part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(fileName, part.getFileName());
assertEquals(data.length, part.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -336,7 +338,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(tmpPath.getFileName().toString(), filePart.getFileName());
assertEquals(Files.size(tmpPath), filePart.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down Expand Up @@ -377,7 +379,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("file", filePart.getName());
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals("fileName", filePart.getFileName());
assertArrayEquals(fileData, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(fileData)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down
Expand Up @@ -109,7 +109,7 @@ public RetainableByteBuffer decode(ByteBuffer compressed)
RetainableByteBuffer result = acquire(length);
for (RetainableByteBuffer buffer : _inflateds)
{
BufferUtil.append(result.getByteBuffer(), buffer.getByteBuffer());
buffer.appendTo(result);
buffer.release();
}
_inflateds.clear();
Expand Down
Expand Up @@ -1114,7 +1114,7 @@ else if (type != HttpTokens.Type.SPACE && type != HttpTokens.Type.HTAB)
if (state == State.EPILOGUE)
notifyComplete();
else
throw new EOFException("unexpected EOF");
throw new EOFException("unexpected EOF in " + state);
}
}
catch (Throwable x)
Expand Down
Expand Up @@ -53,7 +53,7 @@ public void before()
public RetainableByteBuffer acquire(int size, boolean direct)
{
counter.incrementAndGet();
return new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
return new RetainableByteBuffer.Mutable.Wrapper(super.acquire(size, direct))
{
@Override
public boolean release()
Expand Down
Expand Up @@ -243,7 +243,7 @@ public void onHeaders(HeadersFrame frame)
@Override
public void onData(DataFrame frame)
{
NetworkBuffer networkBuffer = producer.networkBuffer;
RetainableByteBuffer.Mutable networkBuffer = producer.networkBuffer;
session.onData(new StreamData(frame, networkBuffer));
}

Expand Down Expand Up @@ -311,15 +311,15 @@ public void onFlushed(long bytes) throws IOException
protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
private NetworkBuffer networkBuffer;
private RetainableByteBuffer.Mutable networkBuffer;
private boolean shutdown;
private boolean failed;

private void setInputBuffer(ByteBuffer byteBuffer)
{
acquireNetworkBuffer();
// TODO handle buffer overflow?
networkBuffer.put(byteBuffer);
if (!networkBuffer.append(byteBuffer))
LOG.warn("overflow");
}

@Override
Expand All @@ -346,7 +346,7 @@ public Runnable produce()
{
while (networkBuffer.hasRemaining())
{
session.getParser().parse(networkBuffer.getBuffer());
session.getParser().parse(networkBuffer.getByteBuffer());
if (failed)
return null;
}
Expand All @@ -364,7 +364,7 @@ public Runnable produce()

// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getBuffer());
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);

Expand Down Expand Up @@ -398,30 +398,30 @@ private void acquireNetworkBuffer()
{
if (networkBuffer == null)
{
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
}

private void reacquireNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
}

private void releaseNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

Expand Down Expand Up @@ -479,69 +479,21 @@ public boolean canRetain()
}

@Override
public void retain()
{
retainable.retain();
}

@Override
public boolean release()
{
return retainable.release();
}
}

private class NetworkBuffer implements Retainable
{
private final RetainableByteBuffer delegate;

private NetworkBuffer()
{
delegate = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}

public ByteBuffer getBuffer()
{
return delegate.getByteBuffer();
}

public boolean isRetained()
{
return delegate.isRetained();
}

public boolean hasRemaining()
{
return delegate.hasRemaining();
}

@Override
public boolean canRetain()
{
return delegate.canRetain();
return retainable.isRetained();
}

@Override
public void retain()
{
delegate.retain();
retainable.retain();
}

@Override
public boolean release()
{
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this);
return true;
}
return false;
}

private void put(ByteBuffer source)
{
BufferUtil.append(delegate.getByteBuffer(), source);
return retainable.release();
}
}
}
Expand Up @@ -57,9 +57,9 @@
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.http2.internal.HTTP2Flusher;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Atomics;
Expand Down Expand Up @@ -1261,7 +1261,7 @@ public int getDataBytesRemaining()
return 0;
}

public abstract boolean generate(ByteBufferPool.Accumulator accumulator) throws HpackException;
public abstract boolean generate(RetainableByteBuffer.Mutable accumulator) throws HpackException;

public abstract long onFlushed(long bytes) throws IOException;

Expand Down Expand Up @@ -1348,7 +1348,7 @@ public int getFrameBytesGenerated()
}

@Override
public boolean generate(ByteBufferPool.Accumulator accumulator) throws HpackException
public boolean generate(RetainableByteBuffer.Mutable accumulator) throws HpackException
{
frameBytes = generator.control(accumulator, frame);
beforeSend();
Expand Down Expand Up @@ -1461,7 +1461,7 @@ public int getDataBytesRemaining()
}

@Override
public boolean generate(ByteBufferPool.Accumulator accumulator)
public boolean generate(RetainableByteBuffer.Mutable accumulator)
{
int dataRemaining = getDataBytesRemaining();

Expand Down
Expand Up @@ -438,7 +438,7 @@ public default void onClosed(Stream stream)
/**
* <p>A {@link Retainable} wrapper of a {@link DataFrame}.</p>
*/
public abstract static class Data implements Retainable
abstract class Data implements Retainable
{
public static Data eof(int streamId)
{
Expand Down
Expand Up @@ -19,9 +19,7 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;

public class DataGenerator
{
Expand All @@ -32,12 +30,12 @@ public DataGenerator(HeaderGenerator headerGenerator)
this.headerGenerator = headerGenerator;
}

public int generate(ByteBufferPool.Accumulator accumulator, DataFrame frame, int maxLength)
public int generate(RetainableByteBuffer.Mutable accumulator, DataFrame frame, int maxLength)
{
return generateData(accumulator, frame.getStreamId(), frame.getByteBuffer(), frame.isEndStream(), maxLength);
}

public int generateData(ByteBufferPool.Accumulator accumulator, int streamId, ByteBuffer data, boolean last, int maxLength)
public int generateData(RetainableByteBuffer.Mutable accumulator, int streamId, ByteBuffer data, boolean last, int maxLength)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
Expand All @@ -62,19 +60,17 @@ public int generateData(ByteBufferPool.Accumulator accumulator, int streamId, By
return Frame.HEADER_LENGTH + length;
}

private void generateFrame(ByteBufferPool.Accumulator accumulator, int streamId, ByteBuffer data, boolean last)
private void generateFrame(RetainableByteBuffer.Mutable accumulator, int streamId, ByteBuffer data, boolean last)
{
int length = data.remaining();

int flags = Flags.NONE;
if (last)
flags |= Flags.END_STREAM;

RetainableByteBuffer header = headerGenerator.generate(FrameType.DATA, Frame.HEADER_LENGTH + length, length, flags, streamId);
BufferUtil.flipToFlush(header.getByteBuffer(), 0);
accumulator.append(header);
headerGenerator.generate(accumulator, FrameType.DATA, Frame.HEADER_LENGTH + length, length, flags, streamId);
// Skip empty data buffers.
if (data.remaining() > 0)
accumulator.append(RetainableByteBuffer.wrap(data));
accumulator.add(data);
}
}
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.hpack.HpackEncoder;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;

Expand All @@ -33,11 +32,11 @@ protected FrameGenerator(HeaderGenerator headerGenerator)
this.headerGenerator = headerGenerator;
}

public abstract int generate(ByteBufferPool.Accumulator accumulator, Frame frame) throws HpackException;
public abstract int generate(RetainableByteBuffer.Mutable accumulator, Frame frame) throws HpackException;

protected RetainableByteBuffer generateHeader(FrameType frameType, int length, int flags, int streamId)
protected void generateHeader(RetainableByteBuffer.Mutable accumulator, FrameType frameType, int length, int flags, int streamId)
{
return headerGenerator.generate(frameType, Frame.HEADER_LENGTH + length, length, flags, streamId);
headerGenerator.generate(accumulator, frameType, Frame.HEADER_LENGTH + length, length, flags, streamId);
}

public int getMaxFrameSize()
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.jetty.http2.hpack.HpackEncoder;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;

public class Generator
{
Expand Down Expand Up @@ -76,12 +77,12 @@ public void setMaxFrameSize(int maxFrameSize)
headerGenerator.setMaxFrameSize(maxFrameSize);
}

public int control(ByteBufferPool.Accumulator accumulator, Frame frame) throws HpackException
public int control(RetainableByteBuffer.Mutable accumulator, Frame frame) throws HpackException
{
return generators[frame.getType().getType()].generate(accumulator, frame);
}

public int data(ByteBufferPool.Accumulator accumulator, DataFrame frame, int maxLength)
public int data(RetainableByteBuffer.Mutable accumulator, DataFrame frame, int maxLength)
{
return dataGenerator.generate(accumulator, frame, maxLength);
}
Expand Down