Skip to content

Commit

Permalink
Issue #5499 - use ByteBufferAccumulator for websocket compression
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Nov 16, 2020
1 parent e7bed39 commit a1aa5dc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
Expand Up @@ -115,11 +115,13 @@ public ByteBuffer takeByteBuffer()

int length = getLength();
combinedBuffer = _bufferPool.acquire(length, false);
BufferUtil.clearToFill(combinedBuffer);
for (ByteBuffer buffer : _buffers)
{
combinedBuffer.put(buffer);
_bufferPool.release(buffer);
}
BufferUtil.flipToFlush(combinedBuffer, 0);
_buffers.clear();
return combinedBuffer;
}
Expand Down
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.websocket.common.extensions.compress;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
Expand All @@ -28,6 +27,8 @@
import java.util.zip.Inflater;
import java.util.zip.ZipException;

import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.compression.DeflaterPool;
Expand Down Expand Up @@ -474,7 +475,9 @@ private void compress(FrameEntry entry, boolean first)
// Get a chunk of the payload to avoid to blow
// the heap if the payload is a huge mapped file.
Frame frame = entry.frame;
boolean fin = frame.isFin();
ByteBuffer data = frame.getPayload();
Deflater deflater = getDeflater();

if (data == null)
data = BufferUtil.EMPTY_BUFFER;
Expand All @@ -484,40 +487,47 @@ private void compress(FrameEntry entry, boolean first)
if (LOG.isDebugEnabled())
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, outputLength);

boolean needsCompress = true;

Deflater deflater = getDeflater();

if (deflater.needsInput() && !supplyInput(deflater, data))
ByteBuffer payload = BufferUtil.EMPTY_BUFFER;
WriteCallback callback = this;
if (!deflater.needsInput() || supplyInput(deflater, data))
{
// no input supplied
needsCompress = false;
}

ByteArrayOutputStream out = new ByteArrayOutputStream();

byte[] output = new byte[outputLength];

boolean fin = frame.isFin();
ByteBufferPool bufferPool = getBufferPool();
try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator(bufferPool))
{
while (true)
{
ByteBuffer buffer = accumulator.ensureBuffer(0, outputLength);
int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH);
buffer.limit(buffer.limit() + compressed);

// Compress the data
while (needsCompress)
{
int compressed = deflater.deflate(output, 0, outputLength, Deflater.SYNC_FLUSH);
if (LOG.isDebugEnabled())
LOG.debug("Wrote {} bytes to output buffer", accumulator);

// Append the output for the eventual frame.
if (LOG.isDebugEnabled())
LOG.debug("Wrote {} bytes to output buffer", compressed);
out.write(output, 0, compressed);
if (compressed <= 0)
break;
}

if (compressed < outputLength)
{
needsCompress = false;
ByteBuffer buffer = accumulator.takeByteBuffer();
payload = buffer;
callback = new WriteCallback()
{
@Override
public void writeFailed(Throwable x)
{
bufferPool.release(buffer);
Flusher.this.writeFailed(x);
}

@Override
public void writeSuccess()
{
bufferPool.release(buffer);
Flusher.this.writeSuccess();
}
};
}
}

ByteBuffer payload = ByteBuffer.wrap(out.toByteArray());

if (payload.remaining() > 0)
{
// Handle tail bytes generated by SYNC_FLUSH.
Expand Down Expand Up @@ -566,8 +576,7 @@ else if (fin)
}
chunk.setPayload(payload);
chunk.setFin(fin);

nextOutgoingFrame(chunk, this, entry.batchMode);
nextOutgoingFrame(chunk, callback, entry.batchMode);
}

@Override
Expand Down

0 comments on commit a1aa5dc

Please sign in to comment.