From 68dfe0b04bda30a1790708ec3abb180fa01c8fa2 Mon Sep 17 00:00:00 2001 From: samueldlightfoot Date: Tue, 1 Nov 2022 15:02:25 +0000 Subject: [PATCH] Allow configurable connection provider eviction predicate --- .../netty/resources/ConnectionProvider.java | 61 +++++++++++++++++++ .../resources/PooledConnectionProvider.java | 58 +++++++++++++++--- .../resources/ConnectionProviderTest.java | 6 ++ 3 files changed, 118 insertions(+), 7 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index 3fa2bb1f37..559cbbae42 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; @@ -278,6 +279,48 @@ default String name() { return null; } + interface ConnectionMetadata { + + /** + * Returns the number of times the connection has been acquired from the pool. Returns 1 the first time the + * connection is allocated. + * + * @return the number of times the connection has been acquired + */ + int acquireCount(); + + /** + * Returns the time in ms that the connection has been idle. + * + * @return connection idle time in ms + */ + long idleTime(); + + /** + * Returns the age of the connection in ms. + * + * @return connection age in ms + */ + long lifeTime(); + + /** + * Returns a timestamp that denotes the order in which the connection was last released, to millisecond + * precision. + * + * @return the connection last release timestamp, or zero if currently acquired + */ + long releaseTimestamp(); + + /** + * Returns a timestamp that denotes the order in which the connection was created/allocated, to millisecond + * precision. + * + * @return the connection creation timestamp + */ + long allocationTimestamp(); + + } + interface AllocationStrategy> { /** @@ -476,6 +519,7 @@ class ConnectionPoolSpec> implements Suppl Supplier registrar; BiFunction pendingAcquireTimer; AllocationStrategy allocationStrategy; + BiPredicate evictionPredicate; /** * Returns {@link ConnectionPoolSpec} new instance with default properties. @@ -501,6 +545,7 @@ private ConnectionPoolSpec() { this.registrar = copy.registrar; this.pendingAcquireTimer = copy.pendingAcquireTimer; this.allocationStrategy = copy.allocationStrategy; + this.evictionPredicate = copy.evictionPredicate; } /** @@ -586,6 +631,22 @@ public final SPEC maxLifeTime(Duration maxLifeTime) { return get(); } + /** + * Set the options to use for configuring {@link ConnectionProvider} eviction predicate. + * Default to null implying connections will not be evaluated for eviction. The eviction predicate works + * alongside {@link #maxLifeTime(Duration)} and {@link #maxIdleTime(Duration)}. + *

Note: This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}. + * A TCP connection is always closed and never returned to the pool. + * + * @param evictionPredicate The predicate function that evaluates whether a connection should be evicted + * @return {@literal this} + * @throws NullPointerException if evictionPredicate is null + */ + public final SPEC evictionPredicate(BiPredicate evictionPredicate) { + this.evictionPredicate = Objects.requireNonNull(evictionPredicate); + return get(); + } + /** * Whether to enable metrics to be collected and registered in Micrometer's * {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry} diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 48da14061b..1ef839b90c 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -387,6 +387,7 @@ protected static final class PoolFactory { final Duration disposeTimeout; final BiFunction pendingAcquireTimer; final AllocationStrategy allocationStrategy; + final BiPredicate evictionPredicate; PoolFactory(ConnectionPoolSpec conf, Duration disposeTimeout) { this(conf, disposeTimeout, null); @@ -408,6 +409,7 @@ protected static final class PoolFactory { this.disposeTimeout = disposeTimeout; this.pendingAcquireTimer = conf.pendingAcquireTimer; this.allocationStrategy = conf.allocationStrategy; + this.evictionPredicate = conf.evictionPredicate; } public InstrumentedPool newPool( @@ -426,28 +428,36 @@ public InstrumentedPool newPool( Publisher allocator, @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility Function> destroyHandler, - BiPredicate evictionPredicate, + BiPredicate defaultEvictionPredicate, Function, InstrumentedPool> poolFactory) { if (disposeTimeout != null) { - return newPoolInternal(allocator, destroyHandler, evictionPredicate) + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate) .build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown)); } - return newPoolInternal(allocator, destroyHandler, evictionPredicate).build(poolFactory); + return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory); } PoolBuilder> newPoolInternal( Publisher allocator, Function> destroyHandler, - BiPredicate evictionPredicate) { + BiPredicate defaultEvictionPredicate) { PoolBuilder> poolBuilder = PoolBuilder.from(allocator) .destroyHandler(destroyHandler) - .evictionPredicate(evictionPredicate - .or((poolable, meta) -> (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) - || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime))) .maxPendingAcquire(pendingAcquireMaxCount) .evictInBackground(evictionInterval); + BiPredicate baseEvictionPredicate = (poolable, meta) -> + (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime); + + if (this.evictionPredicate != null) { + poolBuilder = poolBuilder.evictionPredicate(baseEvictionPredicate + .or((poolable, meta) -> this.evictionPredicate.test(poolable, new PooledConnectionMetadata(meta)))); + } + else { + poolBuilder = poolBuilder.evictionPredicate(baseEvictionPredicate.or(defaultEvictionPredicate)); + } + if (DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1d) { poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling( @@ -550,6 +560,40 @@ public void returnPermits(int returned) { } } + static final class PooledConnectionMetadata implements ConnectionMetadata { + + final PooledRefMetadata delegate; + + PooledConnectionMetadata(PooledRefMetadata delegate) { + this.delegate = delegate; + } + + @Override + public int acquireCount() { + return delegate.acquireCount(); + } + + @Override + public long idleTime() { + return delegate.idleTime(); + } + + @Override + public long lifeTime() { + return delegate.lifeTime(); + } + + @Override + public long releaseTimestamp() { + return delegate.releaseTimestamp(); + } + + @Override + public long allocationTimestamp() { + return delegate.allocationTimestamp(); + } + } + static final class PoolKey { final String fqdn; final SocketAddress holder; diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index ffc289b3a5..18b62f4548 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.Test; import reactor.core.Disposable; +import reactor.netty.Connection; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -24,6 +25,7 @@ import java.util.Collections; import java.util.Map; import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -34,6 +36,7 @@ class ConnectionProviderTest { static final String TEST_STRING = ""; static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; + static final BiPredicate TEST_BI_PREDICATE = (conn, meta) -> true; @Test void testBuilderCopyConstructor() throws IllegalAccessException { @@ -80,6 +83,9 @@ else if (int.class == clazz) { else if (BiFunction.class == clazz) { field.set(builder, TEST_BI_FUNCTION); } + else if (BiPredicate.class == clazz) { + field.set(builder, TEST_BI_PREDICATE); + } else { throw new IllegalArgumentException("Unknown field type " + clazz); }