diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 584eb2d2c247..95bc9937e9d5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -135,6 +135,24 @@ public static ByteBuffer allocateDirect(int capacity) return buf; } + /** + * Deep copy of a buffer + * + * @param buffer The buffer to copy + * @return A copy of the buffer + */ + public static ByteBuffer copy(ByteBuffer buffer) + { + if (buffer == null) + return null; + int p = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(p); + return clone; + } + /** * Clear the buffer to be empty in flush mode. * The position and limit are set to 0; diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java new file mode 100644 index 000000000000..4ea59266fde1 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxy.java @@ -0,0 +1,329 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.proxy; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.WebSocketPartialListener; +import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +public class WebSocketProxy +{ + private static final Logger LOG = Log.getLogger(WebSocketProxy.class); + + private final WebSocketClient client; + private final URI serverUri; + private final ClientToProxy clientToProxy = new ClientToProxy(); + private final ProxyToServer proxyToServer = new ProxyToServer(); + + public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri) + { + this.client = webSocketClient; + this.serverUri = serverUri; + } + + public WebSocketConnectionListener getWebSocketConnectionListener() + { + return clientToProxy; + } + + public boolean awaitClose(long timeout) + { + try + { + if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS)) + return false; + if (proxyToServer.getSession() == null) + return true; + return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (Exception e) + { + return false; + } + } + + public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener + { + private volatile Session session; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final AtomicInteger pingsReceived = new AtomicInteger(); + + public Session getSession() + { + return session; + } + + public void fail(Throwable failure) + { + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); + } + + @Override + public void onWebSocketConnect(Session session) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + + Future connect = null; + try + { + this.session = session; + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols()); + upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions()); + connect = client.connect(proxyToServer, serverUri, upgradeRequest); + + //This is blocking as we really want the client to be connected before receiving any messages. + connect.get(); + } + catch (Exception e) + { + if (connect != null) + connect.cancel(true); + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + + try + { + proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialText(String payload, boolean fin) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + + try + { + proxyToServer.getSession().getRemote().sendPartialString(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPing(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + + try + { + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); + proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPong(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + + try + { + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the server. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload)); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketError(Throwable cause) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); + + proxyToServer.fail(cause); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + // Session may be null if connection to the server failed. + Session session = proxyToServer.getSession(); + if (session != null) + session.close(statusCode, reason); + closeLatch.countDown(); + } + } + + public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener + { + private volatile Session session; + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final AtomicInteger pingsReceived = new AtomicInteger(); + + public Session getSession() + { + return session; + } + + public void fail(Throwable failure) + { + // Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes). + Session session = this.session; + if (session != null) + session.close(StatusCode.SERVER_ERROR, failure.getMessage()); + } + + @Override + public void onWebSocketConnect(Session session) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session); + + this.session = session; + } + + @Override + public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin); + + try + { + clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPartialText(String payload, boolean fin) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin); + + try + { + clientToProxy.getSession().getRemote().sendPartialString(payload, fin); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPing(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + + try + { + // The implementation automatically sends pong response. + pingsReceived.incrementAndGet(); + clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload)); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketPong(ByteBuffer payload) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload)); + + try + { + // If we have sent out a ping then we have already responded with automatic pong. + // If this is an unsolicited pong we still need to forward it to the client. + int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i); + if (valueBeforeUpdate == 0) + clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload)); + } + catch (Exception e) + { + throw new WebSocketException(e); + } + } + + @Override + public void onWebSocketError(Throwable cause) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause); + + clientToProxy.fail(cause); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason); + + clientToProxy.getSession().close(statusCode, reason); + closeLatch.countDown(); + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java new file mode 100644 index 000000000000..b542583eeab2 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -0,0 +1,370 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.proxy; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.eclipse.jetty.websocket.tests.EchoSocket; +import org.eclipse.jetty.websocket.tests.EventSocket; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WebSocketProxyTest +{ + private static final int PORT = 49998; + + private Server server; + private EventSocket serverSocket; + private WebSocketProxy webSocketProxy; + private WebSocketClient client; + private URI proxyUri; + + @BeforeEach + public void before() throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + connector.setPort(PORT); + server.addConnector(connector); + + client = new WebSocketClient(); + client.start(); + proxyUri = URI.create("ws://localhost:" + PORT + "/proxy"); + URI echoUri = URI.create("ws://localhost:" + PORT + "/echo"); + webSocketProxy = new WebSocketProxy(client, echoUri); + + ServletContextHandler contextHandler = new ServletContextHandler(); + WebSocketUpgradeFilter.configure(contextHandler); + serverSocket = new EchoSocket(); + NativeWebSocketServletContainerInitializer.configure(contextHandler, ((context, container) -> + { + container.addMapping("/proxy", (req, resp) -> webSocketProxy.getWebSocketConnectionListener()); + container.addMapping("/echo", (req, resp) -> + { + if (req.hasSubProtocol("fail")) + throw new WebSocketException("failing during upgrade"); + return serverSocket; + }); + })); + + server.setHandler(contextHandler); + server.start(); + } + + @AfterEach + public void after() throws Exception + { + client.stop(); + server.stop(); + } + + @Test + public void testEcho() throws Exception + { + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test an echo spread across multiple frames. + clientSocket.session.getRemote().sendPartialString("hell", false); + clientSocket.session.getRemote().sendPartialString("o w", false); + clientSocket.session.getRemote().sendPartialString("orld", false); + clientSocket.session.getRemote().sendPartialString("!", true); + String response = clientSocket.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(response, is("hello world!")); + + // Test we closed successfully on the client side. + clientSocket.session.close(StatusCode.NORMAL, "test initiated close"); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("test initiated close")); + + // Test we closed successfully on the server side. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("test initiated close")); + + // No errors occurred. + assertNull(clientSocket.error); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testFailServerUpgrade() throws Exception + { + EventSocket clientSocket = new EventSocket(); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("fail"); + + try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class)) + { + client.connect(clientSocket, proxyUri, upgradeRequest); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + } + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testClientError() throws Exception + { + EventSocket clientSocket = new OnOpenThrowingSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNotNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NO_CLOSE)); + assertThat(serverSocket.closeReason, is("Disconnected")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerError() throws Exception + { + serverSocket = new OnOpenThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onOpen error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onOpen error")); + assertNotNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void testServerThrowsOnMessage() throws Exception + { + serverSocket = new OnTextThrowingSocket(); + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + clientSocket.session.getRemote().sendString("hello world!"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(clientSocket.closeReason, is("simulated onMessage error")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.SERVER_ERROR)); + assertThat(serverSocket.closeReason, is("simulated onMessage error")); + assertNotNull(serverSocket.error); + + assertNull(clientSocket.textMessages.poll(1, TimeUnit.SECONDS)); + assertTrue(webSocketProxy.awaitClose(5000)); + } + + @Test + public void timeoutTest() throws Exception + { + long clientSessionIdleTimeout = 2000; + + EventSocket clientSocket = new EventSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Configure infinite idleTimeout on the server session and short timeout on the client session. + clientSocket.session.setIdleTimeout(clientSessionIdleTimeout); + serverSocket.session.setIdleTimeout(-1); + + // Send and receive an echo message. + clientSocket.session.getRemote().sendString("test echo message"); + assertThat(clientSocket.textMessages.poll(clientSessionIdleTimeout, TimeUnit.SECONDS), is("test echo message")); + + // Wait more than the idleTimeout period, the clientToProxy connection should fail which should fail the proxyToServer. + assertTrue(clientSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS)); + assertTrue(serverSocket.closeLatch.await(clientSessionIdleTimeout * 2, TimeUnit.MILLISECONDS)); + + // Check errors and close status. + assertThat(clientSocket.error.getMessage(), containsString("Idle timeout expired")); + assertThat(clientSocket.closeCode, is(StatusCode.SHUTDOWN)); + assertThat(clientSocket.closeReason, containsString("Idle timeout expired")); + assertNull(serverSocket.error); + assertThat(serverSocket.closeCode, is(StatusCode.SHUTDOWN)); + assertThat(serverSocket.closeReason, containsString("Idle timeout expired")); + } + + @Test + public void testPingPong() throws Exception + { + PingPongSocket serverEndpoint = new PingPongSocket(); + serverSocket = serverEndpoint; + + PingPongSocket clientSocket = new PingPongSocket(); + client.connect(clientSocket, proxyUri); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Test unsolicited pong from client. + clientSocket.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from client")); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client"))); + + // Test unsolicited pong from server. + serverEndpoint.session.getRemote().sendPong(BufferUtil.toBuffer("unsolicited pong from server")); + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server"))); + + // Test pings from client. + for (int i = 0; i < 10; i++) + clientSocket.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + // Test pings from server. + for (int i = 0; i < 10; i++) + serverEndpoint.session.getRemote().sendPing(BufferUtil.toBuffer(i)); + for (int i = 0; i < 10; i++) + { + assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer(i))); + } + + clientSocket.session.close(StatusCode.NORMAL, "closing from test"); + + // Verify expected client close. + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(clientSocket.closeReason, is("closing from test")); + assertNull(clientSocket.error); + + // Verify expected server close. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeReason, is("closing from test")); + assertNull(serverSocket.error); + + // WebSocketProxy has been completely closed. + assertTrue(webSocketProxy.awaitClose(5000)); + + // Check we had no unexpected pings or pongs sent. + assertThat(clientSocket.pingMessages.size(), is(0)); + assertThat(serverEndpoint.pingMessages.size(), is(0)); + } + + @WebSocket + public static class PingPongSocket extends EventSocket + { + public BlockingQueue pingMessages = new BlockingArrayQueue<>(); + public BlockingQueue pongMessages = new BlockingArrayQueue<>(); + + @OnWebSocketFrame + public void onWebSocketFrame(Frame frame) + { + switch (frame.getOpCode()) + { + case OpCode.PING: + pingMessages.add(BufferUtil.copy(frame.getPayload())); + break; + case OpCode.PONG: + pongMessages.add(BufferUtil.copy(frame.getPayload())); + break; + default: + break; + } + } + } + + @WebSocket + public static class OnOpenThrowingSocket extends EventSocket + { + @Override + public void onOpen(Session session) + { + super.onOpen(session); + throw new IllegalStateException("simulated onOpen error"); + } + } + + @WebSocket + public static class OnTextThrowingSocket extends EventSocket + { + @Override + public void onMessage(String message) throws IOException + { + super.onMessage(message); + throw new IllegalStateException("simulated onMessage error"); + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties index a429c612f5e1..8564d5228c00 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/jetty-websocket-tests/src/test/resources/jetty-logging.properties @@ -12,6 +12,7 @@ org.eclipse.jetty.LEVEL=INFO # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.test.LEVEL=DEBUG +# org.eclipse.jetty.websocket.tests.proxy.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG