Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

features: add heartbeat to keepalive connection if configure it #1699

Merged
merged 7 commits into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/com/zaxxer/hikari/HikariConfig.java
Expand Up @@ -55,6 +55,7 @@ public class HikariConfig implements HikariConfigMXBean
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final long DEFAULT_KEEPALIVE_TIME = MINUTES.toMillis(30);
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
private static final int DEFAULT_POOL_SIZE = 10;

private static boolean unitTest = false;
Expand Down Expand Up @@ -99,6 +100,10 @@ public class HikariConfig implements HikariConfigMXBean
private Object healthCheckRegistry;
private Properties healthCheckProperties;

private boolean isKeepalive;

private long keepaliveTime;

cadeeper marked this conversation as resolved.
Show resolved Hide resolved
private volatile boolean sealed;

/**
Expand All @@ -117,6 +122,8 @@ public HikariConfig()
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;
isKeepalive = false;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
keepaliveTime = DEFAULT_KEEPALIVE_TIME;

String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
Expand Down Expand Up @@ -720,6 +727,22 @@ public void addHealthCheckProperty(String key, String value)
healthCheckProperties.setProperty(key, value);
}

public boolean isKeepalive() {
return isKeepalive;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

public void setKeepalive(boolean keepalive) {
isKeepalive = keepalive;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

public long getKeepaliveTime() {
return keepaliveTime;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

public void setKeepaliveTime(long keepaliveTime) {
this.keepaliveTime = keepaliveTime;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

/**
* Determine whether the Connections in the pool are in read-only mode.
*
Expand Down Expand Up @@ -1018,6 +1041,12 @@ private void validateNumerics()
maxLifetime = MAX_LIFETIME;
}

// keepalive time must larger then 30 seconds
if (keepaliveTime < SECONDS.toMillis(30)) {
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.warn("{} - keepaliveTime is less than 30000ms, setting to default {}ms.", poolName, DEFAULT_KEEPALIVE_TIME);
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
keepaliveTime = DEFAULT_KEEPALIVE_TIME;
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

cadeeper marked this conversation as resolved.
Show resolved Hide resolved
if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
Expand Down
32 changes: 31 additions & 1 deletion src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Expand Up @@ -28,7 +28,7 @@
import com.zaxxer.hikari.util.ConcurrentBag;
import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener;
import com.zaxxer.hikari.util.SuspendResumeLock;
import com.zaxxer.hikari.util.UtilityElf.DefaultThreadFactory;
import com.zaxxer.hikari.util.UtilityElf.*;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -477,6 +477,8 @@ private PoolEntry createPoolEntry()
final PoolEntry poolEntry = newPoolEntry();

final long maxLifetime = config.getMaxLifetime();
final boolean isKeepalive = config.isKeepalive();
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
Expand All @@ -489,6 +491,24 @@ private PoolEntry createPoolEntry()
},
lifetime, MILLISECONDS));
}
if (isKeepalive) {
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
final long keepaliveTime = config.getKeepaliveTime();
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(
() -> {
boolean isConnectionAlive = keepaliveConnection(poolEntry);
if (!isConnectionAlive) {
if (softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, false)) {
addBagItem(connectionBag.getWaitingThreadCount());
}
}
if (logger.isDebugEnabled()) {
logger.debug("{} - keepalive heartbeat: Is connection alive? {}", poolName, isConnectionAlive);
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
}, heartbeatTime, heartbeatTime, MILLISECONDS));
}

return poolEntry;
}
Expand Down Expand Up @@ -619,6 +639,16 @@ private boolean softEvictConnection(final PoolEntry poolEntry, final String reas
return false;
}

/**
* check if the connection is alive
* if the connection state is in use ,we suppose it is alive. then we'll check it by {@link PoolBase#isConnectionAlive}
* @param poolEntry the PoolEntry (/Connection) to be checked from the pool
* @return true if the connection is alive,false if it was dead
*/
private boolean keepaliveConnection(final PoolEntry poolEntry) {
return poolEntry.getState() == STATE_IN_USE || isConnectionAlive(poolEntry.connection);
}

