Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5499 - Reduce buffer allocations and copying from ByteAccumulator #5574

Merged
merged 23 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1f5b446
Fix issue #5499
leonchen83 Oct 28, 2020
05dafb8
Move work on ByteAccumulator to jetty-util
lachlan-roberts Nov 5, 2020
145bcff
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-By…
lachlan-roberts Nov 5, 2020
a3c3e24
Use the ByteBufferPool in the ByteAccumulator
lachlan-roberts Nov 5, 2020
7bcae99
allow writing directly into the ByteAccumulator
lachlan-roberts Nov 5, 2020
6e95722
ByteAccumulator transferTo expects buffer in fill mode.
lachlan-roberts Nov 5, 2020
3c44df0
changes from review
lachlan-roberts Nov 6, 2020
8dc0d99
adjust minimum space in ByteBufferAccumulator before buffer allocation
lachlan-roberts Nov 6, 2020
595d4bf
changes from review
lachlan-roberts Nov 11, 2020
d75e6de
add takeByteBuffer method to ByteBufferOutputStream2
lachlan-roberts Nov 11, 2020
e0031e0
Issue #5499 - takeBuffer releases all the buffers in the list
lachlan-roberts Nov 12, 2020
e7bed39
Issue #5499 - add javadoc for ByteBufferAccumulator
lachlan-roberts Nov 12, 2020
a1aa5dc
Issue #5499 - use ByteBufferAccumulator for websocket compression
lachlan-roberts Nov 16, 2020
5788fe6
Fix ByteBufferAccumulator minSize
lachlan-roberts Nov 16, 2020
7c46d96
Issue #5499 - add tests for ByteBufferAccumulator
lachlan-roberts Nov 17, 2020
f63a741
use local length field for ByteAccumulator.getLength()
lachlan-roberts Nov 17, 2020
2629845
update ByteAccumulator length on copies
lachlan-roberts Nov 17, 2020
a51b5db
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-By…
lachlan-roberts Nov 30, 2020
602cd7e
throw ArithmeticException on integer overflow from size
lachlan-roberts Nov 30, 2020
6dce1cb
Make ByteBufferAccumulator direct configurable
lachlan-roberts Nov 30, 2020
8aedc50
fix missing usage of the new _direct field in ByteBufferAccumulator
lachlan-roberts Dec 1, 2020
8b3cffb
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-By…
lachlan-roberts Dec 8, 2020
41cffa0
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-By…
lachlan-roberts Dec 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
138 changes: 138 additions & 0 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jetty.util.BufferUtil;

public class ByteBufferAccumulator implements AutoCloseable
{
private final List<ByteBuffer> _buffers = new ArrayList<>();
private final ByteBufferPool _bufferPool;

public ByteBufferAccumulator()
{
this(null);
}

public ByteBufferAccumulator(ByteBufferPool bufferPool)
{
_bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool;
}

public int getLength()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This length accumulation could potentially overflow Integer.MAX_VALUE. We should handle this by either returning a long from here, or detecting the integer overflow and throwing an exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the accumulation needs to eventually fit into a single buffer, the int should do with overflow detection elsewhere.
An ultimate buffer abstraction could allow for chains of buffers that can be gather written for a long length, but not this abstraction

{
int length = 0;
for (ByteBuffer buffer : _buffers)
length += buffer.remaining();
return length;
}

public ByteBuffer ensureBuffer(int minSize, int minAllocationSize)
{
ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1);
if (BufferUtil.space(buffer) <= minSize)
{
buffer = _bufferPool.acquire(minAllocationSize, false);
gregw marked this conversation as resolved.
Show resolved Hide resolved
lorban marked this conversation as resolved.
Show resolved Hide resolved
_buffers.add(buffer);
}

return buffer;
}

public void copyBytes(byte[] buf, int offset, int length)
{
copyBuffer(BufferUtil.toBuffer(buf, offset, length));
}

public void copyBuffer(ByteBuffer buffer)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
while (buffer.hasRemaining())
{
ByteBuffer b = ensureBuffer(0, buffer.remaining());
int pos = BufferUtil.flipToFill(b);
BufferUtil.put(buffer, b);
BufferUtil.flipToFlush(b, pos);
}
}

