Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Virtual Threads #2055

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/zaxxer/hikari/HikariConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ public void setMetricRegistry(Object metricRegistry)
metricRegistry = getObjectOrPerformJndiLookup(metricRegistry);

if (!safeIsAssignableFrom(metricRegistry, "com.codahale.metrics.MetricRegistry")
&& !(safeIsAssignableFrom(metricRegistry, "io.micrometer.core.instrument.MeterRegistry"))) {
&& !(safeIsAssignableFrom(metricRegistry, "io.micrometer.core.instrument.MeterRegistry"))) {
throw new IllegalArgumentException("Class must be instance of com.codahale.metrics.MetricRegistry or io.micrometer.core.instrument.MeterRegistry");
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/com/zaxxer/hikari/HikariDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import static com.zaxxer.hikari.pool.HikariPool.POOL_NORMAL;

Expand All @@ -45,6 +46,7 @@ public class HikariDataSource extends HikariConfig implements DataSource, Closea

private final HikariPool fastPathPool;
private volatile HikariPool pool;
private final ReentrantLock hikariDataSourceLock = new ReentrantLock();

/**
* Default constructor. Setters are used to configure the pool. Using
Expand Down Expand Up @@ -103,7 +105,8 @@ public Connection getConnection() throws SQLException
// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
hikariDataSourceLock.lock();
try {
result = pool;
if (result == null) {
validate();
Expand All @@ -122,6 +125,8 @@ public Connection getConnection() throws SQLException
}
LOGGER.info("{} - Start completed.", getPoolName());
}
} finally {
hikariDataSourceLock.unlock();
}
}

Expand Down
35 changes: 21 additions & 14 deletions src/main/java/com/zaxxer/hikari/HikariJNDIFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.sql.DataSource;
import java.util.Hashtable;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;

/**
* A JNDI factory that produces HikariDataSource instances.
Expand All @@ -31,26 +32,32 @@
*/
public class HikariJNDIFactory implements ObjectFactory
{
private final ReentrantLock hikariJNDIFactoryLock = new ReentrantLock();
@Override
synchronized public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception
public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception
{
// We only know how to deal with <code>javax.naming.Reference</code> that specify a class name of "javax.sql.DataSource"
if (obj instanceof Reference && "javax.sql.DataSource".equals(((Reference) obj).getClassName())) {
var ref = (Reference) obj;
var hikariPropSet = PropertyElf.getPropertyNames(HikariConfig.class);
hikariJNDIFactoryLock.lock();
try {
// We only know how to deal with <code>javax.naming.Reference</code> that specify a class name of "javax.sql.DataSource"
if (obj instanceof Reference && "javax.sql.DataSource".equals(((Reference) obj).getClassName())) {
var ref = (Reference) obj;
var hikariPropSet = PropertyElf.getPropertyNames(HikariConfig.class);

var properties = new Properties();
var enumeration = ref.getAll();
while (enumeration.hasMoreElements()) {
var element = enumeration.nextElement();
var type = element.getType();
if (type.startsWith("dataSource.") || hikariPropSet.contains(type)) {
properties.setProperty(type, element.getContent().toString());
var properties = new Properties();
var enumeration = ref.getAll();
while (enumeration.hasMoreElements()) {
var element = enumeration.nextElement();
var type = element.getType();
if (type.startsWith("dataSource.") || hikariPropSet.contains(type)) {
properties.setProperty(type, element.getContent().toString());
}
}
return createDataSource(properties, nameCtx);
}
return createDataSource(properties, nameCtx);
return null;
} finally {
hikariJNDIFactoryLock.unlock();
}
return null;
}

private DataSource createDataSource(final Properties properties, final Context context) throws NamingException
Expand Down
158 changes: 93 additions & 65 deletions src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;

import static com.zaxxer.hikari.util.ClockSource.*;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_IN_USE;
Expand Down Expand Up @@ -80,6 +81,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag

private final ScheduledExecutorService houseKeepingExecutorService;
private ScheduledFuture<?> houseKeeperTask;
private final ReentrantLock hikariPoolLock = new ReentrantLock();

/**
* Construct a HikariPool with the specified configuration.
Expand Down Expand Up @@ -193,59 +195,64 @@ public Connection getConnection(final long hardTimeout) throws SQLException
*
* @throws InterruptedException thrown if the thread is interrupted during shutdown
*/
public synchronized void shutdown() throws InterruptedException
public void shutdown() throws InterruptedException
{
hikariPoolLock.lock();
try {
poolState = POOL_SHUTDOWN;
try {
poolState = POOL_SHUTDOWN;

if (addConnectionExecutor == null) { // pool never started
return;
}
if (addConnectionExecutor == null) { // pool never started
return;
}

logPoolState("Before shutdown ");
logPoolState("Before shutdown ");

if (houseKeeperTask != null) {
houseKeeperTask.cancel(false);
houseKeeperTask = null;
}
if (houseKeeperTask != null) {
houseKeeperTask.cancel(false);
houseKeeperTask = null;
}

softEvictConnections();
softEvictConnections();

addConnectionExecutor.shutdown();
if (!addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS)) {
logger.warn("Timed-out waiting for add connection executor to shutdown");
}
addConnectionExecutor.shutdown();
if (!addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS)) {
logger.warn("Timed-out waiting for add connection executor to shutdown");
}

destroyHouseKeepingExecutorService();
destroyHouseKeepingExecutorService();

connectionBag.close();
connectionBag.close();

final var assassinExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection assassinator",
config.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
try {
final var start = currentTime();
do {
abortActiveConnections(assassinExecutor);
softEvictConnections();
} while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
}
finally {
assassinExecutor.shutdown();
if (!assassinExecutor.awaitTermination(10L, SECONDS)) {
logger.warn("Timed-out waiting for connection assassin to shutdown");
final var assassinExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection assassinator",
config.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
try {
final var start = currentTime();
do {
abortActiveConnections(assassinExecutor);
softEvictConnections();
} while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
}
finally {
assassinExecutor.shutdown();
if (!assassinExecutor.awaitTermination(10L, SECONDS)) {
logger.warn("Timed-out waiting for connection assassin to shutdown");
}
}
}

shutdownNetworkTimeoutExecutor();
closeConnectionExecutor.shutdown();
if (!closeConnectionExecutor.awaitTermination(10L, SECONDS)) {
logger.warn("Timed-out waiting for close connection executor to shutdown");
shutdownNetworkTimeoutExecutor();
closeConnectionExecutor.shutdown();
if (!closeConnectionExecutor.awaitTermination(10L, SECONDS)) {
logger.warn("Timed-out waiting for close connection executor to shutdown");
}
}
}
finally {
logPoolState("After shutdown ");
handleMBeans(this, false);
metricsTracker.close();
finally {
logPoolState("After shutdown ");
handleMBeans(this, false);
metricsTracker.close();
}
} finally {
hikariPoolLock.unlock();
bdeneuter marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -368,25 +375,35 @@ public void softEvictConnections()

/** {@inheritDoc} */
@Override
public synchronized void suspendPool()
public void suspendPool()
{
if (suspendResumeLock == SuspendResumeLock.FAUX_LOCK) {
throw new IllegalStateException(poolName + " - is not suspendable");
}
else if (poolState != POOL_SUSPENDED) {
suspendResumeLock.suspend();
poolState = POOL_SUSPENDED;
hikariPoolLock.lock();
try {
if (suspendResumeLock == SuspendResumeLock.FAUX_LOCK) {
throw new IllegalStateException(poolName + " - is not suspendable");
}
else if (poolState != POOL_SUSPENDED) {
suspendResumeLock.suspend();
poolState = POOL_SUSPENDED;
}
} finally {
hikariPoolLock.unlock();
}
}

/** {@inheritDoc} */
@Override
public synchronized void resumePool()
public void resumePool()
{
if (poolState == POOL_SUSPENDED) {
poolState = POOL_NORMAL;
fillPool(false);
suspendResumeLock.resume();
hikariPoolLock.lock();
try {
if (poolState == POOL_SUSPENDED) {
poolState = POOL_NORMAL;
fillPool(false);
suspendResumeLock.resume();
}
} finally {
hikariPoolLock.unlock();
}
}

Expand Down Expand Up @@ -496,18 +513,23 @@ private PoolEntry createPoolEntry()
/**
* Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
*/
private synchronized void fillPool(final boolean isAfterAdd)
private void fillPool(final boolean isAfterAdd)
{
final var idle = getIdleConnections();
final var shouldAdd = getTotalConnections() < config.getMaximumPoolSize() && idle < config.getMinimumIdle();
hikariPoolLock.lock();
try {
final var idle = getIdleConnections();
final var shouldAdd = getTotalConnections() < config.getMaximumPoolSize() && idle < config.getMinimumIdle();

if (shouldAdd) {
final var countToAdd = config.getMinimumIdle() - idle;
for (int i = 0; i < countToAdd; i++)
addConnectionExecutor.submit(isAfterAdd ? postFillPoolEntryCreator : poolEntryCreator);
}
else if (isAfterAdd) {
logger.debug("{} - Fill pool skipped, pool has sufficient level or currently being filled.", poolName);
if (shouldAdd) {
final var countToAdd = config.getMinimumIdle() - idle;
for (int i = 0; i < countToAdd; i++)
addConnectionExecutor.submit(isAfterAdd ? postFillPoolEntryCreator : poolEntryCreator);
}
else if (isAfterAdd) {
logger.debug("{} - Fill pool skipped, pool has sufficient level or currently being filled.", poolName);
}
} finally {
hikariPoolLock.unlock();
}
}

Expand Down Expand Up @@ -704,6 +726,7 @@ private SQLException createTimeoutException(long startTime)
private final class PoolEntryCreator implements Callable<Boolean>
{
private final String loggingPrefix;
private final ReentrantLock poolEntryLock = new ReentrantLock();

PoolEntryCreator()
{
Expand Down Expand Up @@ -754,9 +777,14 @@ public Boolean call()
*
* @return true if we should create a connection, false if the need has disappeared
*/
private synchronized boolean shouldContinueCreating() {
return poolState == POOL_NORMAL && getTotalConnections() < config.getMaximumPoolSize() &&
(getIdleConnections() < config.getMinimumIdle() || connectionBag.getWaitingThreadCount() > getIdleConnections());
private boolean shouldContinueCreating() {
poolEntryLock.lock();
try {
return poolState == POOL_NORMAL && getTotalConnections() < config.getMaximumPoolSize() &&
(getIdleConnections() < config.getMinimumIdle() || connectionBag.getWaitingThreadCount() > getIdleConnections());
} finally {
poolEntryLock.unlock();
}
}
}

Expand Down