Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add active detection in sentinel mode,Reconstruct sentinel mode Listener #3547

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 45 additions & 125 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -24,7 +22,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private final JedisClientConfig sentinelClientConfig;

protected final Collection<MasterListener> masterListeners = new ArrayList<>();
private final Collection<SentinelMasterListener> masterListeners = new ArrayList<>();

private volatile HostAndPort currentHostMaster;

Expand Down Expand Up @@ -181,6 +179,7 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName);
}

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
Expand All @@ -193,6 +192,47 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName, poolConfig);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName) {
initMasterListeners(sentinels, masterName, null);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName,
GenericObjectPoolConfig<Jedis> poolConfig) {

LOG.info("Starting Sentinel listeners for {}...", masterName);
SentinelPoolConfig jedisSentinelPoolConfig = null;
if (poolConfig instanceof SentinelPoolConfig) {
jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig);
} else {
jedisSentinelPoolConfig = new SentinelPoolConfig();
}

for (HostAndPort sentinel : sentinels) {
if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) {
masterListeners.add(
new SentinelMasterActiveDetectListener(currentHostMaster, sentinel, sentinelClientConfig,
masterName, jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}

if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) {
masterListeners.add(new SentinelMasterSubscribeListener(masterName, sentinel,
sentinelClientConfig, jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}
}
masterListeners.forEach(SentinelMasterListener::start);
}

private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {
Expand All @@ -201,10 +241,7 @@ private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {

@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}

masterListeners.forEach(SentinelMasterListener::shutdown);
super.destroy();
}

Expand Down Expand Up @@ -271,16 +308,7 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String maste
}
}

LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
LOG.info("Redis master running at {}", master);

return master;
}
Expand Down Expand Up @@ -324,112 +352,4 @@ public void returnResource(final Jedis resource) {
}
}

protected class MasterListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", hostPort, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
LOG.debug(
"Ignoring message on +switch-master for master name {}, our master name is {}",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host,
port, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
LOG.error("Sleep interrupted: ", e1);
}
} else {
LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port);
}
} finally {
if (j != null) {
j.close();
}
}
}
}

public void shutdown() {
try {
LOG.debug("Shutting down listener on {}:{}", host, port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.close();
}
} catch (RuntimeException e) {
LOG.error("Caught exception while shutting down: ", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package redis.clients.jedis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* active detect master node .in case of the subscribe message lost
* @see SentinelMasterSubscribeListener subscribe failover message from "+switch-master" channel
*/
public abstract class SentinelMasterActiveDetectListener extends Thread
chenshi5012 marked this conversation as resolved.
Show resolved Hide resolved
implements SentinelMasterListener {

private static final Logger LOG = LoggerFactory
.getLogger(SentinelMasterActiveDetectListener.class);

private List<String> currentHostMaster;
private HostAndPort sentinel;
private JedisClientConfig jedisClientConfig;
private String masterName;
private long activeDetectIntervalTimeMillis = 5 * 1000;

private AtomicBoolean running = new AtomicBoolean(false);
private volatile Jedis j;

public SentinelMasterActiveDetectListener(HostAndPort currentHostMaster, HostAndPort sentinel,
JedisClientConfig jedisClientConfig, String masterName, long activeDetectIntervalTimeMillis) {
super(String.format("SentinelMasterActiveDetectListener-%s-[%s:%d]", masterName,
sentinel.getHost(), sentinel.getPort()));
this.currentHostMaster = Arrays.asList(currentHostMaster.getHost(),
String.valueOf(currentHostMaster.getPort()));
this.sentinel = sentinel;
this.jedisClientConfig = jedisClientConfig;
this.masterName = masterName;
this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis;
this.setDaemon(true);
}

@Override
public void shutdown() {
LOG.info("Shutting down active detect listener on {}", sentinel);
running.set(false);
if (j != null) {
j.close();
}
}

@Override
public void run() {
LOG.info("Start active detect listener on {},interval {} ms", sentinel,
activeDetectIntervalTimeMillis);
running.set(true);
j = new Jedis(sentinel, jedisClientConfig);
while (running.get()) {
try {
Thread.sleep(activeDetectIntervalTimeMillis);

if (j == null || j.isBroken() || !j.isConnected()) {
j = new Jedis(sentinel, jedisClientConfig);
}

List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
continue;
}

if (!currentHostMaster.equals(masterAddr)) {
LOG.info("Found master node change from {} to{} ", currentHostMaster, masterAddr);
onChange(new HostAndPort(masterAddr.get(0), Integer.parseInt(masterAddr.get(1))));
this.currentHostMaster = masterAddr;
}
} catch (Exception e) {
// TO ensure the thread running, catch all exception
LOG.error("Active detect listener failed ", e);
}
}
}

public abstract void onChange(HostAndPort hostAndPort);
}
14 changes: 14 additions & 0 deletions src/main/java/redis/clients/jedis/SentinelMasterListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package redis.clients.jedis;

/**
* interface for monitor the master failover under sentinel mode We offer two implementation options
* @see SentinelMasterSubscribeListener Passive subscription
* @see SentinelMasterActiveDetectListener Active detection
*/
public interface SentinelMasterListener {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check comment in implementing classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

void start();

void shutdown();

void onChange(HostAndPort hostAndPort);
}