Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing periodic topology mechanism for JedisCluster (#3596) #3604

Open
wants to merge 2 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7379
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node1.pid
logfile /tmp/redis_cluster_node1.log
save ""
Expand All @@ -223,7 +223,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7380
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node2.pid
logfile /tmp/redis_cluster_node2.log
save ""
Expand All @@ -237,7 +237,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7381
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node3.pid
logfile /tmp/redis_cluster_node3.log
save ""
Expand All @@ -251,7 +251,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7382
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node4.pid
logfile /tmp/redis_cluster_node4.log
save ""
Expand All @@ -265,7 +265,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7383
cluster-node-timeout 5000
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node5.pid
logfile /tmp/redis_cluster_node5.log
save ""
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
Expand All @@ -21,6 +22,12 @@ public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientCo
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
super(new ClusterCommandObjects());
this.provider = provider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately

socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout);
// Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor.
// For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail.
socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout);
return socket;
} catch (Exception e) {
jce.addSuppressed(e);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

public class JedisCluster extends UnifiedJedis {

public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";

/**
* Default timeout in milliseconds.
*/
Expand Down Expand Up @@ -192,6 +194,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
85 changes: 81 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,17 +11,26 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

public class JedisClusterInfoCache {

private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);

private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
Expand All @@ -36,6 +46,20 @@ public class JedisClusterInfoCache {

private static final int MASTER_NODE_INDEX = 2;

/**
* The single thread executor for the topology refresh task.
*/
private ScheduledExecutorService topologyRefreshExecutor = null;

class TopologyRefreshTask implements Runnable {
@Override
public void run() {
logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet());
renewClusterSlots(null);
logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet());
}
}

@Deprecated
public JedisClusterInfoCache(final JedisClientConfig clientConfig) {
this(clientConfig, new GenericObjectPoolConfig<Connection>());
Expand All @@ -53,15 +77,55 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<Hos

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor();
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

/**
* Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS
* @param slotsInfo the cluster topology
* @return if slots is ok, return true, elese return false.
*/
private boolean checkClusterSlotSequence(List<Object> slotsInfo) {
List<Integer> slots = new ArrayList<>();
for (Object slotInfoObj : slotsInfo) {
List<Object> slotInfo = (List<Object>)slotInfoObj;
slots.addAll(getAssignedSlotArray(slotInfo));
}
Collections.sort(slots);
if (slots.size() != Protocol.CLUSTER_HASHSLOTS) {
return false;
}
for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) {
if (i != slots.get(i)) {
return false;
}
}
return true;
}

public void discoverClusterNodesAndSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -144,8 +208,13 @@ public void renewClusterSlots(Connection jedis) {

private void discoverClusterSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -319,6 +388,14 @@ public void reset() {
}
}

public void close() {
reset();
if (topologyRefreshExecutor != null) {
logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes);
topologyRefreshExecutor.shutdownNow();
}
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -17,9 +18,9 @@
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;

public class ClusterConnectionProvider implements ConnectionProvider {
import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";
public class ClusterConnectionProvider implements ConnectionProvider {

protected final JedisClusterInfoCache cache;

Expand All @@ -34,6 +35,12 @@ public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfi
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
if (startNodes.isEmpty()) {
throw new JedisClusterOperationException("No nodes to initialize cluster slots cache.");
Expand Down Expand Up @@ -66,7 +73,7 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig

@Override
public void close() {
cache.reset();
cache.close();
}

public void renewSlotCache() {
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,65 @@ public void clusterRefreshNodes() throws Exception {
}
}

@Test(timeout = 30_000)
public void clusterPeriodTopologyRefreshTest() throws Exception {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(nodeInfo1);
jedisClusterNode.add(nodeInfo2);
jedisClusterNode.add(nodeInfo3);

// we set topologyRefreshPeriod is 1s
Duration topologyRefreshPeriod = Duration.ofSeconds(1);
try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG,
topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) {
assertEquals(3, cluster.getClusterNodes().size());
cleanUp(); // cleanup and add node4

// at first, join node4 to cluster
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
// split available slots across the three nodes
int slotsPerNode = CLUSTER_HASHSLOTS / 4;
int[] node1Slots = new int[slotsPerNode];
int[] node2Slots = new int[slotsPerNode];
int[] node3Slots = new int[slotsPerNode];
int[] node4Slots = new int[slotsPerNode];
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) {
if (i < slotsPerNode) {
node1Slots[slot1++] = i;
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
node2Slots[slot2++] = i;
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
node3Slots[slot3++] = i;
} else {
node4Slots[slot4++] = i;
}
}

node1.clusterAddSlots(node1Slots);
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
node4.clusterAddSlots(node4Slots);
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);

assertEquals(4, cluster.getClusterNodes().size());
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));

// make 4 nodes to 3 nodes
cleanUp();
setUp();

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);
assertEquals(3, cluster.getClusterNodes().size());
}
}

private static String getNodeServingSlotRange(String infoOutput) {
// f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0
// 1394372400827 0 connected 5461-10922
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/redis/clients/jedis/JedisClusterTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ protected void cleanUp() {
node2.flushDB();
node3.flushDB();
node4.flushDB();
node1.clusterReset(ClusterResetType.SOFT);
node2.clusterReset(ClusterResetType.SOFT);
node3.clusterReset(ClusterResetType.SOFT);
node4.clusterReset(ClusterResetType.SOFT);
node1.clusterReset(ClusterResetType.HARD);
node2.clusterReset(ClusterResetType.HARD);
node3.clusterReset(ClusterResetType.HARD);
node4.clusterReset(ClusterResetType.HARD);
}

@After
Expand Down