Skip to content

Commit

Permalink
features: add heartbeat to keepalive connection if configure it (#1699)
Browse files Browse the repository at this point in the history
* features: add heartbeat to keepalive connection if configure it

* features: add heartbeat to keepalive connection if configure it

* features: add heartbeat to keepalive connection if configure it

* features: keepalive unit test

* optimize code

* optimize code
  • Loading branch information
cadeeper committed Jan 12, 2021
1 parent a5a38d5 commit 0561309
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 6 deletions.
36 changes: 36 additions & 0 deletions src/main/java/com/zaxxer/hikari/HikariConfig.java
Expand Up @@ -55,6 +55,7 @@ public class HikariConfig implements HikariConfigMXBean
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final long DEFAULT_KEEPALIVE_TIME = 0L;
private static final int DEFAULT_POOL_SIZE = 10;

private static boolean unitTest = false;
Expand Down Expand Up @@ -99,6 +100,8 @@ public class HikariConfig implements HikariConfigMXBean
private Object healthCheckRegistry;
private Properties healthCheckProperties;

private long keepaliveTime;

private volatile boolean sealed;

/**
Expand All @@ -117,6 +120,7 @@ public HikariConfig()
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;
keepaliveTime = DEFAULT_KEEPALIVE_TIME;

String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
Expand Down Expand Up @@ -720,6 +724,26 @@ public void addHealthCheckProperty(String key, String value)
healthCheckProperties.setProperty(key, value);
}

/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @return the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public long getKeepaliveTime() {
return keepaliveTime;
}

/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @param keepaliveTimeMs the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public void setKeepaliveTime(long keepaliveTimeMs) {
this.keepaliveTime = keepaliveTimeMs;
}

/**
* Determine whether the Connections in the pool are in read-only mode.
*
Expand Down Expand Up @@ -1018,6 +1042,18 @@ private void validateNumerics()
maxLifetime = MAX_LIFETIME;
}

// keepalive time must larger then 30 seconds
if (keepaliveTime != 0 && keepaliveTime < SECONDS.toMillis(30)) {
LOGGER.warn("{} - keepaliveTime is less than 30000ms, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}

// keepalive time must be less than maxLifetime (if maxLifetime is enabled)
if (keepaliveTime != 0 && maxLifetime != 0 && keepaliveTime >= maxLifetime) {
LOGGER.warn("{} - keepaliveTime is greater than or equal to maxLifetime, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}

if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Expand Up @@ -477,6 +477,8 @@ private PoolEntry createPoolEntry()
final PoolEntry poolEntry = newPoolEntry();

final long maxLifetime = config.getMaxLifetime();
final long keepaliveTime = config.getKeepaliveTime();

if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
Expand All @@ -490,6 +492,25 @@ private PoolEntry createPoolEntry()
lifetime, MILLISECONDS));
}

if (keepaliveTime > 0) {
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(
() -> {
if (connectionBag.reserve(poolEntry)) {
if (!isConnectionAlive(poolEntry.connection)) {
softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, true);
addBagItem(connectionBag.getWaitingThreadCount());
}
else {
connectionBag.unreserve(poolEntry);
logger.debug("{} - keepalive: connection {} is alive", poolName, poolEntry.connection);
}
}
}, heartbeatTime, heartbeatTime, MILLISECONDS));
}

return poolEntry;
}
catch (ConnectionSetupException e) {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/zaxxer/hikari/pool/PoolEntry.java
Expand Up @@ -47,6 +47,7 @@ final class PoolEntry implements IConcurrentBagEntry
private volatile boolean evict;

private volatile ScheduledFuture<?> endOfLife;
private volatile ScheduledFuture<?> keepalive;

private final FastList<Statement> openStatements;
private final HikariPool hikariPool;
Expand Down Expand Up @@ -92,6 +93,10 @@ void setFutureEol(final ScheduledFuture<?> endOfLife)
this.endOfLife = endOfLife;
}

public void setKeepalive(ScheduledFuture<?> keepalive) {
this.keepalive = keepalive;
}

Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
{
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
Expand Down Expand Up @@ -175,9 +180,15 @@ Connection close()
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}

ScheduledFuture<?> ka = keepalive;
if (ka != null && !ka.isDone() && !ka.cancel(false)) {
LOGGER.warn("{} - keepalive task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}

Connection con = connection;
connection = null;
endOfLife = null;
keepalive = null;
return con;
}

Expand Down
41 changes: 38 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubConnection.java
Expand Up @@ -33,7 +33,7 @@
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.zaxxer.hikari.util.UtilityElf;
Expand All @@ -47,11 +47,16 @@ public class StubConnection extends StubBaseConnection implements Connection
public static final AtomicInteger count = new AtomicInteger();
public static volatile boolean slowCreate;
public static volatile boolean oldDriver;
private volatile boolean isClosed = false;

private static long foo;
private boolean autoCommit;
private int isolation = Connection.TRANSACTION_READ_COMMITTED;
private String catalog;
private long waitTimeout;

private static ScheduledExecutorService connectionWaitTimeout = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture<?> waitTimeoutTask;

static {
foo = System.currentTimeMillis();
Expand All @@ -64,6 +69,21 @@ public StubConnection() {
}
}

public StubConnection(long waitTimeout) {
this.waitTimeout = waitTimeout;
count.incrementAndGet();
if (slowCreate) {
UtilityElf.quietlySleep(1000);
}

try {
refreshConnectionWaitTimeout();
} catch (Exception e){
//ignore
}
}


/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -128,7 +148,21 @@ public boolean getAutoCommit() throws SQLException
@Override
public void commit() throws SQLException
{
refreshConnectionWaitTimeout();
}

private void refreshConnectionWaitTimeout() throws SQLException {
if (this.isClosed) {
throw new SQLException("connection has been closed");
}

if (waitTimeoutTask != null) {
waitTimeoutTask.cancel(true);
}

if (waitTimeout > 0) {
waitTimeoutTask = connectionWaitTimeout.schedule(() -> { this.isClosed = true;}, waitTimeout, TimeUnit.MILLISECONDS);
}
}

/** {@inheritDoc} */
Expand All @@ -152,7 +186,7 @@ public boolean isClosed() throws SQLException
if (throwException) {
throw new SQLException();
}
return false;
return isClosed;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -408,7 +442,8 @@ public boolean isValid(int timeout) throws SQLException
if (throwException) {
throw new SQLException();
}
return true;
refreshConnectionWaitTimeout();
return !isClosed;
}

