From 71df3b57eef08040203337f6d83ff28adc86d7c7 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 30 Apr 2020 09:15:49 +1000 Subject: [PATCH 1/5] Issue #4824 - add configuration on RemoteEndpoint for maxOutgoingFrames Signed-off-by: Lachlan Roberts --- .../jetty/websocket/api/RemoteEndpoint.java | 18 ++++++ .../common/WebSocketRemoteEndpoint.java | 59 +++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index 45a8fcf47800..37dcec929481 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -141,6 +141,24 @@ public interface RemoteEndpoint */ void setBatchMode(BatchMode mode); + /** + * Set the maximum number of frames which allowed to be waiting to be sent at any one time. + * The default value is -1, this indicates there is no limit on how many frames can be + * queued to be sent by the implementation. + * + * @param maxOutgoingFrames the max number of frames. + */ + void setMaxOutgoingFrames(int maxOutgoingFrames); + + /** + * Get the maximum number of frames which allowed to be waiting to be sent at any one time. + * The default value is -1, this indicates there is no limit on how many frames can be + * queued to be sent by the implementation. + * + * @return the max number of frames. + */ + int getMaxOutgoingFrames(); + /** * Get the InetSocketAddress for the established connection. * diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index c53d71cffa3c..aa827f3754a8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -81,7 +81,9 @@ public void writeFailed(Throwable x) private final OutgoingFrames outgoing; private final AtomicInteger msgState = new AtomicInteger(); private final BlockingWriteCallback blocker = new BlockingWriteCallback(); + private final AtomicInteger numOutgoingFrames = new AtomicInteger(); private volatile BatchMode batchMode; + private int maxNumOutgoingFrames = -1; public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing) { @@ -303,6 +305,19 @@ public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback) BatchMode batchMode = BatchMode.OFF; if (frame.isDataFrame()) batchMode = getBatchMode(); + + if (maxNumOutgoingFrames > 0 && frame.isDataFrame()) + { + // Increase the number of outgoing frames, will be decremented when callback is completed. + int outgoingFrames = numOutgoingFrames.incrementAndGet(); + callback = from(callback, numOutgoingFrames::decrementAndGet); + if (outgoingFrames > maxNumOutgoingFrames) + { + callback.writeFailed(new IOException("Exceeded max outgoing frames: " + outgoingFrames + ">" + maxNumOutgoingFrames)); + return; + } + } + outgoing.outgoingFrame(frame, callback, batchMode); } @@ -439,6 +454,18 @@ public void setBatchMode(BatchMode batchMode) this.batchMode = batchMode; } + @Override + public void setMaxOutgoingFrames(int maxOutgoingFrames) + { + this.maxNumOutgoingFrames = maxOutgoingFrames; + } + + @Override + public int getMaxOutgoingFrames() + { + return maxNumOutgoingFrames; + } + @Override public void flush() throws IOException { @@ -459,4 +486,36 @@ public String toString() { return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode()); } + + private static WriteCallback from(WriteCallback callback, Runnable completed) + { + return new WriteCallback() + { + @Override + public void writeFailed(Throwable x) + { + try + { + callback.writeFailed(x); + } + finally + { + completed.run(); + } + } + + @Override + public void writeSuccess() + { + try + { + callback.writeSuccess(); + } + finally + { + completed.run(); + } + } + }; + } } From f788260abd097308fb5f560baca2345a59d4175f Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 3 Sep 2020 14:58:09 +1000 Subject: [PATCH 2/5] Issue #4824 - use WritePendingException instead of IOException Signed-off-by: Lachlan Roberts --- .../jetty/websocket/common/WebSocketRemoteEndpoint.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index aa827f3754a8..c047cb9481fa 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -313,7 +314,7 @@ public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback) callback = from(callback, numOutgoingFrames::decrementAndGet); if (outgoingFrames > maxNumOutgoingFrames) { - callback.writeFailed(new IOException("Exceeded max outgoing frames: " + outgoingFrames + ">" + maxNumOutgoingFrames)); + callback.writeFailed(new WritePendingException()); return; } } From 52c9f8730b3881585f814ecbd32733b801e52f4d Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 4 Sep 2020 10:45:47 +1000 Subject: [PATCH 3/5] Issue #4824 - create test for WebSocket maxOutgoingFrames Signed-off-by: Lachlan Roberts --- .../tests/MaxOutgoingFramesTest.java | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/MaxOutgoingFramesTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/MaxOutgoingFramesTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/MaxOutgoingFramesTest.java new file mode 100644 index 000000000000..755a1441a756 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/MaxOutgoingFramesTest.java @@ -0,0 +1,187 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.nio.channels.WritePendingException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.api.BatchMode; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.extensions.AbstractExtension; +import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MaxOutgoingFramesTest +{ + public static CountDownLatch outgoingBlocked; + public static CountDownLatch firstFrameBlocked; + + private final EventSocket serverSocket = new EventSocket(); + private Server server; + private ServerConnector connector; + private WebSocketClient client; + + @BeforeEach + public void start() throws Exception + { + outgoingBlocked = new CountDownLatch(1); + firstFrameBlocked = new CountDownLatch(1); + + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); + contextHandler.setContextPath("/"); + NativeWebSocketServletContainerInitializer.configure(contextHandler, (context, container) -> + { + container.addMapping("/", (req, resp) -> serverSocket); + container.getFactory().getExtensionFactory().register(BlockingOutgoingExtension.class.getName(), BlockingOutgoingExtension.class); + }); + + WebSocketUpgradeFilter.configure(contextHandler); + server.setHandler(contextHandler); + + client = new WebSocketClient(); + server.start(); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + outgoingBlocked.countDown(); + server.stop(); + client.stop(); + } + + public static class BlockingOutgoingExtension extends AbstractExtension + { + @Override + public String getName() + { + return BlockingOutgoingExtension.class.getName(); + } + + @Override + public void incomingFrame(Frame frame) + { + getNextIncoming().incomingFrame(frame); + } + + @Override + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) + { + try + { + firstFrameBlocked.countDown(); + outgoingBlocked.await(); + getNextOutgoing().outgoingFrame(frame, callback, batchMode); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + public static class CountingCallback implements WriteCallback + { + private final CountDownLatch successes; + + public CountingCallback(int count) + { + successes = new CountDownLatch(count); + } + + @Override + public void writeSuccess() + { + successes.countDown(); + } + + @Override + public void writeFailed(Throwable t) + { + t.printStackTrace(); + } + } + + @Test + public void testMaxOutgoingFrames() throws Exception + { + // We need to have the frames queued but not yet sent, we do this by blocking in the ExtensionStack. + client.getExtensionFactory().register(BlockingOutgoingExtension.class.getName(), BlockingOutgoingExtension.class); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/"); + EventSocket socket = new EventSocket(); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.addExtensions(BlockingOutgoingExtension.class.getName()); + client.connect(socket, uri, upgradeRequest).get(5, TimeUnit.SECONDS); + assertTrue(socket.openLatch.await(5, TimeUnit.SECONDS)); + + int numFrames = 30; + RemoteEndpoint remote = socket.session.getRemote(); + remote.setMaxOutgoingFrames(numFrames); + + // Verify that we can send up to numFrames without any problem. + // First send will block in the Extension so it needs to be done in new thread, others frames will be queued. + CountingCallback countingCallback = new CountingCallback(numFrames); + new Thread(() -> remote.sendString("0", countingCallback)).start(); + assertTrue(firstFrameBlocked.await(5, TimeUnit.SECONDS)); + for (int i = 1; i < numFrames; i++) + { + remote.sendString(Integer.toString(i), countingCallback); + } + + // Sending any more frames will result in WritePendingException. + FutureWriteCallback callback = new FutureWriteCallback(); + remote.sendString("fail", callback); + ExecutionException executionException = assertThrows(ExecutionException.class, () -> callback.get(5, TimeUnit.SECONDS)); + assertThat(executionException.getCause(), instanceOf(WritePendingException.class)); + + // Check that all callbacks are succeeded when the server processes the frames. + outgoingBlocked.countDown(); + assertTrue(countingCallback.successes.await(5, TimeUnit.SECONDS)); + + // Close successfully. + socket.session.close(); + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS)); + } +} From b5810b930d756f5e1f0878cb6b202abc78ecb164 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 4 Sep 2020 17:43:36 +1000 Subject: [PATCH 4/5] Issue #5824 - improve javadocs for RemoteEndpoint maxOutgoingFrames Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/websocket/api/RemoteEndpoint.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index 37dcec929481..0b13c57fa3a0 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -144,7 +144,9 @@ public interface RemoteEndpoint /** * Set the maximum number of frames which allowed to be waiting to be sent at any one time. * The default value is -1, this indicates there is no limit on how many frames can be - * queued to be sent by the implementation. + * queued to be sent by the implementation. If the limit is exceeded, subsequent frames + * sent are failed with a {@link java.nio.channels.WritePendingException} but + * the connection is not failed and will remain open. * * @param maxOutgoingFrames the max number of frames. */ @@ -153,7 +155,9 @@ public interface RemoteEndpoint /** * Get the maximum number of frames which allowed to be waiting to be sent at any one time. * The default value is -1, this indicates there is no limit on how many frames can be - * queued to be sent by the implementation. + * queued to be sent by the implementation. If the limit is exceeded, subsequent frames + * sent are failed with a {@link java.nio.channels.WritePendingException} but + * the connection is not failed and will remain open. * * @return the max number of frames. */ From b44b62038c72524433da5c482c0651257689c027 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 10 Sep 2020 09:50:52 +1000 Subject: [PATCH 5/5] reorder methods for maxOutgoingFrames, fix javadoc Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/websocket/api/RemoteEndpoint.java | 12 ++++++------ .../websocket/common/WebSocketRemoteEndpoint.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index 0b13c57fa3a0..5c96d94a55a9 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -142,26 +142,26 @@ public interface RemoteEndpoint void setBatchMode(BatchMode mode); /** - * Set the maximum number of frames which allowed to be waiting to be sent at any one time. + * Get the maximum number of data frames allowed to be waiting to be sent at any one time. * The default value is -1, this indicates there is no limit on how many frames can be * queued to be sent by the implementation. If the limit is exceeded, subsequent frames * sent are failed with a {@link java.nio.channels.WritePendingException} but * the connection is not failed and will remain open. * - * @param maxOutgoingFrames the max number of frames. + * @return the max number of frames. */ - void setMaxOutgoingFrames(int maxOutgoingFrames); + int getMaxOutgoingFrames(); /** - * Get the maximum number of frames which allowed to be waiting to be sent at any one time. + * Set the maximum number of data frames allowed to be waiting to be sent at any one time. * The default value is -1, this indicates there is no limit on how many frames can be * queued to be sent by the implementation. If the limit is exceeded, subsequent frames * sent are failed with a {@link java.nio.channels.WritePendingException} but * the connection is not failed and will remain open. * - * @return the max number of frames. + * @param maxOutgoingFrames the max number of frames. */ - int getMaxOutgoingFrames(); + void setMaxOutgoingFrames(int maxOutgoingFrames); /** * Get the InetSocketAddress for the established connection. diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index c047cb9481fa..f4aae110026f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -456,15 +456,15 @@ public void setBatchMode(BatchMode batchMode) } @Override - public void setMaxOutgoingFrames(int maxOutgoingFrames) + public int getMaxOutgoingFrames() { - this.maxNumOutgoingFrames = maxOutgoingFrames; + return maxNumOutgoingFrames; } @Override - public int getMaxOutgoingFrames() + public void setMaxOutgoingFrames(int maxOutgoingFrames) { - return maxNumOutgoingFrames; + this.maxNumOutgoingFrames = maxOutgoingFrames; } @Override