From c8dd4cdaebb496ce2f8e55a7676629f180854aa7 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 10 Mar 2021 21:30:43 +1100 Subject: [PATCH 1/3] Issue #6050 - fix bug for permessage deflate buffer aggregation Signed-off-by: Lachlan Roberts --- .../tests/PermessageDeflateBufferTest.java | 135 ++++++++++++++++++ .../compress/CompressExtension.java | 4 +- 2 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/PermessageDeflateBufferTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/PermessageDeflateBufferTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/PermessageDeflateBufferTest.java new file mode 100644 index 000000000000..494c5ef1b548 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/PermessageDeflateBufferTest.java @@ -0,0 +1,135 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PermessageDeflateBufferTest +{ + private Server server; + private ServerConnector connector; + private WebSocketClient client; + + // @checkstyle-disable-check : AvoidEscapedUnicodeCharactersCheck + private static final List DICT = Arrays.asList( + "\uD83C\uDF09", + "\uD83C\uDF0A", + "\uD83C\uDF0B", + "\uD83C\uDF0C", + "\uD83C\uDF0D", + "\uD83C\uDF0F", + "\uD83C\uDFC0", + "\uD83C\uDFC1", + "\uD83C\uDFC2", + "\uD83C\uDFC3", + "\uD83C\uDFC4", + "\uD83C\uDFC5" + ); + + private static String randomText() + { + Random rnd = new Random(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 15000; i++) + { + sb.append(DICT.get(rnd.nextInt(DICT.size()))); + } + return sb.toString(); + } + + @BeforeEach + public void before() throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); + contextHandler.setContextPath("/"); + server.setHandler(contextHandler); + WebSocketUpgradeFilter.configure(contextHandler); + NativeWebSocketServletContainerInitializer.configure(contextHandler, (context, container) -> + { + container.getPolicy().setMaxTextMessageBufferSize(65535); + container.getPolicy().setInputBufferSize(16384); + container.addMapping("/", ServerSocket.class); + }); + + server.start(); + client = new WebSocketClient(); + client.start(); + } + + @AfterEach + public void after() throws Exception + { + client.stop(); + server.stop(); + } + + @WebSocket + public static class ServerSocket extends EchoSocket + { + @Override + public void onError(Throwable cause) + { + cause.printStackTrace(); + super.onError(cause); + } + } + + @Test + public void testPermessageDeflateAggregation() throws Exception + { + EventSocket socket = new EventSocket(); + ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest(); + clientUpgradeRequest.addExtensions("permessage-deflate"); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort()); + Session session = client.connect(socket, uri, clientUpgradeRequest).get(5, TimeUnit.SECONDS); + + String s = randomText(); + session.getRemote().sendString(s); + assertThat(socket.textMessages.poll(5, TimeUnit.SECONDS), is(s)); + + session.close(); + assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 2f3a630cdb36..d57da313e823 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -198,7 +198,7 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da while (true) { ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); - int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit()); + int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit()); buffer.limit(buffer.limit() + read); accumulator.addLength(read); if (LOG.isDebugEnabled()) @@ -496,7 +496,7 @@ private void compress(FrameEntry entry, boolean first) while (true) { ByteBuffer buffer = accumulator.ensureBuffer(8, outputLength); - int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); + int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); buffer.limit(buffer.limit() + compressed); if (LOG.isDebugEnabled()) From 2f2b6ba8beea59d638d7add3fd9477643df4b397 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 11 Mar 2021 11:49:38 +1100 Subject: [PATCH 2/3] Issue #6050 - add comment and change variable name to clarify Signed-off-by: Lachlan Roberts --- .../extensions/compress/CompressExtension.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index d57da313e823..169db02c4429 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -197,14 +197,15 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da while (true) { + // The buffer returned by the accumulator might not be empty, so we must append starting from the limit. ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); - int read = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit()); - buffer.limit(buffer.limit() + read); - accumulator.addLength(read); + int written = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit()); + buffer.limit(buffer.limit() + written); + accumulator.addLength(written); if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {} bytes into buffer {} from {}", read, BufferUtil.toDetailString(buffer), toDetail(inflater)); + LOG.debug("Decompressed {} bytes into buffer {} from {}", written, BufferUtil.toDetailString(buffer), toDetail(inflater)); - if (read <= 0) + if (written <= 0) break; } } @@ -495,14 +496,15 @@ private void compress(FrameEntry entry, boolean first) { while (true) { + // The buffer returned by the accumulator might not be empty, so we must append starting from the limit. ByteBuffer buffer = accumulator.ensureBuffer(8, outputLength); - int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); - buffer.limit(buffer.limit() + compressed); + int written = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); + buffer.limit(buffer.limit() + written); if (LOG.isDebugEnabled()) LOG.debug("Wrote {} bytes to output buffer", accumulator); - if (compressed <= 0) + if (written <= 0) break; } From 2c5ab59c9d6e8998a7de0e1bfe642d9b0315ac85 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 17 Mar 2021 19:03:10 +1100 Subject: [PATCH 3/3] Issue #6050 - use compressed and decompressed as variable names Signed-off-by: Lachlan Roberts --- .../extensions/compress/CompressExtension.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 169db02c4429..14bdf0bdfb45 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -199,13 +199,13 @@ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws Da { // The buffer returned by the accumulator might not be empty, so we must append starting from the limit. ByteBuffer buffer = accumulator.ensureBuffer(DECOMPRESS_BUF_SIZE); - int written = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit()); - buffer.limit(buffer.limit() + written); - accumulator.addLength(written); + int decompressed = inflater.inflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit()); + buffer.limit(buffer.limit() + decompressed); + accumulator.addLength(decompressed); if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {} bytes into buffer {} from {}", written, BufferUtil.toDetailString(buffer), toDetail(inflater)); + LOG.debug("Decompressed {} bytes into buffer {} from {}", decompressed, BufferUtil.toDetailString(buffer), toDetail(inflater)); - if (written <= 0) + if (decompressed <= 0) break; } } @@ -498,13 +498,13 @@ private void compress(FrameEntry entry, boolean first) { // The buffer returned by the accumulator might not be empty, so we must append starting from the limit. ByteBuffer buffer = accumulator.ensureBuffer(8, outputLength); - int written = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); - buffer.limit(buffer.limit() + written); + int compressed = deflater.deflate(buffer.array(), buffer.arrayOffset() + buffer.limit(), buffer.capacity() - buffer.limit(), Deflater.SYNC_FLUSH); + buffer.limit(buffer.limit() + compressed); if (LOG.isDebugEnabled()) LOG.debug("Wrote {} bytes to output buffer", accumulator); - if (written <= 0) + if (compressed <= 0) break; }