/** {@inheritDoc} */
Expand Down
14 changes: 11 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java
Expand Up @@ -38,6 +38,7 @@ public class StubDataSource implements DataSource
private SQLException throwException;
private long connectionAcquistionTime = 0;
private int loginTimeout;
private int waitTimeout = 30000;

public String getUser()
{
Expand All @@ -59,6 +60,14 @@ public void setPassword(String password)
this.password = password;
}

public int getWaitTimeout() {
return waitTimeout;
}

public void setWaitTimeout(int waitTimeout) {
this.waitTimeout = waitTimeout;
}

public void setURL(String url)
{
// we don't care
Expand Down Expand Up @@ -127,15 +136,14 @@ public Connection getConnection() throws SQLException
if (connectionAcquistionTime > 0) {
UtilityElf.quietlySleep(connectionAcquistionTime);
}

return new StubConnection();
return new StubConnection(waitTimeout);
}

/** {@inheritDoc} */
@Override
public Connection getConnection(String username, String password) throws SQLException
{
return new StubConnection();
return new StubConnection(waitTimeout);
}

public void setThrowException(SQLException e)
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/zaxxer/hikari/mocks/StubStatement.java
Expand Up @@ -68,6 +68,7 @@ public ResultSet executeQuery(String sql) throws SQLException
{
checkClosed();
StubResultSet resultSet = new StubResultSet();
connection.commit();
return resultSet;
}

Expand Down Expand Up @@ -179,6 +180,7 @@ public boolean execute(String sql) throws SQLException
if (simulatedQueryTime > 0) {
quietlySleep(simulatedQueryTime);
}
connection.commit();
return false;
}

Expand Down
70 changes: 70 additions & 0 deletions src/test/java/com/zaxxer/hikari/pool/TestConnections.java
Expand Up @@ -217,6 +217,76 @@ public void testMaxLifetime2() throws Exception
}
}

@Test
public void testKeepalive() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(700);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(500);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertFalse("Connection should be open", unwrap.isClosed());
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testKeepalive2() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(500);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(700);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertTrue("Connection should have closed:" + unwrap, unwrap.isClosed());

Connection conn2 = pool.getConnection();
Connection unwrap2 = conn2.unwrap(Connection.class);

assertNotSame("Expected a different connection", unwrap, unwrap2);
assertFalse("Connection should be open", unwrap2.isClosed());

conn2.close();
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testDoubleClose() throws Exception
{
Expand Down

0 comments on commit 0561309

Please sign in to comment.