Skip to content

Commit

Permalink
Fixes #90 Cluster manager is corrupted after merging of hazelcast par…
Browse files Browse the repository at this point in the history
…tition

The original HZ member uuid is saved as a cluster member attribute.
Consequently, even if HZ changes it during partition merging, HAManager and ClusteredEventBus will keep using the same original uuid.

Note that when member added event is handled by existing cluster nodes, the member attribute may not be saved yet.
In this case, it is safe to use member uuid (at this stage, the member uuid and member attribute are equal).
  • Loading branch information
tsegismont committed Sep 20, 2018
1 parent 1194b8a commit 05ca315
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
4 changes: 1 addition & 3 deletions src/main/asciidoc/java/index.adoc
Expand Up @@ -166,9 +166,7 @@ Notice that the custom Hazelcast instance need to be configured with:
</semaphore>
----

**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your
cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an
inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details.
**IMPORTANT** Hazelcast clients or smart clients are not supported.

**IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x.
Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property).
Expand Down
Expand Up @@ -55,6 +55,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);

private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";

// Hazelcast config file
private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
Expand All @@ -78,7 +79,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
private String membershipListenerId;
private String lifecycleListenerId;
private boolean customHazelcastCluster;
private Set<Member> members = new HashSet<>();
private Set<String> nodeIds = new HashSet<>();
// Guarded by this
private Set<HazelcastAsyncMultiMap> multimaps = Collections.newSetFromMap(new WeakHashMap<>(1));

Expand Down Expand Up @@ -135,7 +136,9 @@ public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
hazelcast = Hazelcast.newHazelcastInstance(conf);
}

nodeID = hazelcast.getLocalEndpoint().getUuid();
Member localMember = hazelcast.getCluster().getLocalMember();
nodeID = localMember.getUuid();
localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID);
membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
fut.complete();
Expand Down Expand Up @@ -172,12 +175,12 @@ public String getNodeID() {

@Override
public List<String> getNodes() {
Set<Member> members = hazelcast.getCluster().getMembers();
List<String> lMembers = new ArrayList<>();
for (Member member : members) {
lMembers.add(member.getUuid());
List<String> list = new ArrayList<>();
for (Member member : hazelcast.getCluster().getMembers()) {
String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
}
return lMembers;
return list;
}

@Override
Expand Down Expand Up @@ -264,6 +267,11 @@ public void leave(Handler<AsyncResult<Void>> resultHandler) {
Thread.currentThread().interrupt();
}
}

if (customHazelcastCluster) {
hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
}

} catch (Throwable t) {
fut.fail(t);
}
Expand All @@ -278,12 +286,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
Member member = membershipEvent.getMember();
members.add(member);
nodeListener.nodeAdded(member.getUuid());
nodeIds.add(memberNodeId);
nodeListener.nodeAdded(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberAdded", t);
Expand All @@ -295,12 +307,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
Member member = membershipEvent.getMember();
members.remove(member);
nodeListener.nodeLeft(member.getUuid());
nodeIds.remove(memberNodeId);
nodeListener.nodeLeft(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberRemoved", t);
Expand All @@ -315,18 +331,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
// Safeguard to make sure members list is OK after a partition merge
if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
final Set<Member> currentMembers = hazelcast.getCluster().getMembers();
Set<Member> newMembers = new HashSet<>(currentMembers);
newMembers.removeAll(members);
Set<Member> removedMembers = new HashSet<>(members);
removedMembers.removeAll(currentMembers);
for(Member m : newMembers) {
nodeListener.nodeAdded(m.getUuid());
final List<String> currentNodes = getNodes();
Set<String> newNodes = new HashSet<>(currentNodes);
newNodes.removeAll(nodeIds);
Set<String> removedMembers = new HashSet<>(nodeIds);
removedMembers.removeAll(currentNodes);
for (String nodeId : newNodes) {
nodeListener.nodeAdded(nodeId);
}
for(Member m : removedMembers) {
nodeListener.nodeLeft(m.getUuid());
for (String nodeId : removedMembers) {
nodeListener.nodeLeft(nodeId);
}
members.retainAll(currentMembers);
nodeIds.retainAll(currentNodes);
}
}

Expand Down

0 comments on commit 05ca315

Please sign in to comment.