Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configurable connection provider eviction predicate #2557

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -278,6 +279,48 @@ default String name() {
return null;
}

interface ConnectionMetadata {

/**
* Returns the number of times the connection has been acquired from the pool. Returns 1 the first time the
* connection is allocated.
*
* @return the number of times the connection has been acquired
*/
int acquireCount();

/**
* Returns the time in ms that the connection has been idle.
*
* @return connection idle time in ms
*/
long idleTime();

/**
* Returns the age of the connection in ms.
*
* @return connection age in ms
*/
long lifeTime();

/**
* Returns a timestamp that denotes the order in which the connection was last released, to millisecond
* precision.
*
* @return the connection last release timestamp, or zero if currently acquired
*/
long releaseTimestamp();

/**
* Returns a timestamp that denotes the order in which the connection was created/allocated, to millisecond
* precision.
*
* @return the connection creation timestamp
*/
long allocationTimestamp();

}

interface AllocationStrategy<A extends AllocationStrategy<A>> {

/**
Expand Down Expand Up @@ -476,6 +519,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
AllocationStrategy<?> allocationStrategy;
BiPredicate<Connection, ConnectionMetadata> evictionPredicate;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -501,6 +545,7 @@ private ConnectionPoolSpec() {
this.registrar = copy.registrar;
this.pendingAcquireTimer = copy.pendingAcquireTimer;
this.allocationStrategy = copy.allocationStrategy;
this.evictionPredicate = copy.evictionPredicate;
}

/**
Expand Down Expand Up @@ -586,6 +631,24 @@ public final SPEC maxLifeTime(Duration maxLifeTime) {
return get();
}

/**
* Set the options to use for configuring {@link ConnectionProvider} custom eviction predicate.
* <p>Unless a custom eviction predicate is specified, the connection is evicted when not active or not persistent,
* If {@link #maxLifeTime(Duration)} and/or {@link #maxIdleTime(Duration)} settings are configured,
* they are also taken into account.
* <p>Otherwise only the custom eviction predicate is invoked.
* <p><strong>Note:</strong> This configuration is not applicable for {@link reactor.netty.tcp.TcpClient}.
* A TCP connection is always closed and never returned to the pool.
*
* @param evictionPredicate The predicate function that evaluates whether a connection should be evicted
* @return {@literal this}
* @throws NullPointerException if evictionPredicate is null
*/
public final SPEC evictionPredicate(BiPredicate<Connection, ConnectionMetadata> evictionPredicate) {
this.evictionPredicate = Objects.requireNonNull(evictionPredicate);
return get();
}

/**
* Whether to enable metrics to be collected and registered in Micrometer's
* {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry}
Expand Down
Expand Up @@ -387,6 +387,7 @@ protected static final class PoolFactory<T extends Connection> {
final Duration disposeTimeout;
final BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
final AllocationStrategy<?> allocationStrategy;
final BiPredicate<Connection, ConnectionMetadata> evictionPredicate;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -408,6 +409,7 @@ protected static final class PoolFactory<T extends Connection> {
this.disposeTimeout = disposeTimeout;
this.pendingAcquireTimer = conf.pendingAcquireTimer;
this.allocationStrategy = conf.allocationStrategy;
this.evictionPredicate = conf.evictionPredicate;
}

public InstrumentedPool<T> newPool(
Expand All @@ -426,28 +428,35 @@ public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(allocator, destroyHandler, evictionPredicate)
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(allocator, destroyHandler, evictionPredicate).build(poolFactory);
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate) {
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate) {
PoolBuilder<T, PoolConfig<T>> poolBuilder =
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> (maxIdleTime != -1 && meta.idleTime() >= maxIdleTime)
|| (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime)))
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);

if (this.evictionPredicate != null) {
poolBuilder = poolBuilder.evictionPredicate(
(poolable, meta) -> this.evictionPredicate.test(poolable, new PooledConnectionMetadata(meta)));
}
else {
poolBuilder = poolBuilder.evictionPredicate(defaultEvictionPredicate.or((poolable, meta) ->
(maxIdleTime != -1 && meta.idleTime() >= maxIdleTime)
|| (maxLifeTime != -1 && meta.lifeTime() >= maxLifeTime)));
}

if (DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_GET_PERMITS_SAMPLING_RATE <= 1d
&& DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE > 0d && DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE <= 1d) {
poolBuilder = poolBuilder.allocationStrategy(SamplingAllocationStrategy.sizeBetweenWithSampling(
Expand Down Expand Up @@ -550,6 +559,40 @@ public void returnPermits(int returned) {
}
}

static final class PooledConnectionMetadata implements ConnectionMetadata {

final PooledRefMetadata delegate;

PooledConnectionMetadata(PooledRefMetadata delegate) {
this.delegate = delegate;
}

@Override
public int acquireCount() {
return delegate.acquireCount();
}

@Override
public long idleTime() {
return delegate.idleTime();
}

@Override
public long lifeTime() {
return delegate.lifeTime();
}

@Override
public long releaseTimestamp() {
return delegate.releaseTimestamp();
}

@Override
public long allocationTimestamp() {
return delegate.allocationTimestamp();
}
}

static final class PoolKey {
final String fqdn;
final SocketAddress holder;
Expand Down
Expand Up @@ -17,13 +17,15 @@

import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.netty.Connection;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -34,6 +36,7 @@ class ConnectionProviderTest {
static final String TEST_STRING = "";
static final Supplier<ConnectionProvider.MeterRegistrar> TEST_SUPPLIER = () -> (a, b, c, d) -> {};
static final BiFunction<Runnable, Duration, Disposable> TEST_BI_FUNCTION = (r, duration) -> () -> {};
static final BiPredicate<Connection, ConnectionProvider.ConnectionMetadata> TEST_BI_PREDICATE = (conn, meta) -> true;

@Test
void testBuilderCopyConstructor() throws IllegalAccessException {
Expand Down Expand Up @@ -80,6 +83,9 @@ else if (int.class == clazz) {
else if (BiFunction.class == clazz) {
field.set(builder, TEST_BI_FUNCTION);
}
else if (BiPredicate.class == clazz) {
field.set(builder, TEST_BI_PREDICATE);
}
else {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
Expand Down
Expand Up @@ -494,7 +494,7 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConFig -> new Http2Pool(poolConFig, poolFactory.allocationStrategy(), poolFactory.maxIdleTime(), poolFactory.maxLifeTime()));
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
}

Publisher<Connection> connectChannel() {
Expand Down