Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

features: add heartbeat to keepalive connection if configure it #1699

Merged
merged 7 commits into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/main/java/com/zaxxer/hikari/HikariConfig.java
Expand Up @@ -55,6 +55,7 @@ public class HikariConfig implements HikariConfigMXBean
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final long DEFAULT_KEEPALIVE_TIME = 0L;
private static final int DEFAULT_POOL_SIZE = 10;

private static boolean unitTest = false;
Expand Down Expand Up @@ -99,6 +100,8 @@ public class HikariConfig implements HikariConfigMXBean
private Object healthCheckRegistry;
private Properties healthCheckProperties;

private long keepaliveTime;

cadeeper marked this conversation as resolved.
Show resolved Hide resolved
private volatile boolean sealed;

/**
Expand All @@ -117,6 +120,7 @@ public HikariConfig()
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;
keepaliveTime = DEFAULT_KEEPALIVE_TIME;

String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
Expand Down Expand Up @@ -720,6 +724,26 @@ public void addHealthCheckProperty(String key, String value)
healthCheckProperties.setProperty(key, value);
}

/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @return the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public long getKeepaliveTime() {
return keepaliveTime;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

/**
* This property controls the keepalive interval for a connection in the pool. An in-use connection will never be
* tested by the keepalive thread, only when it is idle will it be tested.
*
* @param keepaliveTimeMs the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled).
*/
public void setKeepaliveTime(long keepaliveTimeMs) {
this.keepaliveTime = keepaliveTimeMs;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

/**
* Determine whether the Connections in the pool are in read-only mode.
*
Expand Down Expand Up @@ -1018,6 +1042,18 @@ private void validateNumerics()
maxLifetime = MAX_LIFETIME;
}

// keepalive time must larger then 30 seconds
if (keepaliveTime != 0 && keepaliveTime < SECONDS.toMillis(30)) {
LOGGER.warn("{} - keepaliveTime is less than 30000ms, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}

// keepalive time must be less than maxLifetime (if maxLifetime is enabled)
if (keepaliveTime != 0 && maxLifetime != 0 && keepaliveTime >= maxLifetime) {
LOGGER.warn("{} - keepaliveTime is greater than or equal to maxLifetime, disabling it.", poolName);
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

cadeeper marked this conversation as resolved.
Show resolved Hide resolved
if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Expand Up @@ -477,6 +477,8 @@ private PoolEntry createPoolEntry()
final PoolEntry poolEntry = newPoolEntry();

final long maxLifetime = config.getMaxLifetime();
final long keepaliveTime = config.getKeepaliveTime();

if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
Expand All @@ -490,6 +492,25 @@ private PoolEntry createPoolEntry()
lifetime, MILLISECONDS));
}

if (keepaliveTime > 0) {
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(
() -> {
if (connectionBag.reserve(poolEntry)) {
if (!isConnectionAlive(poolEntry.connection)) {
softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, true);
addBagItem(connectionBag.getWaitingThreadCount());
}
else {
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
connectionBag.unreserve(poolEntry);
logger.debug("{} - keepalive: connection {} is alive", poolName, poolEntry.connection);
}
}
}, heartbeatTime, heartbeatTime, MILLISECONDS));
}

return poolEntry;
}
catch (ConnectionSetupException e) {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/zaxxer/hikari/pool/PoolEntry.java
Expand Up @@ -47,6 +47,7 @@ final class PoolEntry implements IConcurrentBagEntry
private volatile boolean evict;

private volatile ScheduledFuture<?> endOfLife;
private volatile ScheduledFuture<?> keepalive;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

private final FastList<Statement> openStatements;
private final HikariPool hikariPool;
Expand Down Expand Up @@ -92,6 +93,10 @@ void setFutureEol(final ScheduledFuture<?> endOfLife)
this.endOfLife = endOfLife;
}

public void setKeepalive(ScheduledFuture<?> keepalive) {
this.keepalive = keepalive;
}

Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
{
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
Expand Down Expand Up @@ -175,9 +180,15 @@ Connection close()
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}

ScheduledFuture<?> ka = keepalive;
if (ka != null && !ka.isDone() && !ka.cancel(false)) {
LOGGER.warn("{} - keepalive task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}

Connection con = connection;
connection = null;
endOfLife = null;
keepalive = null;
return con;
}

Expand Down
41 changes: 38 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubConnection.java
Expand Up @@ -33,7 +33,7 @@
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.atomic.AtomicInteger;

import com.zaxxer.hikari.util.UtilityElf;
Expand All @@ -47,11 +47,16 @@ public class StubConnection extends StubBaseConnection implements Connection
public static final AtomicInteger count = new AtomicInteger();
public static volatile boolean slowCreate;
public static volatile boolean oldDriver;
private volatile boolean isClosed = false;

private static long foo;
private boolean autoCommit;
private int isolation = Connection.TRANSACTION_READ_COMMITTED;
private String catalog;
private long waitTimeout;

private static ScheduledExecutorService connectionWaitTimeout = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture<?> waitTimeoutTask;

static {
foo = System.currentTimeMillis();
Expand All @@ -64,6 +69,21 @@ public StubConnection() {
}
}

public StubConnection(long waitTimeout) {
this.waitTimeout = waitTimeout;
count.incrementAndGet();
if (slowCreate) {
UtilityElf.quietlySleep(1000);
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

try {
refreshConnectionWaitTimeout();
} catch (Exception e){
//ignore
}
}


/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -128,7 +148,21 @@ public boolean getAutoCommit() throws SQLException
@Override
public void commit() throws SQLException
{
refreshConnectionWaitTimeout();
}

private void refreshConnectionWaitTimeout() throws SQLException {
if (this.isClosed) {
throw new SQLException("connection has been closed");
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

if (waitTimeoutTask != null) {
waitTimeoutTask.cancel(true);
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

if (waitTimeout > 0) {
waitTimeoutTask = connectionWaitTimeout.schedule(() -> { this.isClosed = true;}, waitTimeout, TimeUnit.MILLISECONDS);
}
}

/** {@inheritDoc} */
Expand All @@ -152,7 +186,7 @@ public boolean isClosed() throws SQLException
if (throwException) {
throw new SQLException();
}
return false;
return isClosed;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -408,7 +442,8 @@ public boolean isValid(int timeout) throws SQLException
if (throwException) {
throw new SQLException();
}
return true;
refreshConnectionWaitTimeout();
return !isClosed;
}

