From d7598f3ff20a469e04277e35e7bff6777acfab0f Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 24 Nov 2020 13:40:48 +1100 Subject: [PATCH 1/4] basic WebSocketProxy implementation Signed-off-by: Lachlan Roberts --- .../websocket/tests/proxy/WebSocketProxy.java | 272 ++++++++++++++++++ .../tests/proxy/WebSocketProxyTest.java | 50 ++++ 2 files changed, 322 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java new file mode 100644 index 000000000000..3867c80e50ab --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -0,0 +1,272 @@ +package org.eclipse.jetty.websocket.tests.proxy; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.Future; + +import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.WebSocketPartialListener; +import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +public class WebSocketProxy +{ + private final WebSocketClient client = new WebSocketClient(); + private final URI serverUri = URI.create("ws://echo.websocket.org"); + private final ClientToProxy clientToProxy = new ClientToProxy(); + private final ProxyToServer proxyToServer = new ProxyToServer(); + + public WebSocketProxy() + { + LifeCycle.start(client); + } + + public WebSocketConnectionListener getWebSocketConnectionListener() + { + return clientToProxy; + } + + public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener + { + private Session session; + private FutureCallback pongWait; + + public Session getSession() + { + return session; + } + + public void receivedPong() + { + if (pongWait != null) + { + pongWait.succeeded(); + pongWait = null; + } + } + + @Override + public void onWebSocketConnect(Session session) + { + Future connect = null; + try + { + this.session = session; + connect = client.connect(proxyToServer, serverUri); + connect.get(); + } + catch (Exception e) + { + if (connect != null) + connect.cancel(true); + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) + { + try + { + proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialText(String payload, boolean fin) + { + try + { + proxyToServer.getSession().getRemote().sendPartialString(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPing(ByteBuffer payload) + { + try + { + proxyToServer.getSession().getRemote().sendPing(payload); + // Block until we get pong response back from server. + // An automatic pong will occur from the implementation after we exit from here. + pongWait.get(); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPong(ByteBuffer payload) + { + try + { + // Notify the other side we have received a Pong. + proxyToServer.receivedPong(); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketError(Throwable cause) + { + cause.printStackTrace(); + + try + { + // TODO: need to fail ProxyToServer as well. + if (pongWait != null) + pongWait.cancel(true); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + try + { + Session session = proxyToServer.getSession(); + if (session != null) + session.close(statusCode, reason); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + } + + public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener + { + private Session session; + private FutureCallback pongWait; + + public Session getSession() + { + return session; + } + + public void receivedPong() + { + if (pongWait != null) + { + pongWait.succeeded(); + pongWait = null; + } + } + + @Override + public void onWebSocketConnect(Session session) + { + this.session = session; + } + + @Override + public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) + { + try + { + clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialText(String payload, boolean fin) + { + try + { + clientToProxy.getSession().getRemote().sendPartialString(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPing(ByteBuffer payload) + { + try + { + clientToProxy.getSession().getRemote().sendPing(payload); + // Block until we get pong response back from client. + // An automatic pong will occur from the implementation after we exit from here. + pongWait.get(); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPong(ByteBuffer payload) + { + try + { + // Notify the other side we have received a Pong. + clientToProxy.receivedPong(); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketError(Throwable cause) + { + cause.printStackTrace(); + + try + { + // TODO: need to fail ProxyToServer as well. + if (pongWait != null) + pongWait.cancel(true); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + try + { + Session session = clientToProxy.getSession(); + if (session != null) + session.close(statusCode, reason); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java new file mode 100644 index 000000000000..f1638149477f --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -0,0 +1,50 @@ +package org.eclipse.jetty.websocket.tests.proxy; + +import java.net.URI; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class WebSocketProxyTest +{ + private Server server; + private URI serverUri; + + @BeforeEach + public void before() throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + connector.setPort(8080); // TODO: remove... + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(); + WebSocketUpgradeFilter.configure(contextHandler); + NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) -> + { + container.addMapping("/*", (req, resp) -> new WebSocketProxy().getWebSocketConnectionListener()); + })); + + server.setHandler(contextHandler); + server.start(); + serverUri = URI.create("ws://localhost:" + connector.getLocalPort()); + } + + @AfterEach + public void after() throws Exception + { + server.stop(); + } + + @Test + public void test() throws Exception + { + server.join(); + } +} From b89adb8dae1e266a7bd4dfa80c167234c1bd059d Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 24 Nov 2020 18:55:09 +1100 Subject: [PATCH 2/4] Improve WebSocketProxy, and write tests for it Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/util/BufferUtil.java | 18 ++ .../websocket/tests/proxy/WebSocketProxy.java | 264 ++++++++++----- .../tests/proxy/WebSocketProxyTest.java | 300 +++++++++++++++++- .../test/resources/jetty-logging.properties | 1 + 4 files changed, 503 insertions(+), 80 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 584eb2d2c247..95bc9937e9d5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -135,6 +135,24 @@ public static ByteBuffer allocateDirect(int capacity) return buf; } + /** + * Deep copy of a buffer + * + * @param buffer The buffer to copy + * @return A copy of the buffer + */ + public static ByteBuffer copy(ByteBuffer buffer) + { + if (buffer == null) + return null; + int p = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(p); + return clone; + } + /** * Clear the buffer to be empty in flush mode. * The position and limit are set to 0; diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java index 3867c80e50ab..027a912a30ee 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -1,28 +1,57 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.websocket.tests.proxy; import java.net.URI; import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; -import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPartialListener; import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; public class WebSocketProxy { - private final WebSocketClient client = new WebSocketClient(); - private final URI serverUri = URI.create("ws://echo.websocket.org"); + private static final Logger LOG = Log.getLogger(WebSocketProxy.class); + + private final WebSocketClient client; + private final URI serverUri; private final ClientToProxy clientToProxy = new ClientToProxy(); private final ProxyToServer proxyToServer = new ProxyToServer(); - public WebSocketProxy() + public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri) { - LifeCycle.start(client); + this.client = webSocketClient; + this.serverUri = serverUri; } public WebSocketConnectionListener getWebSocketConnectionListener() @@ -30,33 +59,101 @@ public WebSocketConnectionListener getWebSocketConnectionListener() return clientToProxy; } + public boolean awaitClose(long timeout) + { + try + { + if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS)) + return false; + if (proxyToServer.getSession() == null) + return true; + return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return false; + } + } + + /** + * We use this to wait until we receive a pong from other websocket connection before sending back the response pong. + * This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could + * disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}. + */ + private static class PongWait + { + private final FutureCallback COMPLETED = new FutureCallback(true); + private final AtomicReference reference = new AtomicReference<>(); + + /** + * @return gives back a Future which is completed when this is notified that a pong has been received. + */ + public FutureCallback waitForPong() + { + FutureCallback futureCallback = new FutureCallback(); + if (!reference.compareAndSet(null, futureCallback)) + throw new IllegalStateException(); + return futureCallback; + } + + /** + * @return true if the pong will be automatically forwarded, otherwise it must be sent manually. + */ + public boolean receivedPong() + { + FutureCallback futureCallback = reference.getAndSet(null); + if (futureCallback != null) + { + futureCallback.succeeded(); + return true; + } + + return false; + } + + public void cancel() + { + FutureCallback futureCallback = reference.getAndSet(COMPLETED); + if (futureCallback != null) + futureCallback.cancel(true); + } + } + public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; - private FutureCallback pongWait; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final PongWait pongWait = new PongWait(); public Session getSession() { return session; } - public void receivedPong() + public boolean receivedPong() { - if (pongWait != null) - { - pongWait.succeeded(); - pongWait = null; - } + return pongWait.receivedPong(); + } + + public void fail(Throwable failure) + { + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); } @Override public void onWebSocketConnect(Session session) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + Future connect = null; try { this.session = session; - connect = client.connect(proxyToServer, serverUri); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); + upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); + connect = client.connect(proxyToServer, serverUri, upgradeRequest); connect.get(); } catch (Exception e) @@ -70,6 +167,9 @@ public void onWebSocketConnect(Session session) @Override public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + try { proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin); @@ -83,6 +183,9 @@ public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) @Override public void onWebSocketPartialText(String payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + try { proxyToServer.getSession().getRemote().sendPartialString(payload, fin); @@ -96,12 +199,15 @@ public void onWebSocketPartialText(String payload, boolean fin) @Override public void onWebSocketPing(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - proxyToServer.getSession().getRemote().sendPing(payload); - // Block until we get pong response back from server. - // An automatic pong will occur from the implementation after we exit from here. - pongWait.get(); + // Block until we get pong response back from server. An automatic pong will be sent after this method. + FutureCallback futureCallback = pongWait.waitForPong(); + proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + futureCallback.get(); } catch (Exception e) { @@ -112,10 +218,16 @@ public void onWebSocketPing(ByteBuffer payload) @Override public void onWebSocketPong(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - // Notify the other side we have received a Pong. - proxyToServer.receivedPong(); + // We do not forward on the pong message unless it was an unsolicited pong. + // Instead we notify the other side we have received pong which will then unblock in the + // thread in onPing() which will trigger the automatic pong response from the implementation. + if (!proxyToServer.receivedPong()) + proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -126,64 +238,65 @@ public void onWebSocketPong(ByteBuffer payload) @Override public void onWebSocketError(Throwable cause) { - cause.printStackTrace(); + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); - try - { - // TODO: need to fail ProxyToServer as well. - if (pongWait != null) - pongWait.cancel(true); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + proxyToServer.fail(cause); + pongWait.cancel(); } @Override public void onWebSocketClose(int statusCode, String reason) { - try - { - Session session = proxyToServer.getSession(); - if (session != null) - session.close(statusCode, reason); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + Session session = proxyToServer.getSession(); + if (session != null) + session.close(statusCode, reason); + pongWait.cancel(); + closeLatch.countDown(); } } public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; - private FutureCallback pongWait; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final PongWait pongWait = new PongWait(); public Session getSession() { return session; } - public void receivedPong() + public boolean receivedPong() { - if (pongWait != null) - { - pongWait.succeeded(); - pongWait = null; - } + return pongWait.receivedPong(); + } + + public void fail(Throwable failure) + { + // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). + if (session != null) + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); } @Override public void onWebSocketConnect(Session session) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + this.session = session; } @Override public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + try { clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin); @@ -197,6 +310,9 @@ public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) @Override public void onWebSocketPartialText(String payload, boolean fin) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + try { clientToProxy.getSession().getRemote().sendPartialString(payload, fin); @@ -210,12 +326,15 @@ public void onWebSocketPartialText(String payload, boolean fin) @Override public void onWebSocketPing(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - clientToProxy.getSession().getRemote().sendPing(payload); - // Block until we get pong response back from client. - // An automatic pong will occur from the implementation after we exit from here. - pongWait.get(); + // Block until we get pong response back from client. An automatic pong will be sent after this method. + FutureCallback futureCallback = pongWait.waitForPong(); + clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + futureCallback.get(); } catch (Exception e) { @@ -226,10 +345,16 @@ public void onWebSocketPing(ByteBuffer payload) @Override public void onWebSocketPong(ByteBuffer payload) { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + try { - // Notify the other side we have received a Pong. - clientToProxy.receivedPong(); + // We do not forward on the pong message unless it was an unsolicited pong. + // Instead we notify the other side we have received pong which will then unblock in the + // thread in onPing() which will trigger the automatic pong response from the implementation. + if (!clientToProxy.receivedPong()) + clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -240,33 +365,24 @@ public void onWebSocketPong(ByteBuffer payload) @Override public void onWebSocketError(Throwable cause) { - cause.printStackTrace(); + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); - try - { - // TODO: need to fail ProxyToServer as well. - if (pongWait != null) - pongWait.cancel(true); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + clientToProxy.fail(cause); + pongWait.cancel(); } @Override public void onWebSocketClose(int statusCode, String reason) { - try - { - Session session = clientToProxy.getSession(); - if (session != null) - session.close(statusCode, reason); - } - catch (Exception e) - { - throw new WebSocketException(e); - } + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + Session session = clientToProxy.getSession(); + if (session != null) + session.close(statusCode, reason); + pongWait.cancel(); + closeLatch.countDown(); } } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java index f1638149477f..fce4655a3053 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -1,50 +1,338 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.websocket.tests.proxy; +import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.eclipse.jetty.websocket.tests.EchoSocket; +import org.eclipse.jetty.websocket.tests.EventSocket; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class WebSocketProxyTest { + private static final int PORT = 49998; + private Server server; - private URI serverUri; + private EventSocket serverSocket; + private WebSocketProxy webSocketProxy; + private WebSocketClient client; + private URI proxyUri; @BeforeEach public void before() throws Exception { server = new Server(); ServerConnector connector = new ServerConnector(server); - connector.setPort(8080); // TODO: remove... + connector.setPort(PORT); server.addConnector(connector); + client = new WebSocketClient(); + client.start(); + proxyUri = URI.create("ws://localhost:" + PORT + "/proxy"); + URI echoUri = URI.create("ws://localhost:" + PORT + "/echo"); + webSocketProxy = new WebSocketProxy(client, echoUri); + ServletContextHandler contextHandler = new ServletContextHandler(); WebSocketUpgradeFilter.configure(contextHandler); + serverSocket = new EchoSocket(); NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) -> { - container.addMapping("/*", (req, resp) -> new WebSocketProxy().getWebSocketConnectionListener()); + container.addMapping("/proxy", (req, resp) -> webSocketProxy.getWebSocketConnectionListener()); + container.addMapping("/echo", (req, resp) -> + { + if (req.hasSubProtocol("fail")) + throw new WebSocketException("failing during upgrade"); + return serverSocket; + }); })); server.setHandler(contextHandler); server.start(); - serverUri = URI.create("ws://localhost:" + connector.getLocalPort()); } @AfterEach public void after() throws Exception { + client.stop(); server.stop(); } @Test - public void test() throws Exception + public void testEcho() throws Exception + { + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test an echo spread across multiple frames. + clientSocket.session.getRemote().sendPartialString("hell", false); + clientSocket.session.getRemote().sendPartialString("o w", false); + clientSocket.session.getRemote().sendPartialString("orld", false); + clientSocket.session.getRemote().sendPartialString("!", true); + String response = clientSocket.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(response, is("hello world!")); + + // Test we closed successfully on the client side. + clientSocket.session.close(StatusCode.NORMAL, "test initiated close"); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("test initiated close")); + + // Test we closed successfully on the server side. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("test initiated close")); + + // No errors occurred. + assertNull(clientSocket.error); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testFailServerUpgrade() throws Exception + { + EventSocket clientSocket = new EventSocket(); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("fail"); + + try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class)) + { + client.connect(clientSocket, proxyUri, upgradeRequest); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + } + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testClientError() throws Exception + { + EventSocket clientSocket = new OnOpenThrowingSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNotNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(serverSocket.closeReason, is("Disconnected")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerError() throws Exception + { + serverSocket = new OnOpenThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onOpen error")); + assertNotNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerErrorClientNoResponse() throws Exception { - server.join(); + serverSocket = new OnTextThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + clientSocket.session.getRemote().sendString("hello world!"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onMessage error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onMessage error")); + assertNotNull(serverSocket.error); + + assertNull(clientSocket.textMessages.poll(1, TimeUnit.SECONDS)); + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testPingPong() throws Exception + { + PingPongSocket serverEndpoint = new PingPongSocket(); + serverSocket = serverEndpoint; + + PingPongSocket clientSocket = new PingPongSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test unsolicited pong from client. + clientSocket.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from client")); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client"))); + + // Test unsolicited pong from server. + serverEndpoint.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from server")); + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server"))); + + // Test pings from client. + for (int i = 0; i < 10; i++) + clientSocket.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + // Test pings from server. + for (int i = 0; i < 10; i++) + serverEndpoint.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + clientSocket.session.close(StatusCode.NORMAL, "closing from test"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("closing from test")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("closing from test")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + + // Check we had no unexpected pings or pongs sent. + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + } + + @WebSocket + public static class PingPongSocket extends EventSocket + { + public BlockingQueue pingMessages = new BlockingArrayQueue<>(); + public BlockingQueue pongMessages = new BlockingArrayQueue<>(); + + @OnWebSocketFrame + public void onWebSocketFrame(Frame frame) + { + switch (frame.getOpCode()) + { + case OpCode.PING: + pingMessages.add(BufferUtil.copy(frame.getPayload())); + break; + case OpCode.PONG: + pongMessages.add(BufferUtil.copy(frame.getPayload())); + break; + default: + break; + } + } + } + + @WebSocket + public static class OnOpenThrowingSocket extends EventSocket + { + @Override + public void onOpen(Session session) + { + super.onOpen(session); + throw new IllegalStateException("simulated onOpen error"); + } + } + + @WebSocket + public static class OnTextThrowingSocket extends EventSocket + { + @Override + public void onMessage(String message) throws IOException + { + super.onMessage(message); + throw new IllegalStateException("simulated onMessage error"); + } } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties index a429c612f5e1..8564d5228c00 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties @@ -12,6 +12,7 @@ org.eclipse.jetty.LEVEL=INFO # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG +# org.eclipse.jetty.websocket.tests.proxy.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG From 6a9acaaa9d494e1d16f9b438da64247467650a35 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 25 Nov 2020 11:48:33 +1100 Subject: [PATCH 3/4] do not block for websocket PING and PONG messages Signed-off-by: Lachlan Roberts --- .../websocket/tests/proxy/WebSocketProxy.java | 102 ++++-------------- 1 file changed, 21 insertions(+), 81 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java index 027a912a30ee..cfa6c85aabaf 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -23,10 +23,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -75,66 +74,17 @@ public boolean awaitClose(long timeout) } } - /** - * We use this to wait until we receive a pong from other websocket connection before sending back the response pong. - * This is problematic because the protocol allows unsolicited PongMessages. Ideally it would be best if we could - * disable the automatic pong response through something like the {@link org.eclipse.jetty.websocket.api.WebSocketPolicy}. - */ - private static class PongWait - { - private final FutureCallback COMPLETED = new FutureCallback(true); - private final AtomicReference reference = new AtomicReference<>(); - - /** - * @return gives back a Future which is completed when this is notified that a pong has been received. - */ - public FutureCallback waitForPong() - { - FutureCallback futureCallback = new FutureCallback(); - if (!reference.compareAndSet(null, futureCallback)) - throw new IllegalStateException(); - return futureCallback; - } - - /** - * @return true if the pong will be automatically forwarded, otherwise it must be sent manually. - */ - public boolean receivedPong() - { - FutureCallback futureCallback = reference.getAndSet(null); - if (futureCallback != null) - { - futureCallback.succeeded(); - return true; - } - - return false; - } - - public void cancel() - { - FutureCallback futureCallback = reference.getAndSet(COMPLETED); - if (futureCallback != null) - futureCallback.cancel(true); - } - } - public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener { private Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); - private final PongWait pongWait = new PongWait(); + private final AtomicInteger pingsReceived = new AtomicInteger(); public Session getSession() { return session; } - public boolean receivedPong() - { - return pongWait.receivedPong(); - } - public void fail(Throwable failure) { session.close(StatusCode.SERVER_ERROR, failure.getMessage()); @@ -154,6 +104,8 @@ public void onWebSocketConnect(Session session) upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); connect = client.connect(proxyToServer, serverUri, upgradeRequest); + + //This is blocking as we really want the client to be connected before receiving any messages. connect.get(); } catch (Exception e) @@ -204,10 +156,9 @@ public void onWebSocketPing(ByteBuffer payload) try { - // Block until we get pong response back from server. An automatic pong will be sent after this method. - FutureCallback futureCallback = pongWait.waitForPong(); + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); - futureCallback.get(); } catch (Exception e) { @@ -223,11 +174,11 @@ public void onWebSocketPong(ByteBuffer payload) try { - // We do not forward on the pong message unless it was an unsolicited pong. - // Instead we notify the other side we have received pong which will then unblock in the - // thread in onPing() which will trigger the automatic pong response from the implementation. - if (!proxyToServer.receivedPong()) - proxyToServer.session.getRemote().sendPong(BufferUtil.copy(payload)); + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the server. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -242,7 +193,6 @@ public void onWebSocketError(Throwable cause) LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); proxyToServer.fail(cause); - pongWait.cancel(); } @Override @@ -251,10 +201,10 @@ public void onWebSocketClose(int statusCode, String reason) if (LOG.isDebugEnabled()) LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + // Session may be null if connection to the server failed. Session session = proxyToServer.getSession(); if (session != null) session.close(statusCode, reason); - pongWait.cancel(); closeLatch.countDown(); } } @@ -263,18 +213,13 @@ public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPon { private Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); - private final PongWait pongWait = new PongWait(); + private final AtomicInteger pingsReceived = new AtomicInteger(); public Session getSession() { return session; } - public boolean receivedPong() - { - return pongWait.receivedPong(); - } - public void fail(Throwable failure) { // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). @@ -331,10 +276,9 @@ public void onWebSocketPing(ByteBuffer payload) try { - // Block until we get pong response back from client. An automatic pong will be sent after this method. - FutureCallback futureCallback = pongWait.waitForPong(); + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); - futureCallback.get(); } catch (Exception e) { @@ -350,11 +294,11 @@ public void onWebSocketPong(ByteBuffer payload) try { - // We do not forward on the pong message unless it was an unsolicited pong. - // Instead we notify the other side we have received pong which will then unblock in the - // thread in onPing() which will trigger the automatic pong response from the implementation. - if (!clientToProxy.receivedPong()) - clientToProxy.session.getRemote().sendPong(BufferUtil.copy(payload)); + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the client. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload)); } catch (Exception e) { @@ -369,7 +313,6 @@ public void onWebSocketError(Throwable cause) LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); clientToProxy.fail(cause); - pongWait.cancel(); } @Override @@ -378,10 +321,7 @@ public void onWebSocketClose(int statusCode, String reason) if (LOG.isDebugEnabled()) LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); - Session session = clientToProxy.getSession(); - if (session != null) - session.close(statusCode, reason); - pongWait.cancel(); + clientToProxy.getSession().close(statusCode, reason); closeLatch.countDown(); } } From 104beb98244df017bf4ae6dcf962e375e76c2faa Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 26 Nov 2020 11:22:24 +1100 Subject: [PATCH 4/4] add IdleTimeout test, use volatile sessions Signed-off-by: Lachlan Roberts --- .../websocket/tests/proxy/WebSocketProxy.java | 5 +-- .../tests/proxy/WebSocketProxyTest.java | 34 ++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java index cfa6c85aabaf..4ea59266fde1 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -76,7 +76,7 @@ public boolean awaitClose(long timeout) public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener { - private Session session; + private volatile Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); private final AtomicInteger pingsReceived = new AtomicInteger(); @@ -211,7 +211,7 @@ public void onWebSocketClose(int statusCode, String reason) public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener { - private Session session; + private volatile Session session; private final CountDownLatch closeLatch = new CountDownLatch(1); private final AtomicInteger pingsReceived = new AtomicInteger(); @@ -223,6 +223,7 @@ public Session getSession() public void fail(Throwable failure) { // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). + Session session = this.session; if (session != null) session.close(StatusCode.SERVER_ERROR, failure.getMessage()); } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java index fce4655a3053..b542583eeab2 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -203,7 +204,7 @@ public void testServerError() throws Exception } @Test - public void testServerErrorClientNoResponse() throws Exception + public void testServerThrowsOnMessage() throws Exception { serverSocket = new OnTextThrowingSocket(); @@ -230,6 +231,37 @@ public void testServerErrorClientNoResponse() throws Exception assertTrue(webSocketProxy.awaitClose(5000)); } + @Test + public void timeoutTest() throws Exception + { + long clientSessionIdleTimeout = 2000; + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Configure infinite idleTimeout on the server session and short timeout on the client session. + clientSocket.session.setIdleTimeout(clientSessionIdleTimeout); + serverSocket.session.setIdleTimeout(-1); + + // Send and receive an echo message. + clientSocket.session.getRemote().sendString("test echo message"); + assertThat(clientSocket.textMessages.poll(clientSessionIdleTimeout, TimeUnit.SECONDS), is("test echo message")); + + // Wait more than the idleTimeout period, the clientToProxy connection should fail which should fail the proxyToServer. + assertTrue(clientSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS)); + assertTrue(serverSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS)); + + // Check errors and close status. + assertThat(clientSocket.error.getMessage(), containsString("Idle timeout expired")); + assertThat(clientSocket.closeCode, is(StatusCode.SHUTDOWN)); + assertThat(clientSocket.closeReason, containsString("Idle timeout expired")); + assertNull(serverSocket.error); + assertThat(serverSocket.closeCode, is(StatusCode.SHUTDOWN)); + assertThat(serverSocket.closeReason, containsString("Idle timeout expired")); + } + @Test public void testPingPong() throws Exception {