Skip to content

Commit

Permalink
Issue #4772 - support partial messages for Jetty WS API annotations (#…
Browse files Browse the repository at this point in the history
…6357)

* Issue #4772 - support partial messages for Jetty WS API annotations

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 10, 2021
1 parent d997a11 commit 6dea025
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 44 deletions.
Expand Up @@ -103,6 +103,7 @@ public interface RemoteEndpoint
* @param fragment the text being sent
* @param isLast true if this is the last piece of the partial bytes
* @param callback callback to notify of success or failure of the write operation
* @throws IOException this never throws IOException, it was a mistake to have this in the signature.
*/
void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException;

Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.lang.annotation.Target;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPartialListener;

/**
* Annotation for tagging methods to receive Binary or Text Message events.
Expand All @@ -30,23 +31,31 @@
* <p>
* <u>Text Message Versions</u>
* <ol>
* <li><code>public void methodName(String text)</code></li>
* <li><code>public void methodName({@link Session} session, String text)</code></li>
* <li><code>public void methodName(Reader reader)</code></li>
* <li><code>public void methodName({@link Session} session, Reader reader)</code></li>
* <li>{@code public void methodName(String text)}</li>
* <li>{@code public void methodName(Session session, String text)}</li>
* <li>{@code public void methodName(Reader reader)}</li>
* <li>{@code public void methodName(Session session, Reader reader)}</li>
* </ol>
* Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
* use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques.
* <p>
* <p>Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
* use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques.</p>
* <u>Binary Message Versions</u>
* <ol>
* <li><code>public void methodName(ByteBuffer message)</code></li>
* <li><code>public void methodName({@link Session} session, ByteBuffer message)</code></li>
* <li><code>public void methodName(byte buf[], int offset, int length)</code></li>
* <li><code>public void methodName({@link Session} session, byte buf[], int offset, int length)</code></li>
* <li><code>public void methodName(InputStream stream)</code></li>
* <li><code>public void methodName({@link Session} session, InputStream stream)</code></li>
* <li>{@code public void methodName(ByteBuffer message)}</li>
* <li>{@code public void methodName(Session session, ByteBuffer message)}</li>
* <li>{@code public void methodName(byte[] buf, int offset, int length)}</li>
* <li>{@code public void methodName(Session session, byte[] buf, int offset, int length)}</li>
* <li>{@code public void methodName(InputStream stream)}</li>
* <li>{@code public void methodName(Session session, InputStream stream)}</li>
* </ol>
* <u>Partial Message Variations</u>
* <p>These are used to receive partial messages without aggregating them into a complete WebSocket message. Instead the a boolean
* argument is supplied to indicate whether this is the last segment of data of the message. See {@link WebSocketPartialListener}
* interface for more details on partial messages.</p>
* <ol>
* <li>{@code public void methodName(ByteBuffer payload, boolean last)}</li>
* <li>{@code public void methodName(String payload, boolean last)}</li>
* </ol>
* <p>Note: Similar to the signatures above these can all be used with an optional first {@link Session} parameter.</p>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -78,6 +78,51 @@
*/
public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
{
private static final InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};

private static final InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};

private static final InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};

private static final InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};

private static final InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};

private static final InvokerUtils.Arg[] textPartialCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private static final InvokerUtils.Arg[] binaryPartialBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private static final InvokerUtils.Arg[] binaryPartialArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(boolean.class).required()
};

private final WebSocketContainer container;
private final WebSocketComponents components;
private final Map<Class<?>, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -333,34 +378,6 @@ private JettyWebSocketFrameHandlerMetadata createAnnotatedMetadata(WebSocket ann
if (onMessages != null && onMessages.length > 0)
{
// The different kind of @OnWebSocketMessage method parameter signatures expected

InvokerUtils.Arg[] textCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(String.class).required()
};

InvokerUtils.Arg[] binaryBufferCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(ByteBuffer.class).required()
};

InvokerUtils.Arg[] binaryArrayCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(byte[].class).required(),
new InvokerUtils.Arg(int.class), // offset
new InvokerUtils.Arg(int.class) // length
};

InvokerUtils.Arg[] inputStreamCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(InputStream.class).required()
};

InvokerUtils.Arg[] readerCallingArgs = new InvokerUtils.Arg[]{
new InvokerUtils.Arg(Session.class),
new InvokerUtils.Arg(Reader.class).required()
};

for (Method onMsg : onMessages)
{
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
Expand Down Expand Up @@ -409,11 +426,27 @@ private JettyWebSocketFrameHandlerMetadata createAnnotatedMetadata(WebSocket ann
metadata.setTextHandler(ReaderMessageSink.class, methodHandle, onMsg);
continue;
}
else

methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, textPartialCallingArgs);
if (methodHandle != null)
{
// Partial Text Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setTextHandler(PartialStringMessageSink.class, methodHandle, onMsg);
continue;
}

methodHandle = InvokerUtils.optionalMutatedInvoker(lookup, endpointClass, onMsg, binaryPartialBufferCallingArgs);
if (methodHandle != null)
{
// Not a valid @OnWebSocketMessage declaration signature
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg);
// Partial ByteBuffer Message
assertSignatureValid(endpointClass, onMsg, OnWebSocketMessage.class);
metadata.setBinaryHandle(PartialByteBufferMessageSink.class, methodHandle, onMsg);
continue;
}

// Not a valid @OnWebSocketMessage declaration signature
throw InvalidSignatureException.build(endpointClass, OnWebSocketMessage.class, onMsg);
}
}

Expand Down
Expand Up @@ -156,6 +156,7 @@ public void sendPartialString(String fragment, boolean isLast) throws IOExceptio
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}

// FIXME: Remove the throws IOException from API for this method in the next major release.
@Override
public void sendPartialString(String fragment, boolean isLast, WriteCallback callback)
{
Expand Down

0 comments on commit 6dea025

Please sign in to comment.