Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6603 - HTTP/2 max local stream count exceeded #6639

Merged
merged 13 commits into from Aug 30, 2021
Expand Up @@ -140,6 +140,23 @@ protected void setMaxMultiplex(int maxMultiplex)
pool.setMaxMultiplex(maxMultiplex);
}

protected void setMaxMultiplex(Connection connection, int maxMultiplex)
{
if (connection == null)
{
setMaxMultiplex(maxMultiplex);
}
else
{
if (connection instanceof Attachable)
{
Object attachment = ((Attachable)connection).getAttachment();
if (attachment instanceof EntryHolder)
((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
}
}

protected int getMaxUsageCount()
{
return pool.getMaxUsageCount();
Expand Down
Expand Up @@ -109,13 +109,22 @@ interface Factory
interface Multiplexable
{
/**
* @return the max number of requests multiplexable on a single connection
* @return the default max number of requests multiplexable on a connection
*/
int getMaxMultiplex();

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @param maxMultiplex the default max number of requests multiplexable on a connection
*/
void setMaxMultiplex(int maxMultiplex);

/**
* @param connection the multiplexed connection
* @param maxMultiplex the max number of requests multiplexable on the given connection
*/
default void setMaxMultiplex(Connection connection, int maxMultiplex)
{
setMaxMultiplex(maxMultiplex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is wrong. Setting the max multiplex for a connection is not the same as setting the default for the pool

}
}
}
Expand Up @@ -56,6 +56,12 @@ public void setMaxMultiplex(int maxMultiplex)
super.setMaxMultiplex(maxMultiplex);
}

@Override
public void setMaxMultiplex(Connection connection, int maxMultiplex)
{
super.setMaxMultiplex(connection, maxMultiplex);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed. The super method is protected, this is public.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmm kind of proves my point that we need to separate out Pool and MultiplexedPool. It's not good to have "hidden" features on all pools that can be made public. I'd rather cut and paste the Pool class and not have all our other pools carrying around unused multiplex complexity.


@Override
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;

public abstract class MultiplexHttpDestination extends HttpDestination
{
protected MultiplexHttpDestination(HttpClient client, Origin origin)
Expand All @@ -34,9 +36,14 @@ public int getMaxRequestsPerConnection()
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
setMaxRequestsPerConnection(null, maxRequestsPerConnection);
}

public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
Expand Up @@ -737,7 +737,10 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded");
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -750,7 +753,7 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -785,6 +788,8 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are keeping this debug, you may as well include maxCount, remoteCount & remoteClosing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -798,7 +803,7 @@ protected IStream createRemoteStream(int streamId)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -944,7 +949,7 @@ public void onFrame(Frame frame)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Expand Up @@ -212,8 +212,9 @@ private Promise<Connection> connectionPromise()
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
if (maxConcurrentStreams != null)
destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams);
if (!connection.isMarked())
onServerPreface(session);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
Expand Down Expand Up @@ -76,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down
25 changes: 23 additions & 2 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Expand Up @@ -131,7 +131,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}

public int getReservedCount()
Expand Down Expand Up @@ -169,6 +169,14 @@ public final void setMaxMultiplex(int maxMultiplex)
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the private field to make it clear it is the default:

Suggested change
this.maxMultiplex = maxMultiplex;
this.defaultMaxMultiplex = maxMultiplex;

Eitherway, this class now needs more javadoc describing how the maxMultiplex works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have the word "default" for a lot of other cases, e.g. ServerConnector.idleTimeout and EndPoint.idleTimeout, or the flow control windows got HTTP/2 ConnectionFactorys, so I would not add "default" here.
It is just the maxMultiplex of the Pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in other places we don't have the same fieldname on a subclass in the same file, but with a different meaning.


try (Locker.Lock l = locker.lock())
{
if (closed)
return;

entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex));
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand Down Expand Up @@ -507,15 +515,28 @@ public class Entry
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
// lo: multiplexing counter
private final AtomicBiInteger state;

// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
private volatile int maxMultiplex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still wondering if we should store 2 values here:

Suggested change
private volatile int maxMultiplex;
private volatile int remoteMaxMultiplex;
private volatile int localMaxMultiplex;

We set localMaxMultiplex when we send a settings frame, we set remoteMaxMultiplex when we receive a settings frame. If we change settings, local changes first and then remote changes when we get the ack, otherwise remote changes first and local changes second.

It will always be a hard error to have current greater than the localMaxMultiplex.

If the localMaxMultiplex is less than the remoteMaxMultiplex then that doesn't matter, it is still and error have current greater than the local.

If the localMaxMultiplex is greater than the remoteMaxMultiplex then we have to be more nuanced. It is an error for an remote created stream to exceed the remoteMaxMultiplex, but a locally created stream is allowed to let current exceed remoteMaxMultiplex but not localMaxMultiplex.

I think we really need to deal with this type of dynamic max streams adjustment. We have already seen one example in the wild... others will follow even if it is a bad idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this kind of exceeds my threshold for unused features in a base utility class. Perhaps we need to split Pool into Pool and MultiplexPool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set localMaxMultiplex when we send a settings frame

No, we don't. I don't think what you say is correct, although I understand the underlying idea of handling the case where a second incoming SETTINGS frame has a smaller max_concurrent_streams value.
However, I would handle this latter case in a different PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pair program on this tonight. I think it can be fixed in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbordet As we discussed on hangout, we don't need to fix all the problems in this PR, but we should not add wrong things to the Pool. We should externalize the multiplexing checks as per my draft PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gregw I second this goal. I'd be very happy if we managed to externalize the multiplexing out of the low-level Pool as IMHO it was a design mistake.

That design also has adverse effects on performance as the delegation of the multiplexing accounting is delegated to a lower-level that the only thing it can do is to track it by using an atomic counter that's naturally going to be highly contended, i.e.: the HTTP2 pool is much slower when connections are multiplexed than when they're not because there's a ton of contention on the CAS loops around the multiplexing counters. If the multiplexing counter was tracked at the connection pool layer, we could add some contention-mitigating strategies.


Entry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
this.maxMultiplex = Pool.this.maxMultiplex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.maxMultiplex = Pool.this.maxMultiplex;
this.maxMultiplex = Pool.this.defaultMaxMultiplex;

}

public int getMaxMultiplex()
{
return maxMultiplex;
}

public void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
}

// for testing only
Expand Down