/** {@inheritDoc} */
Expand Down
14 changes: 11 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java
Expand Up @@ -38,6 +38,7 @@ public class StubDataSource implements DataSource
private SQLException throwException;
private long connectionAcquistionTime = 0;
private int loginTimeout;
private int waitTimeout = 30000;

public String getUser()
{
Expand All @@ -59,6 +60,14 @@ public void setPassword(String password)
this.password = password;
}

public int getWaitTimeout() {
return waitTimeout;
}

public void setWaitTimeout(int waitTimeout) {
this.waitTimeout = waitTimeout;
}

public void setURL(String url)
{
// we don't care
Expand Down Expand Up @@ -127,15 +136,14 @@ public Connection getConnection() throws SQLException
if (connectionAcquistionTime > 0) {
UtilityElf.quietlySleep(connectionAcquistionTime);
}

return new StubConnection();
return new StubConnection(waitTimeout);
}

/** {@inheritDoc} */
@Override
public Connection getConnection(String username, String password) throws SQLException
{
return new StubConnection();
return new StubConnection(waitTimeout);
}

public void setThrowException(SQLException e)
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/zaxxer/hikari/mocks/StubStatement.java
Expand Up @@ -68,6 +68,7 @@ public ResultSet executeQuery(String sql) throws SQLException
{
checkClosed();
StubResultSet resultSet = new StubResultSet();
connection.commit();
return resultSet;
}

Expand Down Expand Up @@ -179,6 +180,7 @@ public boolean execute(String sql) throws SQLException
if (simulatedQueryTime > 0) {
quietlySleep(simulatedQueryTime);
}
connection.commit();
return false;
}

Expand Down
70 changes: 70 additions & 0 deletions src/test/java/com/zaxxer/hikari/pool/TestConnections.java
Expand Up @@ -217,6 +217,76 @@ public void testMaxLifetime2() throws Exception
}
}

@Test
public void testKeepalive() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(700);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(500);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertFalse("Connection should be open", unwrap.isClosed());
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testKeepalive2() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(500);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepaliveTime(700);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertTrue("Connection should have closed:" + unwrap, unwrap.isClosed());

Connection conn2 = pool.getConnection();
Connection unwrap2 = conn2.unwrap(Connection.class);

assertNotSame("Expected a different connection", unwrap, unwrap2);
assertFalse("Connection should be open", unwrap2.isClosed());

conn2.close();
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testDoubleClose() throws Exception
{
Expand Down