diff --git a/src/main/asciidoc/java/index.adoc b/src/main/asciidoc/java/index.adoc index f60dd798..52dafe73 100644 --- a/src/main/asciidoc/java/index.adoc +++ b/src/main/asciidoc/java/index.adoc @@ -166,9 +166,7 @@ Notice that the custom Hazelcast instance need to be configured with: ---- -**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your -cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an -inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details. +**IMPORTANT** Hazelcast clients or smart clients are not supported. **IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x. Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property). diff --git a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java index 74b66106..3c93d893 100644 --- a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java @@ -55,6 +55,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class); private static final String LOCK_SEMAPHORE_PREFIX = "__vertx."; + private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId"; // Hazelcast config file private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml"; @@ -78,7 +79,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private String membershipListenerId; private String lifecycleListenerId; private boolean customHazelcastCluster; - private Set members = new HashSet<>(); + private Set nodeIds = new HashSet<>(); // Guarded by this private Set multimaps = Collections.newSetFromMap(new WeakHashMap<>(1)); @@ -135,7 +136,9 @@ public synchronized void join(Handler> resultHandler) { hazelcast = Hazelcast.newHazelcastInstance(conf); } - nodeID = hazelcast.getLocalEndpoint().getUuid(); + Member localMember = hazelcast.getCluster().getLocalMember(); + nodeID = localMember.getUuid(); + localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID); membershipListenerId = hazelcast.getCluster().addMembershipListener(this); lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this); fut.complete(); @@ -172,12 +175,12 @@ public String getNodeID() { @Override public List getNodes() { - Set members = hazelcast.getCluster().getMembers(); - List lMembers = new ArrayList<>(); - for (Member member : members) { - lMembers.add(member.getUuid()); + List list = new ArrayList<>(); + for (Member member : hazelcast.getCluster().getMembers()) { + String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE); + list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid()); } - return lMembers; + return list; } @Override @@ -264,6 +267,11 @@ public void leave(Handler> resultHandler) { Thread.currentThread().interrupt(); } } + + if (customHazelcastCluster) { + hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE); + } + } catch (Throwable t) { fut.fail(t); } @@ -278,12 +286,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.add(member); - nodeListener.nodeAdded(member.getUuid()); + nodeIds.add(memberNodeId); + nodeListener.nodeAdded(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberAdded", t); @@ -295,12 +307,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.remove(member); - nodeListener.nodeLeft(member.getUuid()); + nodeIds.remove(memberNodeId); + nodeListener.nodeLeft(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberRemoved", t); @@ -315,18 +331,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); // Safeguard to make sure members list is OK after a partition merge if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) { - final Set currentMembers = hazelcast.getCluster().getMembers(); - Set newMembers = new HashSet<>(currentMembers); - newMembers.removeAll(members); - Set removedMembers = new HashSet<>(members); - removedMembers.removeAll(currentMembers); - for(Member m : newMembers) { - nodeListener.nodeAdded(m.getUuid()); + final List currentNodes = getNodes(); + Set newNodes = new HashSet<>(currentNodes); + newNodes.removeAll(nodeIds); + Set removedMembers = new HashSet<>(nodeIds); + removedMembers.removeAll(currentNodes); + for (String nodeId : newNodes) { + nodeListener.nodeAdded(nodeId); } - for(Member m : removedMembers) { - nodeListener.nodeLeft(m.getUuid()); + for (String nodeId : removedMembers) { + nodeListener.nodeLeft(nodeId); } - members.retainAll(currentMembers); + nodeIds.retainAll(currentNodes); } }