From 6457f4b2fbbb83f1df8a983e12a8d72ddadef74f Mon Sep 17 00:00:00 2001 From: samueldlightfoot Date: Tue, 1 Nov 2022 15:02:25 +0000 Subject: [PATCH] Allow configurable connection provider eviction predicate --- .../netty/resources/ConnectionProvider.java | 63 ++++++ .../resources/PooledConnectionProvider.java | 57 ++++- .../resources/ConnectionProviderTest.java | 6 + .../http/client/Http2ConnectionProvider.java | 2 +- .../reactor/netty/http/client/Http2Pool.java | 104 ++++----- .../netty/http/client/Http2PoolTest.java | 207 ++++++++++++++++-- 6 files changed, 347 insertions(+), 92 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index 3fa2bb1f37..3cbfd25f8b 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; @@ -278,6 +279,48 @@ default String name() { return null; } + interface ConnectionMetadata { + + /** + * Returns the number of times the connection has been acquired from the pool. Returns 1 the first time the + * connection is allocated. + * + * @return the number of times the connection has been acquired + */ + int acquireCount(); + + /** + * Returns the time in ms that the connection has been idle. + * + * @return connection idle time in ms + */ + long idleTime(); + + /** + * Returns the age of the connection in ms. + * + * @return connection age in ms + */ + long lifeTime(); + + /** + * Returns a timestamp that denotes the order in which the connection was last released, to millisecond + * precision. + * + * @return the connection last release timestamp, or zero if currently acquired + */ + long releaseTimestamp(); + + /** + * Returns a timestamp that denotes the order in which the connection was created/allocated, to millisecond + * precision. + * + * @return the connection creation timestamp + */ + long allocationTimestamp(); + + } + interface AllocationStrategy> { /** @@ -476,6 +519,7 @@ class ConnectionPoolSpec> implements Suppl Supplier registrar; BiFunction pendingAcquireTimer; AllocationStrategy allocationStrategy; + BiPredicate evictionPredicate; /** * Returns {@link ConnectionPoolSpec} new instance with default properties. @@ -501,6 +545,7 @@ private ConnectionPoolSpec() { this.registrar = copy.registrar; this.pendingAcquireTimer = copy.pendingAcquireTimer; this.allocationStrategy = copy.allocationStrategy; + this.evictionPredicate = copy.evictionPredicate; } /** @@ -586,6 +631,24 @@ public final SPEC maxLifeTime(Duration maxLifeTime) { return get(); } + /** + * Set the options to use for configuring {@link ConnectionProvider} custom eviction predicate. + *

Unless a custom eviction predicate is specified, the connection is evicted when not active or not persistent, + * If {@link #maxLifeTime(Duration)} and/or {@link #maxIdleTime(Duration)} settings are configured, + * they are also taken into account. + *

Otherwise only the custom eviction predicate is invoked. + *

Note: This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}. + * A TCP connection is always closed and never returned to the pool. + * + * @param evictionPredicate The predicate function that evaluates whether a connection should be evicted + * @return {@literal this} + * @throws NullPointerException if evictionPredicate is null + */ + public final SPEC evictionPredicate(BiPredicate evictionPredicate) { + this.evictionPredicate = Objects.requireNonNull(evictionPredicate); + return get(); + } + /** * Whether to enable metrics to be collected and registered in Micrometer's * {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry} diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 48da14061b..5e6996c3ce 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -387,6 +387,7 @@ protected static final class PoolFactory { final Duration disposeTimeout; final BiFunction pendingAcquireTimer; final AllocationStrategy allocationStrategy; + final BiPredicate evictionPredicate; PoolFactory(ConnectionPoolSpec conf, Duration disposeTimeout) { this(conf, disposeTimeout, null); @@ -408,6 +409,7 @@ protected static final class PoolFactory { this.disposeTimeout = disposeTimeout; this.pendingAcquireTimer = conf.pendingAcquireTimer; this.allocationStrategy = conf.allocationStrategy; + this.evictionPredicate = conf.evictionPredicate; } public InstrumentedPool newPool( @@ -426,28 +428,35 @@ public InstrumentedPool newPool( Publisher allocator, @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility Function> destroyHandler, - BiPredicate evictionPredicate, + BiPredicate defaultEvictionPredicate, Function, InstrumentedPool> poolFactory) { if (disposeTimeout != null) { - return newPoolInternal(allocator, destroyHandler, evictionPredicate) + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate) .build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown)); } - return newPoolInternal(allocator, destroyHandler, evictionPredicate).build(poolFactory); + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory); } PoolBuilder> newPoolInternal( Publisher allocator, Function> destroyHandler, - BiPredicate evictionPredicate) { + BiPredicate defaultEvictionPredicate) { PoolBuilder> poolBuilder = PoolBuilder.from(allocator) .destroyHandler(destroyHandler) - .evictionPredicate(evictionPredicate - .or((poolable, meta) -> (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) - || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime))) .maxPendingAcquire(pendingAcquireMaxCount) .evictInBackground(evictionInterval); + if (this.evictionPredicate != null) { + poolBuilder = poolBuilder.evictionPredicate( + (poolable, meta) -> this.evictionPredicate.test(poolable, new PooledConnectionMetadata(meta))); + } + else { + poolBuilder = poolBuilder.evictionPredicate(defaultEvictionPredicate.or((poolable, meta) -> + (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) + || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime))); + } + if (DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1d) { poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling( @@ -550,6 +559,40 @@ public void returnPermits(int returned) { } } + static final class PooledConnectionMetadata implements ConnectionMetadata { + + final PooledRefMetadata delegate; + + PooledConnectionMetadata(PooledRefMetadata delegate) { + this.delegate = delegate; + } + + @Override + public int acquireCount() { + return delegate.acquireCount(); + } + + @Override + public long idleTime() { + return delegate.idleTime(); + } + + @Override + public long lifeTime() { + return delegate.lifeTime(); + } + + @Override + public long releaseTimestamp() { + return delegate.releaseTimestamp(); + } + + @Override + public long allocationTimestamp() { + return delegate.allocationTimestamp(); + } + } + static final class PoolKey { final String fqdn; final SocketAddress holder; diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index ffc289b3a5..18b62f4548 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.Test; import reactor.core.Disposable; +import reactor.netty.Connection; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -24,6 +25,7 @@ import java.util.Collections; import java.util.Map; import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -34,6 +36,7 @@ class ConnectionProviderTest { static final String TEST_STRING = ""; static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; + static final BiPredicate TEST_BI_PREDICATE = (conn, meta) -> true; @Test void testBuilderCopyConstructor() throws IllegalAccessException { @@ -80,6 +83,9 @@ else if (int.class == clazz) { else if (BiFunction.class == clazz) { field.set(builder, TEST_BI_FUNCTION); } + else if (BiPredicate.class == clazz) { + field.set(builder, TEST_BI_PREDICATE); + } else { throw new IllegalArgumentException("Unknown field type " + clazz); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 3f2fa24668..a29aea60fe 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -494,7 +494,7 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.allocationStrategy(), poolFactory.maxIdleTime(), poolFactory.maxLifeTime())); + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 65aa659415..c350192a2d 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -65,16 +65,16 @@ * The connection is removed from the pool when: *

    *
  • The connection is closed.
  • - *
  • The connection has reached its life time and there are no active streams.
  • - *
  • The connection has reached its idle time and there are no active streams.
  • + *
  • The eviction predicate evaluates to true and there are no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
*

* The connection is filtered out when: *

    - *
  • The connection has reached its life time and there are active streams. In this case, the connection stays - * in the pool, but it is not used. Once there are no active streams, the connection is removed from the pool.
  • + *
  • The connection's eviction predicate evaluates to true and there are active streams. In this case, the + * connection stays in the pool, but it is not used. Once there are no active streams, the connection is removed + * from the pool.
  • *
  • The connection has reached its max active streams configuration. In this case, the connection stays * in the pool, but it is not used. Once the number of the active streams is below max active streams configuration, * the connection can be used again.
  • @@ -94,9 +94,6 @@ * Configurations that are not applicable *
      *
    • {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
    • - *
    • {@link PoolConfig#evictionPredicate()} - the eviction predicate cannot be used as more complex - * checks have to be done. Also the pool uses filtering for the connections (a connection might not be able - * to be used but is required to stay in the pool).
    • *
    • {@link PoolConfig#metricsRecorder()} - no pool instrumentation.
    • *
    • {@link PoolConfig#releaseHandler()} - release functionality works as invalidate.
    • *
    • {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
    • @@ -156,8 +153,6 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. final Clock clock; final Long maxConcurrentStreams; - final long maxIdleTime; - final long maxLifeTime; final int minConnections; final PoolConfig poolConfig; @@ -165,21 +160,17 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. Disposable evictionTask; - Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy, - long maxIdleTime, long maxLifeTime) { + Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy) { this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; - this.maxIdleTime = maxIdleTime; - this.maxLifeTime = maxLifeTime; this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; recordInteractionTimestamp(); - scheduleEviction(); } @@ -323,8 +314,8 @@ else if (poolConfig.evictInBackgroundInterval().isZero()) { ref.slot.invalidate(); removeSlot(ref.slot); } - // max life reached - else if (maxLifeReached(ref.slot)) { + // eviction predicate evaluates to true + else if (testEvictionPredicate(ref.slot)) { //"FutureReturnValueIgnored" this is deliberate ref.slot.connection.channel().close(); ref.slot.invalidate(); @@ -503,9 +494,9 @@ void evictInBackground() { continue; } - if (maxLifeReached(slot)) { + if (testEvictionPredicate(slot)) { if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); + log.debug(format(slot.connection.channel(), "Eviction predicate was true, remove from pool")); } //"FutureReturnValueIgnored" this is deliberate slot.connection.channel().close(); @@ -513,19 +504,7 @@ void evictInBackground() { slots.remove(); IDLE_SIZE.decrementAndGet(this); slot.invalidate(); - continue; - } - } - if (maxIdleReached(slot)) { - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); } - //"FutureReturnValueIgnored" this is deliberate - slot.connection.channel().close(); - recordInteractionTimestamp(); - slots.remove(); - IDLE_SIZE.decrementAndGet(this); - slot.invalidate(); } } } @@ -589,29 +568,18 @@ Slot findConnection(ConcurrentLinkedQueue resources) { continue; } - // check whether the connection's idle time has been reached - if (maxIdleReached(slot)) { - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); - } - //"FutureReturnValueIgnored" this is deliberate - slot.connection.channel().close(); - slot.invalidate(); - continue; - } - - // check whether the connection's max lifetime has been reached - if (maxLifeReached(slot)) { + // check whether the eviction predicate for the connection evaluates to true + if (testEvictionPredicate(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"), + log.debug(format(slot.connection.channel(), "Eviction predicate was true, {} active streams"), slot.concurrency()); } offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); + log.debug(format(slot.connection.channel(), "Eviction predicate was true, remove from pool")); } //"FutureReturnValueIgnored" this is deliberate slot.connection.channel().close(); @@ -635,12 +603,8 @@ Slot findConnection(ConcurrentLinkedQueue resources) { return null; } - boolean maxIdleReached(Slot slot) { - return maxIdleTime != -1 && slot.idleTime() >= maxIdleTime; - } - - boolean maxLifeReached(Slot slot) { - return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime; + boolean testEvictionPredicate(Slot slot) { + return poolConfig.evictionPredicate().test(slot.connection, slot); } void pendingAcquireLimitReached(Borrower borrower, int maxPending) { @@ -941,7 +905,7 @@ public String toString() { } } - static final class Slot extends AtomicBoolean { + static final class Slot extends AtomicBoolean implements PooledRefMetadata { volatile int concurrency; static final AtomicIntegerFieldUpdater CONCURRENCY = @@ -1021,14 +985,6 @@ boolean goAwayReceived() { return frameCodec != null && ((Http2FrameCodec) frameCodec.handler()).connection().goAwayReceived(); } - long idleTime() { - if (concurrency() > 0) { - return 0L; - } - long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; - return pool.clock.millis() - idleTime; - } - @Nullable ChannelHandlerContext http2FrameCodecCtx() { ChannelHandlerContext ctx = http2FrameCodecCtx; @@ -1076,8 +1032,34 @@ void invalidate() { } } - long lifeTime() { + @Override + public long idleTime() { + if (concurrency() > 0) { + return 0L; + } + long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; + return pool.clock.millis() - idleTime; + } + + @Override + public int acquireCount() { + return 1; + } + + @Override + public long lifeTime() { return pool.clock.millis() - creationTimestamp; } + + @Override + public long releaseTimestamp() { + return 0; + } + + @Override + public long allocationTimestamp() { + return creationTimestamp; + } + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 52b63c9f1e..4dbda691db 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.concurrent.atomic.AtomicReference; @@ -57,7 +58,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); try { List> acquired = new ArrayList<>(); @@ -101,7 +102,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); try { List> acquired = new ArrayList<>(); @@ -148,7 +149,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection = null; try { @@ -226,7 +227,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection = null; try { @@ -314,7 +315,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection = null; try { @@ -373,7 +374,7 @@ void evictInBackgroundClosedConnection() throws Exception { .maxPendingAcquireUnbounded() .sizeBetween(0, 1) .evictInBackground(Duration.ofSeconds(5)); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection = null; try { @@ -445,8 +446,9 @@ void evictInBackgroundMaxIdleTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1) - .evictInBackground(Duration.ofSeconds(5)); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + .evictInBackground(Duration.ofSeconds(5)) + .evictionPredicate((conn, meta) -> meta.idleTime() >= 10); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -517,8 +519,9 @@ void evictInBackgroundMaxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1) - .evictInBackground(Duration.ofSeconds(5)); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); + .evictInBackground(Duration.ofSeconds(5)) + .evictionPredicate((conn, meta) -> meta.lifeTime() >= 10); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -581,6 +584,86 @@ void evictInBackgroundMaxLifeTime() throws Exception { } } + @Test + void evictInBackgroundEvictionPredicate() { + final AtomicBoolean shouldEvict = new AtomicBoolean(false); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1) + .evictInBackground(Duration.ofSeconds(5)) + .evictionPredicate((conn, metadata) -> shouldEvict.get()); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + shouldEvict.set(true); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + acquired1.invalidate().block(); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + shouldEvict.set(false); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + shouldEvict.set(true); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxIdleTime() throws Exception { PoolBuilder> poolBuilder = @@ -592,8 +675,9 @@ void maxIdleTime() throws Exception { })) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() - .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + .sizeBetween(0, 1) + .evictionPredicate((conn, meta) -> meta.idleTime() >= 10); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -650,8 +734,9 @@ void maxIdleTimeActiveStreams() throws Exception { PoolBuilder.from(Mono.just(Connection.from(channel))) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() - .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + .sizeBetween(0, 1) + .evictionPredicate((conn, meta) -> meta.idleTime() >= 10); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -712,8 +797,9 @@ void maxLifeTime() throws Exception { })) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() - .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); + .sizeBetween(0, 1) + .evictionPredicate((conn, meta) -> meta.lifeTime() >= 10); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -770,6 +856,79 @@ void maxLifeTime() throws Exception { } } + @Test + void evictionPredicate() { + final AtomicBoolean shouldEvict = new AtomicBoolean(false); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1) + .evictionPredicate((conn, metadata) -> shouldEvict.get()); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + shouldEvict.set(true); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + acquired1.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + shouldEvict.set(false); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxLifeTimeMaxConnectionsNotReached() throws Exception { PoolBuilder> poolBuilder = @@ -781,8 +940,9 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { })) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() - .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50)); + .sizeBetween(0, 2) + .evictionPredicate((conn, meta) -> meta.lifeTime() >= 50); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection1 = null; Connection connection2 = null; @@ -864,11 +1024,12 @@ private void doMaxLifeTimeMaxConnectionsReached(@Nullable BiFunction meta.lifeTime() >= 10); if (pendingAcquireTimer != null) { poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer); } - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); Connection connection = null; try { @@ -923,7 +1084,7 @@ void minConnections() { .maxConnections(3) .minConnections(1) .build(); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy)); List> acquired = new ArrayList<>(); try { @@ -972,7 +1133,7 @@ void minConnectionsMaxStreamsReached() { .maxConnections(3) .minConnections(1) .build(); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy)); List> acquired = new ArrayList<>(); try { @@ -1016,7 +1177,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1));