Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add active detection in sentinel mode,Reconstruct sentinel mode Listener #3547

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
173 changes: 48 additions & 125 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -14,6 +12,9 @@

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.sentinel.listener.SentinelActiveDetectListener;
import redis.clients.jedis.sentinel.listener.SentinelListener;
import redis.clients.jedis.sentinel.listener.SentinelSubscribeListener;
import redis.clients.jedis.util.Pool;

public class JedisSentinelPool extends Pool<Jedis> {
Expand All @@ -24,7 +25,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private final JedisClientConfig sentinelClientConfig;

protected final Collection<MasterListener> masterListeners = new ArrayList<>();
private final Collection<SentinelListener> masterListeners = new ArrayList<>();

private volatile HostAndPort currentHostMaster;

Expand Down Expand Up @@ -181,6 +182,7 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName);
}

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
Expand All @@ -193,6 +195,47 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName, poolConfig);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName) {
initMasterListeners(sentinels, masterName, null);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName,
GenericObjectPoolConfig<Jedis> poolConfig) {

LOG.info("Starting Sentinel listeners for {}...", masterName);
SentinelPoolConfig jedisSentinelPoolConfig = null;
if (poolConfig instanceof SentinelPoolConfig) {
jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig);
} else {
jedisSentinelPoolConfig = new SentinelPoolConfig();
}

for (HostAndPort sentinel : sentinels) {
if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) {
masterListeners.add(
new SentinelActiveDetectListener(currentHostMaster, sentinel, sentinelClientConfig,
masterName, jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}

if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) {
masterListeners.add(new SentinelSubscribeListener(masterName, sentinel,
sentinelClientConfig, jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}
}
masterListeners.forEach(SentinelListener::start);
}

private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {
Expand All @@ -201,10 +244,7 @@ private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {

@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}

masterListeners.forEach(SentinelListener::shutdown);
super.destroy();
}

Expand Down Expand Up @@ -271,16 +311,7 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String maste
}
}

LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
LOG.info("Redis master running at {}", master);

return master;
}
Expand Down Expand Up @@ -324,112 +355,4 @@ public void returnResource(final Jedis resource) {
}
}

protected class MasterListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", hostPort, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
LOG.debug(
"Ignoring message on +switch-master for master name {}, our master name is {}",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host,
port, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
LOG.error("Sleep interrupted: ", e1);
}
} else {
LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port);
}
} finally {
if (j != null) {
j.close();
}
}
}
}

public void shutdown() {
try {
LOG.debug("Shutting down listener on {}:{}", host, port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.close();
}
} catch (RuntimeException e) {
LOG.error("Caught exception while shutting down: ", e);
}
}
}
}
44 changes: 44 additions & 0 deletions src/main/java/redis/clients/jedis/SentinelPoolConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class SentinelPoolConfig extends GenericObjectPoolConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenericObjectPoolConfig takes a type argument. Declaring a sub-class without the type argument is anti-pattern.

Idea: Just declare this interface separately without sub-classing it; with a more appropriate name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be sub-class is the best way to avoid add more Constructor method . currently threse are 24 Constructor method for JedisSentinelPool.
use sub-class will not break the Constructor parameter type GenericObjectPoolConfig.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's your concern, you can introduce a Builder.


private boolean enableActiveDetectListener = false;
private long activeDetectIntervalTimeMillis = 5 * 1000;

private boolean enableDefaultSubscribeListener = true;
private long subscribeRetryWaitTimeMillis = 5 * 1000;

public boolean isEnableActiveDetectListener() {
return enableActiveDetectListener;
}

public void setEnableActiveDetectListener(boolean enableActiveDetectListener) {
this.enableActiveDetectListener = enableActiveDetectListener;
}

public long getActiveDetectIntervalTimeMillis() {
return activeDetectIntervalTimeMillis;
}

public void setActiveDetectIntervalTimeMillis(long activeDetectIntervalTimeMillis) {
this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis;
}

public boolean isEnableDefaultSubscribeListener() {
return enableDefaultSubscribeListener;
}

public void setEnableDefaultSubscribeListener(boolean enableDefaultSubscribeListener) {
this.enableDefaultSubscribeListener = enableDefaultSubscribeListener;
}

public long getSubscribeRetryWaitTimeMillis() {
return subscribeRetryWaitTimeMillis;
}

public void setSubscribeRetryWaitTimeMillis(long subscribeRetryWaitTimeMillis) {
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
}