Skip to content

Commit

Permalink
Issue #4772 - support partial messages for Jetty WS API annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 4, 2021
1 parent 9e03775 commit bab1a78
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 35 deletions.
Expand Up @@ -104,7 +104,7 @@ public interface RemoteEndpoint
* @param isLast true if this is the last piece of the partial bytes
* @param callback callback to notify of success or failure of the write operation
*/
void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException;
void sendPartialString(String fragment, boolean isLast, WriteCallback callback);

/**
* Send a Ping message containing the given application data to the remote endpoint, blocking until all of the
Expand Down
Expand Up @@ -35,9 +35,8 @@
* <li><code>public void methodName(Reader reader)</code></li>
* <li><code>public void methodName({@link Session} session, Reader reader)</code></li>
* </ol>
* Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
* use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques.
* <p>
* <p>Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
* use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques.</p><br>
* <u>Binary Message Versions</u>
* <ol>
* <li><code>public void methodName(ByteBuffer message)</code></li>
Expand All @@ -47,6 +46,16 @@
* <li><code>public void methodName(InputStream stream)</code></li>
* <li><code>public void methodName({@link Session} session, InputStream stream)</code></li>
* </ol>
* <u>Partial Message Variations</u>
* <p>These are used to receive partial messages without aggregating them into a complete WebSocket message. Instead the a boolean
* argument is supplied to indicate whether this is the last segment of data of the message. This can be done most efficiently
* for the {@link java.nio.ByteBuffer} type as no copying needs to occur to get the data into this format.</p>
* <ol>
* <li><code>public void methodName(ByteBuffer payload, boolean last)</code></li>
* <li><code>public void methodName(byte payload, boolean last)</code></li>
* <li><code>public void methodName(String payload, boolean last)</code></li>
* </ol>
* <p>Note: Similar to the signatures above these can all be used with an optional first {@link Session} parameter.</p>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.jetty.websocket.core.internal.messages.ByteBufferMessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.InputStreamMessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.MessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.PartialByteArrayMessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.PartialByteBufferMessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.PartialStringMessageSink;
import org.eclipse.jetty.websocket.core.internal.messages.ReaderMessageSink;
Expand All @@ -78,6 +79,51 @@
*/
public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
{
private static final InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};

private static final InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};

private static final InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};

private static final InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};

private static final InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};

private static final InvokerUtils.Arg[] textPartialCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private static final InvokerUtils.Arg[] binaryPartialBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private static final InvokerUtils.Arg[] binaryPartialArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private final WebSocketContainer container;
private final WebSocketComponents components;
private final Map<Class<?>, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -333,34 +379,6 @@ private JettyWebSocketFrameHandlerMetadata createAnnotatedMetadata(WebSocket ann
if (onMessages != null && onMessages.length > 0)
{
// The different kind of @OnWebSocketMessage method parameter signatures expected

InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};

InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};

InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};

InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};

InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};

for (Method onMsg : onMessages)
{
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
Expand Down Expand Up @@ -409,11 +427,36 @@ private JettyWebSocketFrameHandlerMetadata createAnnotatedMetadata(WebSocket ann
metadata.setTextHandler(ReaderMessageSink.class, methodHandle, onMsg);
continue;
}
else

methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, textPartialCallingArgs);
if (methodHandle != null)
{
// Partial Text Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setTextHandler(PartialStringMessageSink.class, methodHandle, onMsg);
continue;
}

methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, binaryPartialBufferCallingArgs);
if (methodHandle != null)
{
// Partial ByteBuffer Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setBinaryHandle(PartialByteBufferMessageSink.class, methodHandle, onMsg);
continue;
}

methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, binaryPartialArrayCallingArgs);
if (methodHandle != null)
{
// Not a valid @OnWebSocketMessage declaration signature
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg);
// Partial byte array Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setBinaryHandle(PartialByteArrayMessageSink.class, methodHandle, onMsg);
continue;
}

// Not a valid @OnWebSocketMessage declaration signature
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg);
}
}

Expand Down

0 comments on commit bab1a78

Please sign in to comment.