Skip to content

Commit

Permalink
Fixes #3766 - Introduce HTTP/2 API to batch frames. (#5222)
Browse files Browse the repository at this point in the history
* Fixes #3766 - Introduce HTTP/2 API to batch frames.

Introduced Stream.FrameList to hold HEADERS+DATA+HEADERS frames.
These are often used by the client and by the server when the
request/response content is known and FrameList will allow to
send them in a single TCP write, rather than multiple ones.

Rewritten HttpSenderOverHTTP2.sendHeaders() and
HttpTransportOverHTTP2.sendHeaders() to take advantage of
FrameList.

Now using ConcurrentHashMap as a client context, because
with DEBUG logging enabled it may be access concurrently.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 10, 2020
1 parent 1256261 commit f81bf7f
Show file tree
Hide file tree
Showing 20 changed files with 472 additions and 271 deletions.
Expand Up @@ -30,7 +30,6 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -603,7 +602,8 @@ protected void newConnection(final HttpDestination destination, final Promise<Co
@Override
public void succeeded(List<InetSocketAddress> socketAddresses)
{
Map<String, Object> context = new HashMap<>();
// Multiple threads may access the map, especially with DEBUG logging enabled.
Map<String, Object> context = new ConcurrentHashMap<>();
context.put(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
connect(socketAddresses, 0, context);
Expand Down
Expand Up @@ -25,9 +25,9 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
Expand Down Expand Up @@ -433,7 +433,7 @@ public void accept(SslContextFactory sslContextFactory, SocketChannel channel, S
private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
if (context == null)
context = new HashMap<>();
context = new ConcurrentHashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.eclipse.jetty.http2.client;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -127,11 +128,11 @@ public void onOpen()
if (windowDelta > 0)
{
session.updateRecvWindow(windowDelta);
session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
}
else
{
session.frames(null, this, prefaceFrame, settingsFrame);
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame), this);
}
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -198,14 +200,15 @@ public void onReset(Stream s, ResetFrame frame)
{
// Simulate that there is pending data to send.
IStream stream = (IStream)s;
stream.getSession().frames(stream, new Callback()
List<Frame> frames = Collections.singletonList(new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
stream.getSession().frames(stream, frames, new Callback()
{
@Override
public void failed(Throwable x)
{
serverResetLatch.countDown();
}
}, new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
});
}
};
}
Expand Down
Expand Up @@ -18,11 +18,11 @@

package org.eclipse.jetty.http2;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -160,7 +160,7 @@ public void onDataConsumed(ISession session, IStream stream, int length)

protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
session.frames(stream, Collections.singletonList(frame), Callback.NOOP);
}

@Override
Expand Down
Expand Up @@ -113,6 +113,25 @@ public boolean append(Entry entry)
return false;
}

public boolean append(List<Entry> list)
{
Throwable closed;
synchronized (this)
{
closed = terminated;
if (closed == null)
{
list.forEach(entries::offer);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, entries={}", list, entries.size());
}
}
if (closed == null)
return true;
list.forEach(entry -> closed(entry, closed));
return false;
}

private int getWindowQueueSize()
{
synchronized (this)
Expand Down Expand Up @@ -414,6 +433,11 @@ public int getDataBytesRemaining()

public abstract long onFlushed(long bytes) throws IOException;

boolean hasHighPriority()
{
return false;
}

@Override
public void failed(Throwable x)
{
Expand Down

0 comments on commit f81bf7f

Please sign in to comment.