Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5170 - fix NPE during websocket upgrade #5213

Merged
merged 3 commits into from Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -141,6 +141,7 @@ protected ByteBuffer onUpgradeFrom()
BufferUtil.flipToFlush(upgradeBuffer, 0);
return upgradeBuffer;
}
releaseNetworkBuffer();
return null;
}

Expand All @@ -160,12 +161,11 @@ private void process()
return;
}

// Connection may be closed or upgraded in a parser callback.
boolean upgraded = connection != endPoint.getConnection();
if (connection.isClosed() || upgraded)
// Connection may be closed in a parser callback.
if (connection.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection);
LOG.debug("Closed {}", connection);
releaseNetworkBuffer();
return;
}
Expand Down Expand Up @@ -235,6 +235,10 @@ private boolean parse()

if (complete)
{
HttpExchange httpExchange = getHttpExchange();
if (httpExchange != null && httpExchange.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
gregw marked this conversation as resolved.
Show resolved Hide resolved
return true;

if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response: {}", networkBuffer);
networkBuffer.clear();
Expand Down
Expand Up @@ -469,9 +469,14 @@ private void fillAndParse()
}
catch (Throwable t)
{
LOG.warn(t.toString());
BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Error during fillAndParse()", t);

if (networkBuffer != null)
{
BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer();
}
sbordet marked this conversation as resolved.
Show resolved Hide resolved
coreSession.processConnectionError(t, Callback.NOOP);
}
}
Expand Down
Expand Up @@ -34,6 +34,7 @@ public class TestMessageHandler extends MessageHandler
public CoreSession coreSession;
public BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> binaryMessages = new BlockingArrayQueue<>();
public CloseStatus closeStatus;
public volatile Throwable error;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -73,6 +74,7 @@ public void onClosed(CloseStatus closeStatus, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus, callback);
this.closeStatus = closeStatus;
closeLatch.countDown();
}

Expand All @@ -82,6 +84,7 @@ protected void onText(String message, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onText {}", message);
textMessages.offer(message);
callback.succeeded();
}

@Override
Expand All @@ -90,5 +93,6 @@ protected void onBinary(ByteBuffer message, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("onBinary {}", message);
binaryMessages.offer(message);
callback.succeeded();
}
}
@@ -0,0 +1,134 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.core;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UpgradeWithLeftOverHttpBytesTest extends WebSocketTester
{
private ServerSocket server;
private URI serverUri;
private WebSocketCoreClient client;
private final Generator generator = new Generator();

@BeforeEach
public void start() throws Exception
{
client = new WebSocketCoreClient();
client.getHttpClient().setIdleTimeout(5000);
client.start();
server = new ServerSocket(0);
serverUri = URI.create("ws://localhost:" + server.getLocalPort());
}

@AfterEach
public void stop() throws Exception
{
client.stop();
server.close();
}

@Test
public void testUpgradeWithLeftOverHttpBytes() throws Exception
{
TestMessageHandler clientEndpoint = new TestMessageHandler();
CompletableFuture<CoreSession> clientConnect = client.connect(clientEndpoint, serverUri);
Socket serverSocket = server.accept();

String upgradeRequest = getRequestHeaders(serverSocket.getInputStream());
assertThat(upgradeRequest, containsString("HTTP/1.1"));
assertThat(upgradeRequest, containsString("Upgrade: websocket"));

// Send upgrade response in the same write as two websocket frames.
String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" +
"Upgrade: WebSocket\n" +
"Connection: Upgrade\n" +
"Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" +
"\n";

Frame dataFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer("first message payload"));
Frame closeFrame = new CloseStatus(CloseStatus.NORMAL, "closed by test").toFrame();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1));
BufferUtil.writeTo(generateFrame(dataFrame), baos);
BufferUtil.writeTo(generateFrame(closeFrame), baos);
serverSocket.getOutputStream().write(baos.toByteArray());

// Check the client receives upgrade response and then the two websocket frames.
CoreSession coreSession = clientConnect.get(5, TimeUnit.SECONDS);
assertNotNull(coreSession);
assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS), is("first message payload"));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(clientEndpoint.closeStatus.getReason(), is("closed by test"));
}

public ByteBuffer generateFrame(Frame frame)
{
int size = Generator.MAX_HEADER_LENGTH + frame.getPayloadLength();
ByteBuffer buffer = BufferUtil.allocate(size);
generator.generateWholeFrame(frame, buffer);
return buffer;
}

String getAcceptKey(String upgradeRequest)
{
Matcher matcher = Pattern.compile(".*Sec-WebSocket-Key: ([^\n\r]+)\r?\n.*", Pattern.DOTALL | Pattern.MULTILINE)
.matcher(upgradeRequest);
assertTrue(matcher.matches());
String key = matcher.group(1);
assertFalse(StringUtil.isEmpty(key));
return WebSocketCore.hashKey(key);
}

static String getRequestHeaders(InputStream is)
{
Scanner s = new Scanner(is).useDelimiter("\r\n\r\n");
return s.hasNext() ? s.next() : "";
}
}