cadeeper marked this conversation as resolved.
Show resolved Hide resolved
/**
* Create/initialize the Housekeeping service {@link ScheduledExecutorService}. If the user specified an Executor
* to be used in the {@link HikariConfig}, then we use that. If no Executor was specified (typical), then create
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/zaxxer/hikari/pool/PoolEntry.java
Expand Up @@ -48,6 +48,8 @@ final class PoolEntry implements IConcurrentBagEntry

private volatile ScheduledFuture<?> endOfLife;

private volatile ScheduledFuture<?> keepalive;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

private final FastList<Statement> openStatements;
private final HikariPool hikariPool;

Expand Down Expand Up @@ -92,6 +94,10 @@ void setFutureEol(final ScheduledFuture<?> endOfLife)
this.endOfLife = endOfLife;
}

public void setKeepalive(ScheduledFuture<?> keepalive) {
this.keepalive = keepalive;
}

Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
{
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
Expand Down Expand Up @@ -175,9 +181,15 @@ Connection close()
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}

ScheduledFuture<?> ka = keepalive;
if (ka != null && !ka.isDone() && !ka.cancel(false)) {
LOGGER.warn("{} - keepalive expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
}

Connection con = connection;
connection = null;
endOfLife = null;
keepalive = null;
return con;
}

Expand Down
38 changes: 35 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubConnection.java
Expand Up @@ -33,7 +33,7 @@
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.atomic.AtomicInteger;

import com.zaxxer.hikari.util.UtilityElf;
Expand All @@ -47,11 +47,16 @@ public class StubConnection extends StubBaseConnection implements Connection
public static final AtomicInteger count = new AtomicInteger();
public static volatile boolean slowCreate;
public static volatile boolean oldDriver;
private volatile boolean isClose = false;
cadeeper marked this conversation as resolved.
Show resolved Hide resolved

private static long foo;
private boolean autoCommit;
private int isolation = Connection.TRANSACTION_READ_COMMITTED;
private String catalog;
private long waitTimeout;

private ScheduledExecutorService connectionWaitTimeout = new ScheduledThreadPoolExecutor(1);
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
private ScheduledFuture<?> waitTimeoutTask;

static {
foo = System.currentTimeMillis();
Expand All @@ -64,6 +69,20 @@ public StubConnection() {
}
}

public StubConnection(long waitTimeout) {
this.waitTimeout = waitTimeout;
count.incrementAndGet();
if (slowCreate) {
UtilityElf.quietlySleep(1000);
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
try{
refreshConnectionWaitTimeout();
}catch (Exception e){
//ignore
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
}


/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -128,7 +147,19 @@ public boolean getAutoCommit() throws SQLException
@Override
public void commit() throws SQLException
{
refreshConnectionWaitTimeout();
}

private void refreshConnectionWaitTimeout() throws SQLException {
if (this.isClose) {
throw new SQLException("connection has been closed");
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
if (waitTimeoutTask != null) {
waitTimeoutTask.cancel(true);
}
cadeeper marked this conversation as resolved.
Show resolved Hide resolved
if (waitTimeout > 0) {
waitTimeoutTask = connectionWaitTimeout.schedule(() -> { this.isClose = true;}, waitTimeout, TimeUnit.MILLISECONDS);
}
}

/** {@inheritDoc} */
Expand All @@ -152,7 +183,7 @@ public boolean isClosed() throws SQLException
if (throwException) {
throw new SQLException();
}
return false;
return isClose;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -408,7 +439,8 @@ public boolean isValid(int timeout) throws SQLException
if (throwException) {
throw new SQLException();
}
return true;
refreshConnectionWaitTimeout();
return !isClose;
}

/** {@inheritDoc} */
Expand Down
14 changes: 11 additions & 3 deletions src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java
Expand Up @@ -38,6 +38,7 @@ public class StubDataSource implements DataSource
private SQLException throwException;
private long connectionAcquistionTime = 0;
private int loginTimeout;
private int waitTimeout = 30000;

public String getUser()
{
Expand All @@ -59,6 +60,14 @@ public void setPassword(String password)
this.password = password;
}

public int getWaitTimeout() {
return waitTimeout;
}

public void setWaitTimeout(int waitTimeout) {
this.waitTimeout = waitTimeout;
}

public void setURL(String url)
{
// we don't care
Expand Down Expand Up @@ -127,15 +136,14 @@ public Connection getConnection() throws SQLException
if (connectionAcquistionTime > 0) {
UtilityElf.quietlySleep(connectionAcquistionTime);
}

return new StubConnection();
return new StubConnection(waitTimeout);
}

/** {@inheritDoc} */
@Override
public Connection getConnection(String username, String password) throws SQLException
{
return new StubConnection();
return new StubConnection(waitTimeout);
}

public void setThrowException(SQLException e)
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/zaxxer/hikari/mocks/StubStatement.java
Expand Up @@ -68,6 +68,7 @@ public ResultSet executeQuery(String sql) throws SQLException
{
checkClosed();
StubResultSet resultSet = new StubResultSet();
connection.commit();
return resultSet;
}

Expand Down Expand Up @@ -179,6 +180,7 @@ public boolean execute(String sql) throws SQLException
if (simulatedQueryTime > 0) {
quietlySleep(simulatedQueryTime);
}
connection.commit();
return false;
}

Expand Down
72 changes: 72 additions & 0 deletions src/test/java/com/zaxxer/hikari/pool/TestConnections.java
Expand Up @@ -217,6 +217,78 @@ public void testMaxLifetime2() throws Exception
}
}

@Test
public void testKeepalive() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(700);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepalive(true);
getUnsealedConfig(ds).setKeepaliveTime(500);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertFalse("Connection should be open", unwrap.isClosed());
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testKeepalive2() throws Exception{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(1);
config.setConnectionTimeout(2500);
config.setConnectionTestQuery("VALUES 1");
StubDataSource sds = new StubDataSource();
sds.setWaitTimeout(500);
config.setDataSource(sds);

System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100");

setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
getUnsealedConfig(ds).setKeepalive(true);
getUnsealedConfig(ds).setKeepaliveTime(700);

HikariPool pool = getPool(ds);
Connection conn = pool.getConnection();
Connection unwrap = conn.unwrap(Connection.class);
//recycle, change IN_USE state
conn.close();
assertFalse("Connection should be open", unwrap.isClosed());
quietlySleep(1200);
assertTrue("Connection should have closed:" + unwrap, unwrap.isClosed());

Connection conn2 = pool.getConnection();
Connection unwrap2 = conn2.unwrap(Connection.class);

assertNotSame("Expected a different connection", unwrap, unwrap2);
assertFalse("Connection should be open", unwrap2.isClosed());

conn2.close();
}
finally {
setConfigUnitTest(false);
}
}

@Test
public void testDoubleClose() throws Exception
{
Expand Down