From 2f3808026fa948a4bf419149b7162e9a0c41d3ff Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 22 Sep 2020 18:10:11 +1000 Subject: [PATCH 1/3] Issue #5170 - add testing to verify if this issue exists in 9.4.x Signed-off-by: Lachlan Roberts --- .../UpgradeWithLeftOverHttpBytesTest.java | 355 ++++++++++++++++++ .../websocket/client/WebSocketClient.java | 6 + 2 files changed, 361 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java new file mode 100644 index 000000000000..1b5d5eb414fc --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java @@ -0,0 +1,355 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.AcceptHash; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.Generator; +import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UpgradeWithLeftOverHttpBytesTest +{ + private ServerSocket server; + private URI serverUri; + private WebSocketClient client; + private final Generator generator = new Generator(WebSocketPolicy.newServerPolicy(), new MappedByteBufferPool()); + + @BeforeEach + public void start() throws Exception + { + client = new WebSocketClient(); + client.start(); + server = new ServerSocket(0); + serverUri = URI.create("ws://localhost:" + server.getLocalPort()); + } + + @AfterEach + public void stop() throws Exception + { + client.stop(); + server.close(); + } + + @WebSocket + public static class OnOpenSocket extends EventSocket + { + CountDownLatch onOpenBlocked = new CountDownLatch(1); + + @Override + public void onOpen(Session session) + { + super.onOpen(session); + assertDoesNotThrow(() -> assertTrue(onOpenBlocked.await(1, TimeUnit.MINUTES))); + } + } + + @Test + public void testRequestCompletesFirst_NoWebSocketBytesInResponse() throws Exception + { + // Initiate connection. + OnOpenSocket clientEndpoint = new OnOpenSocket(); + client.connect(clientEndpoint, serverUri); + Socket serverSocket = server.accept(); + + // Upgrade to WebSocket. + String upgradeRequest = getRequestHeaders(serverSocket.getInputStream()); + assertThat(upgradeRequest, containsString("HTTP/1.1")); + assertThat(upgradeRequest, containsString("Upgrade: websocket")); + String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" + + "Upgrade: WebSocket\n" + + "Connection: Upgrade\n" + + "Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" + + "\n"; + serverSocket.getOutputStream().write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1)); + + // Wait for WebSocket to be opened, wait 1 sec before allowing it to continue. + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(1000); + clientEndpoint.onOpenBlocked.countDown(); + + // Send some websocket data. + int numFrames = 1000; + for (int i = 0; i < numFrames; i++) + { + Frame frame = new TextFrame().setPayload(BufferUtil.toBuffer(Integer.toString(i))); + serverSocket.getOutputStream().write(toByteArray(frame)); + } + Frame closeFrame = new CloseInfo(StatusCode.NORMAL, "closed by test").asFrame(); + serverSocket.getOutputStream().write(toByteArray(closeFrame)); + + // We receive the data correctly. + for (int i = 0; i < numFrames; i++) + { + String msg = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(msg, is(Integer.toString(i))); + } + + // Closed successfully with correct status. + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeCode, is(StatusCode.NORMAL)); + assertThat(clientEndpoint.closeReason, is("closed by test")); + } + + @Test + public void testRequestCompletesFirst_WithWebSocketBytesInResponse() throws Exception + { + // Initiate connection. + OnOpenSocket clientEndpoint = new OnOpenSocket(); + client.connect(clientEndpoint, serverUri); + Socket serverSocket = server.accept(); + + // Upgrade to WebSocket, sending first websocket frame with the upgrade response. + String upgradeRequest = getRequestHeaders(serverSocket.getInputStream()); + assertThat(upgradeRequest, containsString("HTTP/1.1")); + assertThat(upgradeRequest, containsString("Upgrade: websocket")); + String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" + + "Upgrade: WebSocket\n" + + "Connection: Upgrade\n" + + "Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" + + "\n"; + Frame firstFrame = new TextFrame().setPayload("first message payload"); + byte[] bytes = combineToByteArray(BufferUtil.toBuffer(upgradeResponse), generateFrame(firstFrame)); + serverSocket.getOutputStream().write(bytes); + + // Now we send the rest of the data. + int numFrames = 1000; + for (int i = 0; i < numFrames; i++) + { + Frame frame = new TextFrame().setPayload(BufferUtil.toBuffer(Integer.toString(i))); + serverSocket.getOutputStream().write(toByteArray(frame)); + } + Frame closeFrame = new CloseInfo(StatusCode.NORMAL, "closed by test").asFrame(); + serverSocket.getOutputStream().write(toByteArray(closeFrame)); + + // Wait for WebSocket to be opened, wait 1 sec before allowing it to continue. + // We delay to ensure HttpConnection is not still reading from network. + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(1000); + clientEndpoint.onOpenBlocked.countDown(); + + // We receive the data correctly. + assertThat(clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS), is("first message payload")); + for (int i = 0; i < numFrames; i++) + { + String msg = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(msg, is(Integer.toString(i))); + } + + // Closed successfully with correct status. + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeCode, is(StatusCode.NORMAL)); + assertThat(clientEndpoint.closeReason, is("closed by test")); + } + + @Test + public void testResponseCompletesFirst_NoWebSocketBytesInResponse() throws Exception + { + // We delay the request to finish until after the response is complete. + client.addBean(new Request.Listener() + { + @Override + public void onCommit(Request request) + { + assertDoesNotThrow(() -> Thread.sleep(1000)); + } + }); + + // Initiate connection. + OnOpenSocket clientEndpoint = new OnOpenSocket(); + client.connect(clientEndpoint, serverUri); + Socket serverSocket = server.accept(); + + // Upgrade to WebSocket. + String upgradeRequest = getRequestHeaders(serverSocket.getInputStream()); + assertThat(upgradeRequest, containsString("HTTP/1.1")); + assertThat(upgradeRequest, containsString("Upgrade: websocket")); + String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" + + "Upgrade: WebSocket\n" + + "Connection: Upgrade\n" + + "Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" + + "\n"; + serverSocket.getOutputStream().write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1)); + + + // Wait for WebSocket to be opened, wait 1 sec before allowing it to continue. + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(1000); + clientEndpoint.onOpenBlocked.countDown(); + + + // Send some websocket data. + int numFrames = 1000; + for (int i = 0; i < numFrames; i++) + { + Frame frame = new TextFrame().setPayload(BufferUtil.toBuffer(Integer.toString(i))); + serverSocket.getOutputStream().write(toByteArray(frame)); + } + Frame closeFrame = new CloseInfo(StatusCode.NORMAL, "closed by test").asFrame(); + serverSocket.getOutputStream().write(toByteArray(closeFrame)); + + // We receive the data correctly. + for (int i = 0; i < numFrames; i++) + { + String msg = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(msg, is(Integer.toString(i))); + } + + // Closed successfully with correct status. + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeCode, is(StatusCode.NORMAL)); + assertThat(clientEndpoint.closeReason, is("closed by test")); + } + + @Test + public void testResponseCompletesFirst_WithWebSocketBytesInResponse() throws Exception + { + // We delay the request to finish until after the response is complete. + client.addBean(new Request.Listener() + { + @Override + public void onCommit(Request request) + { + assertDoesNotThrow(() -> Thread.sleep(1000)); + } + }); + + // Initiate connection. + OnOpenSocket clientEndpoint = new OnOpenSocket(); + client.connect(clientEndpoint, serverUri); + Socket serverSocket = server.accept(); + + // Upgrade to WebSocket, sending first websocket frame with the upgrade response. + String upgradeRequest = getRequestHeaders(serverSocket.getInputStream()); + assertThat(upgradeRequest, containsString("HTTP/1.1")); + assertThat(upgradeRequest, containsString("Upgrade: websocket")); + String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" + + "Upgrade: WebSocket\n" + + "Connection: Upgrade\n" + + "Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" + + "\n"; + Frame firstFrame = new TextFrame().setPayload("first message payload"); + byte[] bytes = combineToByteArray(BufferUtil.toBuffer(upgradeResponse), generateFrame(firstFrame)); + serverSocket.getOutputStream().write(bytes); + + // Now we send the rest of the data. + int numFrames = 1000; + for (int i = 0; i < numFrames; i++) + { + Frame frame = new TextFrame().setPayload(BufferUtil.toBuffer(Integer.toString(i))); + serverSocket.getOutputStream().write(toByteArray(frame)); + } + Frame closeFrame = new CloseInfo(StatusCode.NORMAL, "closed by test").asFrame(); + serverSocket.getOutputStream().write(toByteArray(closeFrame)); + + // Wait for WebSocket to be opened, wait 1 sec before allowing it to continue. + // We delay to ensure HttpConnection is not still reading from network. + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(1000); + clientEndpoint.onOpenBlocked.countDown(); + + // We receive the data correctly. + assertThat(clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS), is("first message payload")); + for (int i = 0; i < numFrames; i++) + { + String msg = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS); + assertThat(msg, is(Integer.toString(i))); + } + + // Closed successfully with correct status. + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeCode, is(StatusCode.NORMAL)); + assertThat(clientEndpoint.closeReason, is("closed by test")); + } + + public ByteBuffer generateFrame(Frame frame) + { + int size = Generator.MAX_HEADER_LENGTH + frame.getPayloadLength(); + ByteBuffer buffer = BufferUtil.allocate(size); + int pos = BufferUtil.flipToFill(buffer); + generator.generateWholeFrame(frame, buffer); + BufferUtil.flipToFlush(buffer, pos); + return buffer; + } + + String getAcceptKey(String upgradeRequest) + { + Matcher matcher = Pattern.compile(".*Sec-WebSocket-Key: ([^\n\r]+)\r?\n.*", Pattern.DOTALL | Pattern.MULTILINE) + .matcher(upgradeRequest); + assertTrue(matcher.matches()); + String key = matcher.group(1); + assertFalse(StringUtil.isEmpty(key)); + return AcceptHash.hashKey(key); + } + + static String getRequestHeaders(InputStream is) + { + Scanner s = new Scanner(is).useDelimiter("\r\n\r\n"); + return s.hasNext() ? s.next() : ""; + } + + byte[] combineToByteArray(ByteBuffer... buffers) throws IOException + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (ByteBuffer bb : buffers) + { + BufferUtil.writeTo(bb, baos); + } + + return baos.toByteArray(); + } + + byte[] toByteArray(Frame frame) + { + return BufferUtil.toArray(generateFrame(frame)); + } +} diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index f118b865c405..9043a676d557 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -34,6 +34,7 @@ import java.util.function.Supplier; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.StringUtil; @@ -377,6 +378,11 @@ public Future connect(Object websocket, URI toUri, ClientUpgradeRequest WebSocketUpgradeRequest wsReq = new WebSocketUpgradeRequest(this, httpClient, request); wsReq.timeout(request.getTimeout(), TimeUnit.MILLISECONDS); wsReq.setUpgradeListener(upgradeListener); + for (Request.Listener l : getBeans(Request.Listener.class)) + { + wsReq.listener(l); + } + return wsReq.sendAsync(); } From a3090a61e86de2e60e452beaf7ebc7510cebf941 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 22 Sep 2020 18:25:11 +1000 Subject: [PATCH 2/3] Issue #5170 - backport fix to websocket upgrade from jetty-10 Signed-off-by: Lachlan Roberts --- .../client/http/HttpReceiverOverHTTP.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index 9dae09c1d51d..c06a609a6977 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -47,6 +47,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res private boolean shutdown; private boolean complete; private boolean unsolicited; + private int status; public HttpReceiverOverHTTP(HttpChannelOverHTTP channel) { @@ -118,15 +119,17 @@ private void releaseNetworkBuffer() protected ByteBuffer onUpgradeFrom() { + ByteBuffer upgradeBuffer = null; if (networkBuffer.hasRemaining()) { - ByteBuffer upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining()); + upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining()); BufferUtil.clearToFill(upgradeBuffer); BufferUtil.put(networkBuffer.getBuffer(), upgradeBuffer); BufferUtil.flipToFlush(upgradeBuffer, 0); - return upgradeBuffer; } - return null; + + releaseNetworkBuffer(); + return upgradeBuffer; } private void process() @@ -145,12 +148,11 @@ private void process() return; } - // Connection may be closed or upgraded in a parser callback. - boolean upgraded = connection != endPoint.getConnection(); - if (connection.isClosed() || upgraded) + // Connection may be closed in a parser callback. + if (connection.isClosed()) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection); + LOG.debug("Closed {}", connection); releaseNetworkBuffer(); return; } @@ -215,6 +217,14 @@ private boolean parse() if (LOG.isDebugEnabled()) LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser); + if (complete) + { + int status = this.status; + this.status = 0; + if (status == HttpStatus.SWITCHING_PROTOCOLS_101) + return true; + } + if (networkBuffer.isEmpty()) return false; @@ -269,6 +279,7 @@ public boolean startResponse(HttpVersion version, int status, String reason) if (exchange == null) return false; + this.status = status; String method = exchange.getRequest().getMethod(); parser.setHeadResponse(HttpMethod.HEAD.is(method) || (HttpMethod.CONNECT.is(method) && status == HttpStatus.OK_200)); From 5d951e840ea0417a2903f212909d6af03d519c09 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 22 Sep 2020 22:45:45 +1000 Subject: [PATCH 3/3] fix licence header and checkstyle Signed-off-by: Lachlan Roberts --- .../UpgradeWithLeftOverHttpBytesTest.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java index 1b5d5eb414fc..db18d6ae19c6 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UpgradeWithLeftOverHttpBytesTest.java @@ -1,19 +1,19 @@ // -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. // -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html // -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php // -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== // package org.eclipse.jetty.websocket.tests; @@ -94,7 +94,7 @@ public void onOpen(Session session) } @Test - public void testRequestCompletesFirst_NoWebSocketBytesInResponse() throws Exception + public void testRequestCompletesFirstNoWebSocketBytesInResponse() throws Exception { // Initiate connection. OnOpenSocket clientEndpoint = new OnOpenSocket(); @@ -141,7 +141,7 @@ public void testRequestCompletesFirst_NoWebSocketBytesInResponse() throws Except } @Test - public void testRequestCompletesFirst_WithWebSocketBytesInResponse() throws Exception + public void testRequestCompletesFirstWithWebSocketBytesInResponse() throws Exception { // Initiate connection. OnOpenSocket clientEndpoint = new OnOpenSocket(); @@ -192,7 +192,7 @@ public void testRequestCompletesFirst_WithWebSocketBytesInResponse() throws Exce } @Test - public void testResponseCompletesFirst_NoWebSocketBytesInResponse() throws Exception + public void testResponseCompletesFirstNoWebSocketBytesInResponse() throws Exception { // We delay the request to finish until after the response is complete. client.addBean(new Request.Listener() @@ -220,13 +220,11 @@ public void onCommit(Request request) "\n"; serverSocket.getOutputStream().write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1)); - // Wait for WebSocket to be opened, wait 1 sec before allowing it to continue. assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); Thread.sleep(1000); clientEndpoint.onOpenBlocked.countDown(); - // Send some websocket data. int numFrames = 1000; for (int i = 0; i < numFrames; i++) @@ -251,7 +249,7 @@ public void onCommit(Request request) } @Test - public void testResponseCompletesFirst_WithWebSocketBytesInResponse() throws Exception + public void testResponseCompletesFirstWithWebSocketBytesInResponse() throws Exception { // We delay the request to finish until after the response is complete. client.addBean(new Request.Listener()