Skip to content

Commit

Permalink
Fixes #3766 - Introduce HTTP/2 API to batch frames.
Browse files Browse the repository at this point in the history
Updates after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 9, 2020
1 parent 7822cc5 commit fc6344f
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 89 deletions.
Expand Up @@ -589,11 +589,11 @@ private Throwable toFailure(int error, String reason)
@Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
newStream(new Stream.FrameList(frame), promise, listener);
newStream(new IStream.FrameList(frame), promise, listener);
}

@Override
public void newStream(Stream.FrameList frames, Promise<Stream> promise, Stream.Listener listener)
public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frames, promise, listener);
}
Expand Down Expand Up @@ -1697,7 +1697,7 @@ private int priority(PriorityFrame frame, Callback callback)
return streamId;
}

private void newStream(Stream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
private void newStream(IStream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int currentStreamId = frameList.getStreamId();
Expand Down
Expand Up @@ -47,6 +47,15 @@ public interface ISession extends Session
*/
void removeStream(IStream stream);

/**
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
*
* @param frames the list of frames to send
* @param promise the promise that gets notified of the stream creation
* @param listener the listener that gets notified of stream events
*/
void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener);

/**
* <p>Enqueues the given frames to be written to the connection.</p>
* @param stream the stream the frames belong to
Expand Down
Expand Up @@ -19,9 +19,16 @@
package org.eclipse.jetty.http2;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;

Expand Down Expand Up @@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable
*/
void setListener(Listener listener);

/**
* <p>Sends the given list of frames.</p>
* <p>Typically used to send an HTTP response along with content and possibly trailers.</p>
*
* @param frameList the list of frames to send
* @param callback the callback that gets notified when the frames have been sent
*/
void send(FrameList frameList, Callback callback);

/**
* <p>Processes the given {@code frame}, belonging to this stream.</p>
*
Expand Down Expand Up @@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
public static class FrameList
{
private final List<StreamFrame> frames;

/**
* <p>Creates a frame list of just the given HEADERS frame.</p>
*
* @param headers the HEADERS frame
*/
public FrameList(HeadersFrame headers)
{
Objects.requireNonNull(headers);
this.frames = Collections.singletonList(headers);
}

/**
* <p>Creates a frame list of the given frames.</p>
*
* @param headers the HEADERS frame for the headers
* @param data the DATA frame for the content, or null if there is no content
* @param trailers the HEADERS frame for the trailers, or null if there are no trailers
*/
public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers)
{
Objects.requireNonNull(headers);
ArrayList<StreamFrame> frames = new ArrayList<>(3);
int streamId = headers.getStreamId();
if (data != null && data.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data);
if (trailers != null && trailers.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers);
frames.add(headers);
if (data != null)
frames.add(data);
if (trailers != null)
frames.add(trailers);
this.frames = Collections.unmodifiableList(frames);
}

/**
* @return the stream ID of the frames in this list
*/
public int getStreamId()
{
return frames.get(0).getStreamId();
}

/**
* @return a List of non-null frames
*/
public List<StreamFrame> getFrames()
{
return frames;
}
}
}
Expand Up @@ -63,15 +63,6 @@ public interface Session
*/
void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener);

/**
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
*
* @param frames the list of frames to send
* @param promise the promise that gets notified of the stream creation
* @param listener the listener that gets notified of stream events
*/
void newStream(Stream.FrameList frames, Promise<Stream> promise, Stream.Listener listener);

/**
* <p>Sends the given PRIORITY {@code frame}.</p>
* <p>If the {@code frame} references a {@code streamId} that does not exist
Expand Down
Expand Up @@ -18,16 +18,10 @@

package org.eclipse.jetty.http2.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;

Expand Down Expand Up @@ -56,23 +50,13 @@ public interface Stream
*/
Session getSession();

/**
* <p>Sends the given list of frames.</p>
* <p>Typically used to send an HTTP response along with content and possibly trailers.</p>
*
* @param frameList the list of frames to send
* @param callback the callback that gets notified when the frames have been sent
*/
void send(Stream.FrameList frameList, Callback callback);

/**
* <p>Sends the given HEADERS {@code frame}.</p>
* <p>Typically used to send an HTTP response with no content and no trailers,
* or to send the HTTP response trailers.</p>
*
* @param frame the HEADERS frame to send
* @param callback the callback that gets notified when the frame has been sent
* @see #send(FrameList, Callback)
*/
void headers(HeadersFrame frame, Callback callback);

Expand Down Expand Up @@ -316,62 +300,4 @@ public boolean onIdleTimeout(Stream stream, Throwable x)
}
}
}

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
public static class FrameList
{
private final List<StreamFrame> frames;

/**
* <p>Creates a frame list of just the given HEADERS frame.</p>
*
* @param headers the HEADERS frame
*/
public FrameList(HeadersFrame headers)
{
Objects.requireNonNull(headers);
this.frames = Collections.singletonList(headers);
}

/**
* <p>Creates a frame list of the given frames.</p>
*
* @param headers the HEADERS frame for the headers
* @param data the DATA frame for the content, or null if there is no content
* @param trailers the HEADERS frame for the trailers, or null if there are no trailers
*/
public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers)
{
Objects.requireNonNull(headers);
this.frames = new ArrayList<>(3);
int streamId = headers.getStreamId();
if (data != null && data.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data);
if (trailers != null && trailers.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers);
this.frames.add(headers);
if (data != null)
this.frames.add(data);
if (trailers != null)
this.frames.add(trailers);
}

/**
* @return the stream ID of the frames in this list
*/
public int getStreamId()
{
return frames.get(0).getStreamId();
}

/**
* @return a List of non-null frames
*/
public List<StreamFrame> getFrames()
{
return frames;
}
}
}
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
Expand Down Expand Up @@ -105,8 +106,8 @@ else if (lastContent)
}

HttpChannelOverHTTP2 channel = getHttpChannel();
Stream.FrameList frameList = new Stream.FrameList(headersFrame, dataFrame, trailersFrame);
channel.getSession().newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame);
((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
}

private String relativize(String path)
Expand Down
Expand Up @@ -197,7 +197,7 @@ else if (hasContent && contentLength != realContentLength)
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
System.lineSeparator(), info.getFields());
}
stream.send(new Stream.FrameList(hf, df, tf), c);
stream.send(new IStream.FrameList(hf, df, tf), c);
});
}

Expand Down

0 comments on commit fc6344f

Please sign in to comment.