Skip to content

Commit

Permalink
Allow configurable connection provider eviction predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
samueldlightfoot committed Nov 1, 2022
1 parent 0296f6e commit 68dfe0b
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 7 deletions.
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -278,6 +279,48 @@ default String name() {
return null;
}

interface ConnectionMetadata {

/**
* Returns the number of times the connection has been acquired from the pool. Returns 1 the first time the
* connection is allocated.
*
* @return the number of times the connection has been acquired
*/
int acquireCount();

/**
* Returns the time in ms that the connection has been idle.
*
* @return connection idle time in ms
*/
long idleTime();

/**
* Returns the age of the connection in ms.
*
* @return connection age in ms
*/
long lifeTime();

/**
* Returns a timestamp that denotes the order in which the connection was last released, to millisecond
* precision.
*
* @return the connection last release timestamp, or zero if currently acquired
*/
long releaseTimestamp();

/**
* Returns a timestamp that denotes the order in which the connection was created/allocated, to millisecond
* precision.
*
* @return the connection creation timestamp
*/
long allocationTimestamp();

}

interface AllocationStrategy<A extends AllocationStrategy<A>> {

/**
Expand Down Expand Up @@ -476,6 +519,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
AllocationStrategy<?> allocationStrategy;
BiPredicate<Connection, ConnectionMetadata> evictionPredicate;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -501,6 +545,7 @@ private ConnectionPoolSpec() {
this.registrar = copy.registrar;
this.pendingAcquireTimer = copy.pendingAcquireTimer;
this.allocationStrategy = copy.allocationStrategy;
this.evictionPredicate = copy.evictionPredicate;
}

/**
Expand Down Expand Up @@ -586,6 +631,22 @@ public final SPEC maxLifeTime(Duration maxLifeTime) {
return get();
}

/**
* Set the options to use for configuring {@link ConnectionProvider} eviction predicate.
* Default to null implying connections will not be evaluated for eviction. The eviction predicate works
* alongside {@link #maxLifeTime(Duration)} and {@link #maxIdleTime(Duration)}.
* <p><strong>Note:</strong> This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}.
* A TCP connection is always closed and never returned to the pool.
*
* @param evictionPredicate The predicate function that evaluates whether a connection should be evicted
* @return {@literal this}
* @throws NullPointerException if evictionPredicate is null
*/
public final SPEC evictionPredicate(BiPredicate<Connection, ConnectionMetadata> evictionPredicate) {
this.evictionPredicate = Objects.requireNonNull(evictionPredicate);
return get();
}

/**
* Whether to enable metrics to be collected and registered in Micrometer's
* {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry}
Expand Down
Expand Up @@ -387,6 +387,7 @@ protected static final class PoolFactory<T extends Connection> {
final Duration disposeTimeout;
final BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
final AllocationStrategy<?> allocationStrategy;
final BiPredicate<Connection, ConnectionMetadata> evictionPredicate;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -408,6 +409,7 @@ protected static final class PoolFactory<T extends Connection> {
this.disposeTimeout = disposeTimeout;
this.pendingAcquireTimer = conf.pendingAcquireTimer;
this.allocationStrategy = conf.allocationStrategy;
this.evictionPredicate = conf.evictionPredicate;
}

public InstrumentedPool<T> newPool(
Expand All @@ -426,28 +428,36 @@ public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(allocator, destroyHandler, evictionPredicate)
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(allocator, destroyHandler, evictionPredicate).build(poolFactory);
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate) {
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate) {
PoolBuilder<T, PoolConfig<T>> poolBuilder =
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime)
|| (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime)))
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);

BiPredicate<T, PooledRefMetadata> baseEvictionPredicate = (poolable, meta) ->
(maxIdleTime != -1 && meta.idleTime() >= maxIdleTime) || (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime);

if (this.evictionPredicate != null) {
poolBuilder = poolBuilder.evictionPredicate(baseEvictionPredicate
.or((poolable, meta) -> this.evictionPredicate.test(poolable, new PooledConnectionMetadata(meta))));
}
else {
poolBuilder = poolBuilder.evictionPredicate(baseEvictionPredicate.or(defaultEvictionPredicate));
}

if (DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1d
&& DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1d) {
poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling(
Expand Down Expand Up @@ -550,6 +560,40 @@ public void returnPermits(int returned) {
}
}

static final class PooledConnectionMetadata implements ConnectionMetadata {

final PooledRefMetadata delegate;

PooledConnectionMetadata(PooledRefMetadata delegate) {
this.delegate = delegate;
}

@Override
public int acquireCount() {
return delegate.acquireCount();
}

@Override
public long idleTime() {
return delegate.idleTime();
}

@Override
public long lifeTime() {
return delegate.lifeTime();
}

@Override
public long releaseTimestamp() {
return delegate.releaseTimestamp();
}

@Override
public long allocationTimestamp() {
return delegate.allocationTimestamp();
}
}

static final class PoolKey {
final String fqdn;
final SocketAddress holder;
Expand Down
Expand Up @@ -17,13 +17,15 @@

import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.netty.Connection;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -34,6 +36,7 @@ class ConnectionProviderTest {
static final String TEST_STRING = "";
static final Supplier<ConnectionProvider.MeterRegistrar> TEST_SUPPLIER = () -> (a, b, c, d) -> {};
static final BiFunction<Runnable, Duration, Disposable> TEST_BI_FUNCTION = (r, duration) -> () -> {};
static final BiPredicate<Connection, ConnectionProvider.ConnectionMetadata> TEST_BI_PREDICATE = (conn, meta) -> true;

@Test
void testBuilderCopyConstructor() throws IllegalAccessException {
Expand Down Expand Up @@ -80,6 +83,9 @@ else if (int.class == clazz) {
else if (BiFunction.class == clazz) {
field.set(builder, TEST_BI_FUNCTION);
}
else if (BiPredicate.class == clazz) {
field.set(builder, TEST_BI_PREDICATE);
}
else {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
Expand Down

0 comments on commit 68dfe0b

Please sign in to comment.