From 0561309e328c213a0c0f1656161fdc51393ad77b Mon Sep 17 00:00:00 2001 From: nothing Date: Tue, 12 Jan 2021 11:32:22 +0800 Subject: [PATCH] features: add heartbeat to keepalive connection if configure it (#1699) * features: add heartbeat to keepalive connection if configure it * features: add heartbeat to keepalive connection if configure it * features: add heartbeat to keepalive connection if configure it * features: keepalive unit test * optimize code * optimize code --- .../java/com/zaxxer/hikari/HikariConfig.java | 36 ++++++++++ .../com/zaxxer/hikari/pool/HikariPool.java | 21 ++++++ .../com/zaxxer/hikari/pool/PoolEntry.java | 11 +++ .../zaxxer/hikari/mocks/StubConnection.java | 41 ++++++++++- .../zaxxer/hikari/mocks/StubDataSource.java | 14 +++- .../zaxxer/hikari/mocks/StubStatement.java | 2 + .../zaxxer/hikari/pool/TestConnections.java | 70 +++++++++++++++++++ 7 files changed, 189 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/zaxxer/hikari/HikariConfig.java b/src/main/java/com/zaxxer/hikari/HikariConfig.java index e4de64531..e1a375c19 100644 --- a/src/main/java/com/zaxxer/hikari/HikariConfig.java +++ b/src/main/java/com/zaxxer/hikari/HikariConfig.java @@ -55,6 +55,7 @@ public class HikariConfig implements HikariConfigMXBean private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5); private static final long IDLE_TIMEOUT = MINUTES.toMillis(10); private static final long MAX_LIFETIME = MINUTES.toMillis(30); + private static final long DEFAULT_KEEPALIVE_TIME = 0L; private static final int DEFAULT_POOL_SIZE = 10; private static boolean unitTest = false; @@ -99,6 +100,8 @@ public class HikariConfig implements HikariConfigMXBean private Object healthCheckRegistry; private Properties healthCheckProperties; + private long keepaliveTime; + private volatile boolean sealed; /** @@ -117,6 +120,7 @@ public HikariConfig() idleTimeout = IDLE_TIMEOUT; initializationFailTimeout = 1; isAutoCommit = true; + keepaliveTime = DEFAULT_KEEPALIVE_TIME; String systemProp = System.getProperty("hikaricp.configurationFile"); if (systemProp != null) { @@ -720,6 +724,26 @@ public void addHealthCheckProperty(String key, String value) healthCheckProperties.setProperty(key, value); } + /** + * This property controls the keepalive interval for a connection in the pool. An in-use connection will never be + * tested by the keepalive thread, only when it is idle will it be tested. + * + * @return the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled). + */ + public long getKeepaliveTime() { + return keepaliveTime; + } + + /** + * This property controls the keepalive interval for a connection in the pool. An in-use connection will never be + * tested by the keepalive thread, only when it is idle will it be tested. + * + * @param keepaliveTimeMs the interval in which connections will be tested for aliveness, thus keeping them alive by the act of checking. Value is in milliseconds, default is 0 (disabled). + */ + public void setKeepaliveTime(long keepaliveTimeMs) { + this.keepaliveTime = keepaliveTimeMs; + } + /** * Determine whether the Connections in the pool are in read-only mode. * @@ -1018,6 +1042,18 @@ private void validateNumerics() maxLifetime = MAX_LIFETIME; } + // keepalive time must larger then 30 seconds + if (keepaliveTime != 0 && keepaliveTime < SECONDS.toMillis(30)) { + LOGGER.warn("{} - keepaliveTime is less than 30000ms, disabling it.", poolName); + keepaliveTime = DEFAULT_KEEPALIVE_TIME; + } + + // keepalive time must be less than maxLifetime (if maxLifetime is enabled) + if (keepaliveTime != 0 && maxLifetime != 0 && keepaliveTime >= maxLifetime) { + LOGGER.warn("{} - keepaliveTime is greater than or equal to maxLifetime, disabling it.", poolName); + keepaliveTime = DEFAULT_KEEPALIVE_TIME; + } + if (leakDetectionThreshold > 0 && !unitTest) { if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) { LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName); diff --git a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index bb0c68954..c5a83e5a4 100644 --- a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -477,6 +477,8 @@ private PoolEntry createPoolEntry() final PoolEntry poolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); + final long keepaliveTime = config.getKeepaliveTime(); + if (maxLifetime > 0) { // variance up to 2.5% of the maxlifetime final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; @@ -490,6 +492,25 @@ private PoolEntry createPoolEntry() lifetime, MILLISECONDS)); } + if (keepaliveTime > 0) { + // variance up to 10% of the heartbeat time + final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10); + final long heartbeatTime = keepaliveTime - variance; + poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay( + () -> { + if (connectionBag.reserve(poolEntry)) { + if (!isConnectionAlive(poolEntry.connection)) { + softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, true); + addBagItem(connectionBag.getWaitingThreadCount()); + } + else { + connectionBag.unreserve(poolEntry); + logger.debug("{} - keepalive: connection {} is alive", poolName, poolEntry.connection); + } + } + }, heartbeatTime, heartbeatTime, MILLISECONDS)); + } + return poolEntry; } catch (ConnectionSetupException e) { diff --git a/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java b/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java index 07c670b83..93f892b19 100644 --- a/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java +++ b/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java @@ -47,6 +47,7 @@ final class PoolEntry implements IConcurrentBagEntry private volatile boolean evict; private volatile ScheduledFuture endOfLife; + private volatile ScheduledFuture keepalive; private final FastList openStatements; private final HikariPool hikariPool; @@ -92,6 +93,10 @@ void setFutureEol(final ScheduledFuture endOfLife) this.endOfLife = endOfLife; } + public void setKeepalive(ScheduledFuture keepalive) { + this.keepalive = keepalive; + } + Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) { return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit); @@ -175,9 +180,15 @@ Connection close() LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection); } + ScheduledFuture ka = keepalive; + if (ka != null && !ka.isDone() && !ka.cancel(false)) { + LOGGER.warn("{} - keepalive task cancellation unexpectedly returned false for connection {}", getPoolName(), connection); + } + Connection con = connection; connection = null; endOfLife = null; + keepalive = null; return con; } diff --git a/src/test/java/com/zaxxer/hikari/mocks/StubConnection.java b/src/test/java/com/zaxxer/hikari/mocks/StubConnection.java index f96624bd2..4660fbc82 100644 --- a/src/test/java/com/zaxxer/hikari/mocks/StubConnection.java +++ b/src/test/java/com/zaxxer/hikari/mocks/StubConnection.java @@ -33,7 +33,7 @@ import java.sql.Struct; import java.util.Map; import java.util.Properties; -import java.util.concurrent.Executor; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import com.zaxxer.hikari.util.UtilityElf; @@ -47,11 +47,16 @@ public class StubConnection extends StubBaseConnection implements Connection public static final AtomicInteger count = new AtomicInteger(); public static volatile boolean slowCreate; public static volatile boolean oldDriver; + private volatile boolean isClosed = false; private static long foo; private boolean autoCommit; private int isolation = Connection.TRANSACTION_READ_COMMITTED; private String catalog; + private long waitTimeout; + + private static ScheduledExecutorService connectionWaitTimeout = new ScheduledThreadPoolExecutor(1); + private ScheduledFuture waitTimeoutTask; static { foo = System.currentTimeMillis(); @@ -64,6 +69,21 @@ public StubConnection() { } } + public StubConnection(long waitTimeout) { + this.waitTimeout = waitTimeout; + count.incrementAndGet(); + if (slowCreate) { + UtilityElf.quietlySleep(1000); + } + + try { + refreshConnectionWaitTimeout(); + } catch (Exception e){ + //ignore + } + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override @@ -128,7 +148,21 @@ public boolean getAutoCommit() throws SQLException @Override public void commit() throws SQLException { + refreshConnectionWaitTimeout(); + } + + private void refreshConnectionWaitTimeout() throws SQLException { + if (this.isClosed) { + throw new SQLException("connection has been closed"); + } + if (waitTimeoutTask != null) { + waitTimeoutTask.cancel(true); + } + + if (waitTimeout > 0) { + waitTimeoutTask = connectionWaitTimeout.schedule(() -> { this.isClosed = true;}, waitTimeout, TimeUnit.MILLISECONDS); + } } /** {@inheritDoc} */ @@ -152,7 +186,7 @@ public boolean isClosed() throws SQLException if (throwException) { throw new SQLException(); } - return false; + return isClosed; } /** {@inheritDoc} */ @@ -408,7 +442,8 @@ public boolean isValid(int timeout) throws SQLException if (throwException) { throw new SQLException(); } - return true; + refreshConnectionWaitTimeout(); + return !isClosed; } /** {@inheritDoc} */ diff --git a/src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java b/src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java index c7da189bd..7e55580ca 100644 --- a/src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java +++ b/src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java @@ -38,6 +38,7 @@ public class StubDataSource implements DataSource private SQLException throwException; private long connectionAcquistionTime = 0; private int loginTimeout; + private int waitTimeout = 30000; public String getUser() { @@ -59,6 +60,14 @@ public void setPassword(String password) this.password = password; } + public int getWaitTimeout() { + return waitTimeout; + } + + public void setWaitTimeout(int waitTimeout) { + this.waitTimeout = waitTimeout; + } + public void setURL(String url) { // we don't care @@ -127,15 +136,14 @@ public Connection getConnection() throws SQLException if (connectionAcquistionTime > 0) { UtilityElf.quietlySleep(connectionAcquistionTime); } - - return new StubConnection(); + return new StubConnection(waitTimeout); } /** {@inheritDoc} */ @Override public Connection getConnection(String username, String password) throws SQLException { - return new StubConnection(); + return new StubConnection(waitTimeout); } public void setThrowException(SQLException e) diff --git a/src/test/java/com/zaxxer/hikari/mocks/StubStatement.java b/src/test/java/com/zaxxer/hikari/mocks/StubStatement.java index 99e7b1330..7378abc50 100644 --- a/src/test/java/com/zaxxer/hikari/mocks/StubStatement.java +++ b/src/test/java/com/zaxxer/hikari/mocks/StubStatement.java @@ -68,6 +68,7 @@ public ResultSet executeQuery(String sql) throws SQLException { checkClosed(); StubResultSet resultSet = new StubResultSet(); + connection.commit(); return resultSet; } @@ -179,6 +180,7 @@ public boolean execute(String sql) throws SQLException if (simulatedQueryTime > 0) { quietlySleep(simulatedQueryTime); } + connection.commit(); return false; } diff --git a/src/test/java/com/zaxxer/hikari/pool/TestConnections.java b/src/test/java/com/zaxxer/hikari/pool/TestConnections.java index 0275b1322..f54417643 100644 --- a/src/test/java/com/zaxxer/hikari/pool/TestConnections.java +++ b/src/test/java/com/zaxxer/hikari/pool/TestConnections.java @@ -217,6 +217,76 @@ public void testMaxLifetime2() throws Exception } } + @Test + public void testKeepalive() throws Exception{ + HikariConfig config = newHikariConfig(); + config.setMinimumIdle(0); + config.setMaximumPoolSize(1); + config.setConnectionTimeout(2500); + config.setConnectionTestQuery("VALUES 1"); + StubDataSource sds = new StubDataSource(); + sds.setWaitTimeout(700); + config.setDataSource(sds); + + System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100"); + + setConfigUnitTest(true); + try (HikariDataSource ds = new HikariDataSource(config)) { + getUnsealedConfig(ds).setKeepaliveTime(500); + + HikariPool pool = getPool(ds); + Connection conn = pool.getConnection(); + Connection unwrap = conn.unwrap(Connection.class); + //recycle, change IN_USE state + conn.close(); + assertFalse("Connection should be open", unwrap.isClosed()); + quietlySleep(1200); + assertFalse("Connection should be open", unwrap.isClosed()); + } + finally { + setConfigUnitTest(false); + } + } + + @Test + public void testKeepalive2() throws Exception{ + HikariConfig config = newHikariConfig(); + config.setMinimumIdle(0); + config.setMaximumPoolSize(1); + config.setConnectionTimeout(2500); + config.setConnectionTestQuery("VALUES 1"); + StubDataSource sds = new StubDataSource(); + sds.setWaitTimeout(500); + config.setDataSource(sds); + + System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "100"); + + setConfigUnitTest(true); + try (HikariDataSource ds = new HikariDataSource(config)) { + getUnsealedConfig(ds).setKeepaliveTime(700); + + HikariPool pool = getPool(ds); + Connection conn = pool.getConnection(); + Connection unwrap = conn.unwrap(Connection.class); + //recycle, change IN_USE state + conn.close(); + assertFalse("Connection should be open", unwrap.isClosed()); + quietlySleep(1200); + assertTrue("Connection should have closed:" + unwrap, unwrap.isClosed()); + + Connection conn2 = pool.getConnection(); + Connection unwrap2 = conn2.unwrap(Connection.class); + + assertNotSame("Expected a different connection", unwrap, unwrap2); + assertFalse("Connection should be open", unwrap2.isClosed()); + + conn2.close(); + } + finally { + setConfigUnitTest(false); + } + } + @Test public void testDoubleClose() throws Exception {