Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement and test a WebSocket Proxy with the 9.4 Jetty API #5726

Merged
merged 5 commits into from Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java
Expand Up @@ -135,6 +135,24 @@ public static ByteBuffer allocateDirect(int capacity)
return buf;
}

/**
* Deep copy of a buffer
*
* @param buffer The buffer to copy
* @return A copy of the buffer
*/
public static ByteBuffer copy(ByteBuffer buffer)
{
if (buffer == null)
return null;
int p = buffer.position();
ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining());
clone.put(buffer);
clone.flip();
buffer.position(p);
return clone;
}

/**
* Clear the buffer to be empty in flush mode.
* The position and limit are set to 0;
Expand Down
@@ -0,0 +1,329 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.tests.proxy;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

public class WebSocketProxy
{
private static final Logger LOG = Log.getLogger(WebSocketProxy.class);

private final WebSocketClient client;
private final URI serverUri;
private final ClientToProxy clientToProxy = new ClientToProxy();
private final ProxyToServer proxyToServer = new ProxyToServer();

public WebSocketProxy(WebSocketClient webSocketClient, URI serverUri)
{
this.client = webSocketClient;
this.serverUri = serverUri;
}

public WebSocketConnectionListener getWebSocketConnectionListener()
{
return clientToProxy;
}

public boolean awaitClose(long timeout)
{
try
{
if (!clientToProxy.closeLatch.await(timeout, TimeUnit.MILLISECONDS))
return false;
if (proxyToServer.getSession() == null)
return true;
return proxyToServer.closeLatch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (Exception e)
{
return false;
}
}

public class ClientToProxy implements WebSocketPartialListener, WebSocketPingPongListener
{
private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();

public Session getSession()
{
return session;
}

public void fail(Throwable failure)
{
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
}

@Override
public void onWebSocketConnect(Session session)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);

Future<Session> connect = null;
try
{
this.session = session;
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols(session.getUpgradeRequest().getSubProtocols());
upgradeRequest.setExtensions(session.getUpgradeRequest().getExtensions());
connect = client.connect(proxyToServer, serverUri, upgradeRequest);

//This is blocking as we really want the client to be connected before receiving any messages.
connect.get();
}
catch (Exception e)
{
if (connect != null)
connect.cancel(true);
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);

try
{
proxyToServer.getSession().getRemote().sendPartialBytes(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);

try
{
proxyToServer.getSession().getRemote().sendPartialString(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPing(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));

try
{
// The implementation automatically sends pong response.
pingsReceived.incrementAndGet();
proxyToServer.getSession().getRemote().sendPing(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPong(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));

try
{
// If we have sent out a ping then we have already responded with automatic pong.
// If this is an unsolicited pong we still need to forward it to the server.
int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
proxyToServer.getSession().getRemote().sendPong(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);

proxyToServer.fail(cause);
}

@Override
public void onWebSocketClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);

// Session may be null if connection to the server failed.
Session session = proxyToServer.getSession();
if (session != null)
session.close(statusCode, reason);
closeLatch.countDown();
}
}

public class ProxyToServer implements WebSocketPartialListener, WebSocketPingPongListener
{
private volatile Session session;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicInteger pingsReceived = new AtomicInteger();

public Session getSession()
{
return session;
}

public void fail(Throwable failure)
{
// Only ProxyToServer can be failed before it is opened (if ClientToProxy fails before the connect completes).
Session session = this.session;
if (session != null)
session.close(StatusCode.SERVER_ERROR, failure.getMessage());
}

@Override
public void onWebSocketConnect(Session session)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketConnect({})", getClass().getSimpleName(), session);

this.session = session;
}

@Override
public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialBinary({}, {})", getClass().getSimpleName(), BufferUtil.toDetailString(payload), fin);

try
{
clientToProxy.getSession().getRemote().sendPartialBytes(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPartialText({}, {})", getClass().getSimpleName(), StringUtil.truncate(payload, 100), fin);

try
{
clientToProxy.getSession().getRemote().sendPartialString(payload, fin);
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPing(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPing({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));

try
{
// The implementation automatically sends pong response.
pingsReceived.incrementAndGet();
clientToProxy.getSession().getRemote().sendPing(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketPong(ByteBuffer payload)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketPong({})", getClass().getSimpleName(), BufferUtil.toDetailString(payload));

try
{
// If we have sent out a ping then we have already responded with automatic pong.
// If this is an unsolicited pong we still need to forward it to the client.
int valueBeforeUpdate = pingsReceived.getAndUpdate(i -> i > 0 ? i - 1 : i);
if (valueBeforeUpdate == 0)
clientToProxy.getSession().getRemote().sendPong(BufferUtil.copy(payload));
}
catch (Exception e)
{
throw new WebSocketException(e);
}
}

@Override
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketError()", getClass().getSimpleName(), cause);

clientToProxy.fail(cause);
}

@Override
public void onWebSocketClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onWebSocketClose({} {})", getClass().getSimpleName(), statusCode, reason);

clientToProxy.getSession().close(statusCode, reason);
closeLatch.countDown();
}
}
}