Skip to content

Commit

Permalink
Merge branch 'jetty-9.4.x' into jetty-9.4.x-2014-unixsocket-client
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Dec 28, 2017
2 parents 44ddc42 + e86e8a7 commit 27dc3a8
Show file tree
Hide file tree
Showing 33 changed files with 524 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public boolean isOnlyClient()
return onlyClient;
}

/**
*
* @param onlyClient if <code>true</code> the session manager will only connect to an external Hazelcast instance
* and not use this JVM to start an Hazelcast instance
*/
public void setOnlyClient( boolean onlyClient )
{
this.onlyClient = onlyClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -37,7 +38,7 @@
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;

public class HTTP2Connection extends AbstractConnection
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);

Expand Down Expand Up @@ -176,6 +177,13 @@ private Runnable pollTask()
}
}

@Override
public void onFlushed(long bytes) throws IOException
{
// TODO: add method to ISession ?
((HTTP2Session)session).onFlushed(bytes);
}

protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand Down Expand Up @@ -175,7 +176,7 @@ protected Action process() throws Throwable
{
if (entry.generate(lease))
{
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
entries.offer(entry);
}
else
Expand Down Expand Up @@ -207,6 +208,31 @@ protected Action process() throws Throwable
return Action.SCHEDULED;
}

void onFlushed(long bytes) throws IOException
{
// For the given flushed bytes, we want to only
// forward those that belong to data frame content.
for (Entry entry : actives)
{
int frameBytesLeft = entry.getFrameBytesRemaining();
if (frameBytesLeft > 0)
{
int update = (int)Math.min(bytes, frameBytesLeft);
entry.onFrameBytesFlushed(update);
bytes -= update;
IStream stream = entry.stream;
if (stream != null && !entry.isControl())
{
Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel instanceof WriteFlusher.Listener)
((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH);
}
if (bytes == 0)
break;
}
}
}

@Override
public void succeeded()
{
Expand Down Expand Up @@ -234,13 +260,13 @@ private void complete()
for (int i = index; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
for (int i = 0; i < index; ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
stalled = null;
Expand Down Expand Up @@ -333,7 +359,11 @@ protected Entry(Frame frame, IStream stream, Callback callback)
this.stream = stream;
}

public int dataRemaining()
public abstract int getFrameBytesRemaining();

public abstract void onFrameBytesFlushed(int bytesFlushed);

public int getDataBytesRemaining()
{
return 0;
}
Expand Down Expand Up @@ -387,6 +417,17 @@ private boolean isProtocol()
}
}

private boolean isControl()
{
switch (frame.getType())
{
case DATA:
return false;
default:
return true;
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,11 @@ protected void onStreamClosed(IStream stream)
{
}

void onFlushed(long bytes) throws IOException
{
flusher.onFlushed(bytes);
}

public void disconnect()
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -1132,15 +1137,28 @@ public String toString()
private class ControlEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int frameBytes;

private ControlEntry(Frame frame, IStream stream, Callback callback)
{
super(frame, stream, callback);
}

@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}

@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}

protected boolean generate(ByteBufferPool.Lease lease)
{
bytes = generator.control(lease, frame);
bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
Expand Down Expand Up @@ -1238,7 +1256,8 @@ public void succeeded()
private class DataEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int dataRemaining;
private int frameBytes;
private int dataBytes;
private int dataWritten;

private DataEntry(DataFrame frame, IStream stream, Callback callback)
Expand All @@ -1249,35 +1268,47 @@ private DataEntry(DataFrame frame, IStream stream, Callback callback)
// of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case
// we would have to count the padding only once.
dataRemaining = frame.remaining();
dataBytes = frame.remaining();
}

@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}

@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}

@Override
public int dataRemaining()
public int getDataBytesRemaining()
{
return dataRemaining;
return dataBytes;
}

protected boolean generate(ByteBufferPool.Lease lease)
{
int dataRemaining = dataRemaining();
int dataBytes = getDataBytesRemaining();

int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && dataRemaining > 0)
if (window <= 0 && dataBytes > 0)
return false;

int length = Math.min(dataRemaining, window);
int length = Math.min(dataBytes, window);

// Only one DATA frame is generated.
bytes = generator.data(lease, (DataFrame)frame, length);
bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining);
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);

this.dataWritten = written;
this.dataRemaining -= written;
this.dataBytes -= written;

flowControl.onDataSending(stream, written);

Expand All @@ -1292,7 +1323,7 @@ public void succeeded()

// Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame;
if (dataRemaining() == 0)
if (getDataBytesRemaining() == 0)
{
// Only now we can update the close state
// and eventually remove the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
});
}

public HTTP2Client getHTTP2Client()
{
return client;
}

@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
Expand All @@ -48,7 +49,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
Expand Down Expand Up @@ -85,6 +86,12 @@ public long getIdleTimeout()
return getStream().getIdleTimeout();
}

@Override
public void onFlushed(long bytes) throws IOException
{
getResponse().getHttpOutput().onFlushed(bytes);
}

public Runnable onRequest(HeadersFrame frame)
{
try
Expand Down

0 comments on commit 27dc3a8

Please sign in to comment.