From 05ca315f7695909df0a70440b02d4d10deb4a169 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Wed, 19 Sep 2018 16:53:53 +0200 Subject: [PATCH] Fixes #90 Cluster manager is corrupted after merging of hazelcast partition The original HZ member uuid is saved as a cluster member attribute. Consequently, even if HZ changes it during partition merging, HAManager and ClusteredEventBus will keep using the same original uuid. Note that when member added event is handled by existing cluster nodes, the member attribute may not be saved yet. In this case, it is safe to use member uuid (at this stage, the member uuid and member attribute are equal). --- src/main/asciidoc/java/index.adoc | 4 +- .../hazelcast/HazelcastClusterManager.java | 62 ++++++++++++------- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/main/asciidoc/java/index.adoc b/src/main/asciidoc/java/index.adoc index f60dd798..52dafe73 100644 --- a/src/main/asciidoc/java/index.adoc +++ b/src/main/asciidoc/java/index.adoc @@ -166,9 +166,7 @@ Notice that the custom Hazelcast instance need to be configured with: ---- -**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your -cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an -inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details. +**IMPORTANT** Hazelcast clients or smart clients are not supported. **IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x. Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property). diff --git a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java index 74b66106..3c93d893 100644 --- a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java @@ -55,6 +55,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class); private static final String LOCK_SEMAPHORE_PREFIX = "__vertx."; + private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId"; // Hazelcast config file private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml"; @@ -78,7 +79,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private String membershipListenerId; private String lifecycleListenerId; private boolean customHazelcastCluster; - private Set members = new HashSet<>(); + private Set nodeIds = new HashSet<>(); // Guarded by this private Set multimaps = Collections.newSetFromMap(new WeakHashMap<>(1)); @@ -135,7 +136,9 @@ public synchronized void join(Handler> resultHandler) { hazelcast = Hazelcast.newHazelcastInstance(conf); } - nodeID = hazelcast.getLocalEndpoint().getUuid(); + Member localMember = hazelcast.getCluster().getLocalMember(); + nodeID = localMember.getUuid(); + localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID); membershipListenerId = hazelcast.getCluster().addMembershipListener(this); lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this); fut.complete(); @@ -172,12 +175,12 @@ public String getNodeID() { @Override public List getNodes() { - Set members = hazelcast.getCluster().getMembers(); - List lMembers = new ArrayList<>(); - for (Member member : members) { - lMembers.add(member.getUuid()); + List list = new ArrayList<>(); + for (Member member : hazelcast.getCluster().getMembers()) { + String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE); + list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid()); } - return lMembers; + return list; } @Override @@ -264,6 +267,11 @@ public void leave(Handler> resultHandler) { Thread.currentThread().interrupt(); } } + + if (customHazelcastCluster) { + hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE); + } + } catch (Throwable t) { fut.fail(t); } @@ -278,12 +286,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.add(member); - nodeListener.nodeAdded(member.getUuid()); + nodeIds.add(memberNodeId); + nodeListener.nodeAdded(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberAdded", t); @@ -295,12 +307,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.remove(member); - nodeListener.nodeLeft(member.getUuid()); + nodeIds.remove(memberNodeId); + nodeListener.nodeLeft(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberRemoved", t); @@ -315,18 +331,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); // Safeguard to make sure members list is OK after a partition merge if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) { - final Set currentMembers = hazelcast.getCluster().getMembers(); - Set newMembers = new HashSet<>(currentMembers); - newMembers.removeAll(members); - Set removedMembers = new HashSet<>(members); - removedMembers.removeAll(currentMembers); - for(Member m : newMembers) { - nodeListener.nodeAdded(m.getUuid()); + final List currentNodes = getNodes(); + Set newNodes = new HashSet<>(currentNodes); + newNodes.removeAll(nodeIds); + Set removedMembers = new HashSet<>(nodeIds); + removedMembers.removeAll(currentNodes); + for (String nodeId : newNodes) { + nodeListener.nodeAdded(nodeId); } - for(Member m : removedMembers) { - nodeListener.nodeLeft(m.getUuid()); + for (String nodeId : removedMembers) { + nodeListener.nodeLeft(nodeId); } - members.retainAll(currentMembers); + nodeIds.retainAll(currentNodes); } }