public ByteBuffer takeByteBuffer()
{
int length = getLength();
ByteBuffer combinedBuffer = _bufferPool.acquire(length, false);
for (ByteBuffer buffer : _buffers)
{
combinedBuffer.put(buffer);
}
return combinedBuffer;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public ByteBuffer takeByteBuffer()
{
int length = getLength();
ByteBuffer combinedBuffer = _bufferPool.acquire(length, false);
for (ByteBuffer buffer : _buffers)
{
combinedBuffer.put(buffer);
}
return combinedBuffer;
}
public ByteBuffer takeByteBuffer()
{
int length = getLength();
switch(length)
{
case 0:
return null; // TODO or empty buffer from pool?
case 1:
return _buffers.remove(0);
default:
  ByteBuffer combinedBuffer = _bufferPool.acquire(length, false);
  for (ByteBuffer buffer : _buffers)
  {
  combinedBuffer.put(buffer);
  _bufferPool.release(buffer);
  }
  _buffers.clear();
  return combinedBuffer;
 }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like the idea of returning null from here, it will introduce null checks everywhere this is used.
How can we get empty buffer from pool, will _bufferPool.aquire(0, false) work, because that's what we are currently doing for 0 length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also makes it so you can only call takeBuffer() once, but you can call toBuffer() multiple times and get the same buffer.

Why wouldn't we just release these buffers on close() like we would do in other cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the idea of takeBuffer is that you take the buffer and all it's data, hence it can't be called again.
The intent is that once you call takeBuffer you will pass the resulting buffer along some further chain of events and then only release it once it is free.
freeing the buffer on close makes no sense for most uses, as we want to have a accumulator that lives as long as the connection, during which time it may create many buffers that are taken and passed on.

Happy not to return null.


public ByteBuffer toByteBuffer()
{
if (_buffers.size() == 1)
return _buffers.get(0);

ByteBuffer combinedBuffer = takeByteBuffer();
_buffers.forEach(_bufferPool::release);
_buffers.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_buffers.forEach(_bufferPool::release);
_buffers.clear();

_buffers.add(combinedBuffer);
return combinedBuffer;
}

public byte[] toByteArray()
{
int length = getLength();
if (length == 0)
return new byte[0];

byte[] bytes = new byte[length];
writeTo(BufferUtil.toBuffer(bytes));
return bytes;
}

public void writeTo(ByteBuffer buffer)
{
int pos = BufferUtil.flipToFill(buffer);
for (ByteBuffer bb : _buffers)
{
buffer.put(bb);
}
BufferUtil.flipToFlush(buffer, pos);
}

public void writeTo(OutputStream out) throws IOException
{
for (ByteBuffer bb : _buffers)
{
BufferUtil.writeTo(bb, out);
}
}

@Override
public void close()
{
_buffers.forEach(_bufferPool::release);
_buffers.clear();
}
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* This class implements an output stream in which the data is written into a list of ByteBuffer,
* the buffer list automatically grows as data is written to it, the buffers are taken from the
* supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
*
* Designed to mimic {@link java.io.ByteArrayOutputStream} but with better memory usage, and less copying.
*/
public class ByteBufferOutputStream2 extends OutputStream
{
private final ByteBufferAccumulator _accumulator;
private final ByteBufferPool _bufferPool;
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
private int _size = 0;

public ByteBufferOutputStream2()
{
this(null);
}

public ByteBufferOutputStream2(ByteBufferPool bufferPool)
{
_bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool;
_accumulator = new ByteBufferAccumulator(bufferPool);
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Get an aggregated content written to the OutputStream in a ByteBuffer.
* @return the content in a ByteBuffer.
*/
public ByteBuffer toByteBuffer()
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
return _accumulator.toByteBuffer();
}

lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
/**
* Get an aggregated content written to the OutputStream in a byte array.
* @return the content in a byte array.
*/
public byte[] toByteArray()
{
return _accumulator.toByteArray();
}

public int size()
{
return _size;
}

@Override
public void write(int b)
{
write(new byte[]{(byte)b}, 0, 1);
}

@Override
public void write(byte[] b, int off, int len)
{
_size += len;
_accumulator.copyBytes(b, off, len);
}

public void write(ByteBuffer buffer)
{
gregw marked this conversation as resolved.
Show resolved Hide resolved
_size += buffer.remaining();
_accumulator.copyBuffer(buffer);
}

public void writeTo(ByteBuffer buffer)
{
_accumulator.writeTo(buffer);
}

public void writeTo(OutputStream out) throws IOException
{
_accumulator.writeTo(out);
}

@Override
public void close()
{
_accumulator.close();
_size = 0;
}

@Override
public synchronized String toString()
{
return String.format("%s@%x{size=%d, bufferPool=%s, byteAccumulator=%s}", getClass().getSimpleName(),
hashCode(), _size, _bufferPool, _accumulator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.io;

import java.nio.ByteBuffer;

import org.eclipse.jetty.util.BufferUtil;

public class NullByteBufferPool implements ByteBufferPool
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
if (direct)
return BufferUtil.allocateDirect(size);
else
return BufferUtil.allocate(size);
}

@Override
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
}
}