Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace synchronized with j.u.c.l.ReentrantLock for Loom #3480

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 5 additions & 14 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -4166,25 +4166,16 @@ public final CommandObject<Map<String, Object>> graphConfigGet(String configName
/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
* <p>This process of checking whether or not
* the instance reference exists follows <a
* href="https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java"
* target="_blank">'double-checked lock optimization'</a> approach to reduce the overhead of
* acquiring a lock by testing the lock criteria (the "lock hint") before acquiring the lock.</p>
*
* @return the JsonObjectMapper instance reference
* @see DefaultGsonObjectMapper
*/
private JsonObjectMapper getJsonObjectMapper() {
JsonObjectMapper localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
synchronized (this) {
localRef = this.jsonObjectMapper;
if (Objects.isNull(localRef)) {
this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper();
}
}
if (Objects.isNull(this.jsonObjectMapper)) {
this.jsonObjectMapper = new DefaultGsonObjectMapper();
}
return localRef;

return this.jsonObjectMapper;
}

public void setJsonObjectMapper(JsonObjectMapper jsonObjectMapper) {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private volatile HostAndPort currentHostMaster;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masteClientConfig, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentHostMaster)) {
currentHostMaster = master;
factory.setHostAndPort(currentHostMaster);
Expand All @@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) {

LOG.info("Created JedisSentinelPool to master at {}", master);
}
} finally {
initPoolLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.decorators.Decorators.DecorateSupplier;
import redis.clients.jedis.*;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.Connection;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.util.IOUtils;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
Expand All @@ -25,10 +28,12 @@
public class CircuitBreakerCommandExecutor implements CommandExecutor {

private final static List<Class<? extends Throwable>> circuitBreakerFallbackException =
Arrays.asList(CallNotPermittedException.class);
Arrays.asList(CallNotPermittedException.class);

private final MultiClusterPooledConnectionProvider provider;

private final Lock clusterFailoverHandlerLock = new ReentrantLock(true);

public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
this.provider = provider;
}
Expand All @@ -47,7 +52,7 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
supplier.withRetry(cluster.getRetry());
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withFallback(circuitBreakerFallbackException,
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));

return supplier.decorate().get();
}
Expand All @@ -64,32 +69,37 @@ private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster clust
/**
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios
*/
private synchronized <T> T handleClusterFailover(CommandObject<T> commandObject, CircuitBreaker circuitBreaker) {

// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
private <T> T handleClusterFailover(CommandObject<T> commandObject, CircuitBreaker circuitBreaker) {
clusterFailoverHandlerLock.lock();

try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand()
// to use the next cluster's connection pool - according to the configuration's prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();

// Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " +
"provided with an additional cluster/database endpoint according to its prioritized sequence. " +
"If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return executeCommand(commandObject);
} finally {
clusterFailoverHandlerLock.unlock();
}

// Recursive call to the initiating method so the operation can be retried on the next cluster connection
return executeCommand(commandObject);
}

}
16 changes: 11 additions & 5 deletions src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import redis.clients.jedis.Builder;
Expand Down Expand Up @@ -102,9 +104,7 @@ private Builder<ResultSet> getBuilder(String graphName) {
}

private void createBuilder(String graphName) {
synchronized (builders) {
builders.putIfAbsent(graphName, new ResultSetBuilder(new GraphCacheImpl(graphName)));
}
builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey)));
}

private class GraphCacheImpl implements GraphCache {
Expand Down Expand Up @@ -140,6 +140,8 @@ private class GraphCacheList {
private final String name;
private final String query;
private final List<String> data = new CopyOnWriteArrayList<>();

private final Lock dataLock = new ReentrantLock(true);

/**
*
Expand All @@ -160,14 +162,18 @@ public GraphCacheList(String name, String procedure) {
*/
public String getCachedData(int index) {
if (index >= data.size()) {
synchronized (data) {
dataLock.lock();

try {
if (index >= data.size()) {
getProcedureInfo();
}
} finally {
dataLock.unlock();
}
}

return data.get(index);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;


Expand Down Expand Up @@ -46,6 +48,8 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
* provided at startup via the MultiClusterClientConfig. All traffic will be routed according to this index.
*/
private volatile Integer activeMultiClusterIndex = 1;

private final Lock activeClusterIndexLock = new ReentrantLock(true);

/**
* Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list
Expand Down Expand Up @@ -142,8 +146,9 @@ public int incrementActiveMultiClusterIndex() {

// Field-level synchronization is used to avoid the edge case in which
// setActiveMultiClusterIndex(int multiClusterIndex) is called at the same time
synchronized (activeMultiClusterIndex) {

activeClusterIndexLock.lock();

try {
String originalClusterName = getClusterCircuitBreaker().getName();

// Only increment if it can pass this validation otherwise we will need to check for NULL in the data path
Expand All @@ -165,6 +170,8 @@ public int incrementActiveMultiClusterIndex() {
incrementActiveMultiClusterIndex();

else log.warn("Cluster/database endpoint successfully updated from '{}' to '{}'", originalClusterName, circuitBreaker.getName());
} finally {
activeClusterIndexLock.unlock();
}

return activeMultiClusterIndex;
Expand Down Expand Up @@ -209,11 +216,13 @@ public void validateTargetConnection(int multiClusterIndex) {
* Special care should be taken to confirm cluster/database availability AND
* potentially cross-cluster replication BEFORE using this capability.
*/
public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {
public void setActiveMultiClusterIndex(int multiClusterIndex) {

// Field-level synchronization is used to avoid the edge case in which
// incrementActiveMultiClusterIndex() is called at the same time
synchronized (activeMultiClusterIndex) {
activeClusterIndexLock.lock();

try {

// Allows an attempt to reset the current cluster from a FORCED_OPEN to CLOSED state in the event that no failover is possible
if (activeMultiClusterIndex == multiClusterIndex &&
Expand All @@ -236,6 +245,8 @@ public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) {

activeMultiClusterIndex = multiClusterIndex;
lastClusterCircuitBreakerForcedOpen = false;
} finally {
activeClusterIndexLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -43,7 +45,7 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final long subscribeRetryWaitTimeMillis;

private final Object initPoolLock = new Object();
private final Lock initPoolLock = new ReentrantLock(true);

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
Expand Down Expand Up @@ -95,7 +97,9 @@ public HostAndPort getCurrentMaster() {
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
initPoolLock.lock();

try {
if (!master.equals(currentMaster)) {
currentMaster = master;

Expand All @@ -114,6 +118,8 @@ private void initMaster(HostAndPort master) {
existingPool.close();
}
}
} finally {
initPoolLock.unlock();
}
}

Expand Down