Skip to content

Commit

Permalink
Merge pull request #5213 from eclipse/jetty-10.0.x-5170-HttpUpgradeBu…
Browse files Browse the repository at this point in the history
…ffer

Issue #5170 - fix NPE during websocket upgrade
  • Loading branch information
lachlan-roberts committed Sep 2, 2020
2 parents 37f93bb + 5bdcea2 commit cdb12b4
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 7 deletions.
Expand Up @@ -141,6 +141,7 @@ protected ByteBuffer onUpgradeFrom()
BufferUtil.flipToFlush(upgradeBuffer, 0);
return upgradeBuffer;
}
releaseNetworkBuffer();
return null;
}

Expand All @@ -160,12 +161,11 @@ private void process()
return;
}

// Connection may be closed or upgraded in a parser callback.
boolean upgraded = connection != endPoint.getConnection();
if (connection.isClosed() || upgraded)
// Connection may be closed in a parser callback.
if (connection.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection);
LOG.debug("Closed {}", connection);
releaseNetworkBuffer();
return;
}
Expand Down Expand Up @@ -235,6 +235,10 @@ private boolean parse()

if (complete)
{
HttpExchange httpExchange = getHttpExchange();
if (httpExchange != null && httpExchange.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;

if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response: {}", networkBuffer);
networkBuffer.clear();
Expand Down
Expand Up @@ -469,9 +469,14 @@ private void fillAndParse()
}
catch (Throwable t)
{
LOG.warn(t.toString());
BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Error during fillAndParse()", t);

if (networkBuffer != null)
{
BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer();
}
coreSession.processConnectionError(t, Callback.NOOP);
}
}
Expand Down
Expand Up @@ -34,6 +34,7 @@ public class TestMessageHandler extends MessageHandler
public CoreSession coreSession;
public BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> binaryMessages = new BlockingArrayQueue<>();
public CloseStatus closeStatus;
public volatile Throwable error;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -73,6 +74,7 @@ public void onClosed(CloseStatus closeStatus, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus, callback);
this.closeStatus = closeStatus;
closeLatch.countDown();
}

Expand All @@ -82,6 +84,7 @@ protected void onText(String message, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onText {}", message);
textMessages.offer(message);
callback.succeeded();
}

@Override
Expand All @@ -90,5 +93,6 @@ protected void onBinary(ByteBuffer message, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onBinary {}", message);
binaryMessages.offer(message);
callback.succeeded();
}
}
@@ -0,0 +1,134 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.core;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UpgradeWithLeftOverHttpBytesTest extends WebSocketTester
{
private ServerSocket server;
private URI serverUri;
private WebSocketCoreClient client;
private final Generator generator = new Generator();

@BeforeEach
public void start() throws Exception
{
client = new WebSocketCoreClient();
client.getHttpClient().setIdleTimeout(5000);
client.start();
server = new ServerSocket(0);
serverUri = URI.create("ws://localhost:" + server.getLocalPort());
}

@AfterEach
public void stop() throws Exception
{
client.stop();
server.close();
}

@Test
public void testUpgradeWithLeftOverHttpBytes() throws Exception
{
TestMessageHandler clientEndpoint = new TestMessageHandler();
CompletableFuture<CoreSession> clientConnect = client.connect(clientEndpoint, serverUri);
Socket serverSocket = server.accept();

String upgradeRequest = getRequestHeaders(serverSocket.getInputStream());
assertThat(upgradeRequest, containsString("HTTP/1.1"));
assertThat(upgradeRequest, containsString("Upgrade: websocket"));

// Send upgrade response in the same write as two websocket frames.
String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" +
"Upgrade: WebSocket\n" +
"Connection: Upgrade\n" +
"Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" +
"\n";

Frame dataFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer("first message payload"));
Frame closeFrame = new CloseStatus(CloseStatus.NORMAL, "closed by test").toFrame();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1));
BufferUtil.writeTo(generateFrame(dataFrame), baos);
BufferUtil.writeTo(generateFrame(closeFrame), baos);
serverSocket.getOutputStream().write(baos.toByteArray());

// Check the client receives upgrade response and then the two websocket frames.
CoreSession coreSession = clientConnect.get(5, TimeUnit.SECONDS);
assertNotNull(coreSession);
assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS), is("first message payload"));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(clientEndpoint.closeStatus.getReason(), is("closed by test"));
}

public ByteBuffer generateFrame(Frame frame)
{
int size = Generator.MAX_HEADER_LENGTH + frame.getPayloadLength();
ByteBuffer buffer = BufferUtil.allocate(size);
generator.generateWholeFrame(frame, buffer);
return buffer;
}

String getAcceptKey(String upgradeRequest)
{
Matcher matcher = Pattern.compile(".*Sec-WebSocket-Key: ([^\n\r]+)\r?\n.*", Pattern.DOTALL | Pattern.MULTILINE)
.matcher(upgradeRequest);
assertTrue(matcher.matches());
String key = matcher.group(1);
assertFalse(StringUtil.isEmpty(key));
return WebSocketCore.hashKey(key);
}

static String getRequestHeaders(InputStream is)
{
Scanner s = new Scanner(is).useDelimiter("\r\n\r\n");
return s.hasNext() ? s.next() : "";
}
}

0 comments on commit cdb12b4

Please sign in to comment.