Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-9.4.x-51…
Browse files Browse the repository at this point in the history
…62-Improve-Decorating-Listener-Examples
  • Loading branch information
gregw committed Aug 24, 2020
2 parents ed0b344 + 65c4e86 commit b0f16c6
Show file tree
Hide file tree
Showing 15 changed files with 308 additions and 126 deletions.
Expand Up @@ -78,7 +78,7 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateReturningFuture(pool.getMaxEntries());
futures[i] = tryCreateAsync(getMaxConnectionCount());
}
return CompletableFuture.allOf(futures);
}
Expand Down Expand Up @@ -175,6 +175,8 @@ public Connection acquire()
*/
protected Connection acquire(boolean create)
{
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
{
Expand All @@ -196,20 +198,21 @@ protected Connection acquire(boolean create)
*/
protected void tryCreate(int maxPending)
{
tryCreateReturningFuture(maxPending);
tryCreateAsync(maxPending);
}

private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
private CompletableFuture<Void> tryCreateAsync(int maxPending)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);

Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);

if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());

CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<Connection>()
Expand All @@ -218,7 +221,7 @@ private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable))
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
Expand All @@ -236,7 +239,7 @@ public void succeeded(Connection connection)
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
entry.remove();
future.completeExceptionally(x);
requester.failed(x);
Expand All @@ -257,7 +260,7 @@ protected Connection activate()
if (entry != null)
{
if (LOG.isDebugEnabled())
LOG.debug("activated {}", entry);
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
Expand All @@ -275,8 +278,6 @@ public boolean isActive(Connection connection)
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
return false;
if (LOG.isDebugEnabled())
LOG.debug("isActive {}", entry);
return !entry.isIdle();
}

Expand All @@ -300,7 +301,7 @@ protected boolean deactivate(Connection connection)
return true;
boolean reusable = pool.release(entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {}", reusable, entry);
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
if (reusable)
return true;
remove(connection);
Expand All @@ -325,7 +326,7 @@ protected boolean remove(Connection connection, boolean force)
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {}", removed, entry);
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
if (removed || force)
{
released(connection);
Expand Down
Expand Up @@ -384,7 +384,7 @@ private ProcessResult process(Connection connection)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}

if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -438,7 +438,7 @@ public void release(Connection connection)
{
if (connectionPool.isActive(connection))
{
// trigger the next request after releasing the connection
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
send(false);
else
Expand Down
Expand Up @@ -40,7 +40,6 @@ public class HttpChannelOverHTTP extends HttpChannel
private final HttpConnectionOverHTTP connection;
private final HttpSenderOverHTTP sender;
private final HttpReceiverOverHTTP receiver;
private final LongAdder inMessages = new LongAdder();
private final LongAdder outMessages = new LongAdder();

public HttpChannelOverHTTP(HttpConnectionOverHTTP connection)
Expand Down Expand Up @@ -129,7 +128,6 @@ public Result exchangeTerminating(HttpExchange exchange, Result result)

public void receive()
{
inMessages.increment();
receiver.receive();
}

Expand Down Expand Up @@ -185,7 +183,7 @@ else if (sender.isShutdown())

protected long getMessagesIn()
{
return inMessages.longValue();
return receiver.getMessagesIn();
}

protected long getMessagesOut()
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.LongAdder;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
Expand All @@ -40,6 +41,7 @@

public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler
{
private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
Expand Down Expand Up @@ -333,9 +335,11 @@ public boolean messageComplete()
return false;

int status = exchange.getResponse().getStatus();

if (status != HttpStatus.CONTINUE_100)
{
inMessages.increment();
complete = true;
}

return !responseSuccess(exchange);
}
Expand Down Expand Up @@ -376,6 +380,11 @@ private void failAndClose(Throwable failure)
getHttpConnection().close(failure);
}

long getMessagesIn()
{
return inMessages.longValue();
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -63,10 +62,15 @@ public class ConnectionPoolTest
private HttpClient client;

public static Stream<ConnectionPoolFactory> pools()
{
return Stream.concat(poolsNoRoundRobin(),
Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination))));
}

public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}
Expand Down Expand Up @@ -295,11 +299,11 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
}

@ParameterizedTest
@MethodSource("pools")
@MethodSource("poolsNoRoundRobin")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more connections than expected.
Assumptions.assumeFalse(factory.name.equals("round-robin"));
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.

startServer(new EmptyServerHandler());

Expand Down
@@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.io;

import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Predicate;

import org.eclipse.jetty.util.IncludeExcludeSet;

public class IncludeExcludeConnectionStatistics extends ConnectionStatistics
{
private final IncludeExcludeSet<Class<? extends Connection>, Connection> _set = new IncludeExcludeSet<>(ConnectionSet.class);

public void include(String className) throws ClassNotFoundException
{
_set.include(connectionForName(className));
}

public void include(Class<? extends Connection> clazz)
{
_set.include(clazz);
}

public void exclude(String className) throws ClassNotFoundException
{
_set.exclude(connectionForName(className));
}

public void exclude(Class<? extends Connection> clazz)
{
_set.exclude(clazz);
}

private Class<? extends Connection> connectionForName(String className) throws ClassNotFoundException
{
Class<?> aClass = Class.forName(className);
if (!Connection.class.isAssignableFrom(aClass))
throw new IllegalArgumentException("Class is not a Connection");

@SuppressWarnings("unchecked")
Class<? extends Connection> connectionClass = (Class<? extends Connection>)aClass;
return connectionClass;
}

@Override
public void onOpened(Connection connection)
{
if (_set.test(connection))
super.onOpened(connection);
}

@Override
public void onClosed(Connection connection)
{
if (_set.test(connection))
super.onClosed(connection);
}

public static class ConnectionSet extends AbstractSet<Class<? extends Connection>> implements Predicate<Connection>
{
private final Set<Class<? extends Connection>> set = new HashSet<>();

@Override
public boolean add(Class<? extends Connection> aClass)
{
return set.add(aClass);
}

@Override
public boolean remove(Object o)
{
return set.remove(o);
}

@Override
public Iterator<Class<? extends Connection>> iterator()
{
return set.iterator();
}

@Override
public int size()
{
return set.size();
}

@Override
public boolean test(Connection connection)
{
if (connection == null)
return false;
return set.stream().anyMatch(c -> c.isAssignableFrom(connection.getClass()));
}
}
}
Expand Up @@ -78,6 +78,7 @@
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.OS;

Expand Down Expand Up @@ -978,6 +979,7 @@ public boolean transform(Source source, Sink sink) throws IOException
assertArrayEquals(data, response.getContent());
}

@Disabled("See issue #3974")
@Test
public void testAfterContentTransformerOverflowingToDisk() throws Exception
{
Expand Down

0 comments on commit b0f16c6

Please sign in to comment.