diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index adc2d272..38be41f8 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -135,9 +135,7 @@ Notice that the custom Hazelcast instance need to be configured with: ---- -**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your -cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an -inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details. +**IMPORTANT** Hazelcast clients or smart clients are not supported. **IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x. Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property). diff --git a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java index 2da9ee24..322d2540 100644 --- a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java @@ -54,6 +54,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class); private static final String LOCK_SEMAPHORE_PREFIX = "__vertx."; + private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId"; // Hazelcast config file private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml"; @@ -77,7 +78,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen private String membershipListenerId; private String lifecycleListenerId; private boolean customHazelcastCluster; - private Set members = new HashSet<>(); + private Set nodeIds = new HashSet<>(); // Guarded by this private Set multimaps = Collections.newSetFromMap(new WeakHashMap<>(1)); @@ -134,7 +135,9 @@ public synchronized void join(Handler> resultHandler) { hazelcast = Hazelcast.newHazelcastInstance(conf); } - nodeID = hazelcast.getLocalEndpoint().getUuid(); + Member localMember = hazelcast.getCluster().getLocalMember(); + nodeID = localMember.getUuid(); + localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID); membershipListenerId = hazelcast.getCluster().addMembershipListener(this); lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this); fut.complete(); @@ -171,12 +174,12 @@ public String getNodeID() { @Override public List getNodes() { - Set members = hazelcast.getCluster().getMembers(); - List lMembers = new ArrayList<>(); - for (Member member : members) { - lMembers.add(member.getUuid()); + List list = new ArrayList<>(); + for (Member member : hazelcast.getCluster().getMembers()) { + String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE); + list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid()); } - return lMembers; + return list; } @Override @@ -261,6 +264,11 @@ public void leave(Handler> resultHandler) { Thread.currentThread().interrupt(); } } + + if (customHazelcastCluster) { + hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE); + } + } catch (Throwable t) { fut.fail(t); } @@ -275,12 +283,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.add(member); - nodeListener.nodeAdded(member.getUuid()); + nodeIds.add(memberNodeId); + nodeListener.nodeAdded(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberAdded", t); @@ -292,12 +304,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) { if (!active) { return; } + Member member = membershipEvent.getMember(); + String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE); + if (memberNodeId == null) { + memberNodeId = member.getUuid(); + } try { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); if (nodeListener != null) { - Member member = membershipEvent.getMember(); - members.remove(member); - nodeListener.nodeLeft(member.getUuid()); + nodeIds.remove(memberNodeId); + nodeListener.nodeLeft(memberNodeId); } } catch (Throwable t) { log.error("Failed to handle memberRemoved", t); @@ -312,18 +328,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) { multimaps.forEach(HazelcastAsyncMultiMap::clearCache); // Safeguard to make sure members list is OK after a partition merge if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) { - final Set currentMembers = hazelcast.getCluster().getMembers(); - Set newMembers = new HashSet<>(currentMembers); - newMembers.removeAll(members); - Set removedMembers = new HashSet<>(members); - removedMembers.removeAll(currentMembers); - for(Member m : newMembers) { - nodeListener.nodeAdded(m.getUuid()); + final List currentNodes = getNodes(); + Set newNodes = new HashSet<>(currentNodes); + newNodes.removeAll(nodeIds); + Set removedMembers = new HashSet<>(nodeIds); + removedMembers.removeAll(currentNodes); + for (String nodeId : newNodes) { + nodeListener.nodeAdded(nodeId); } - for(Member m : removedMembers) { - nodeListener.nodeLeft(m.getUuid()); + for (String nodeId : removedMembers) { + nodeListener.nodeLeft(nodeId); } - members.retainAll(currentMembers); + nodeIds.retainAll(currentNodes); } } diff --git a/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java b/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java index 0e99e07a..f5037d1e 100644 --- a/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java +++ b/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java @@ -16,8 +16,6 @@ package io.vertx.core; -import com.hazelcast.client.HazelcastClient; -import com.hazelcast.client.config.ClientConfig; import com.hazelcast.config.Config; import com.hazelcast.config.GroupConfig; import com.hazelcast.core.*; @@ -257,72 +255,6 @@ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) { assertWaitUntil(() -> vertx1.get() == null); } - @Test - public void testSharedDataUsingCustomHazelcastClients() throws Exception { - HazelcastInstance dataNode1 = Hazelcast.newHazelcastInstance(createConfig()); - HazelcastInstance dataNode2 = Hazelcast.newHazelcastInstance(createConfig()); - - ClientConfig clientConfig = new ClientConfig().setGroupConfig(new GroupConfig() - .setName(System.getProperty("vertx.hazelcast.test.group.name")) - .setPassword(System.getProperty("vertx.hazelcast.test.group.password"))); - - HazelcastInstance clientNode1 = HazelcastClient.newHazelcastClient(clientConfig); - HazelcastInstance clientNode2 = HazelcastClient.newHazelcastClient(clientConfig); - - HazelcastClusterManager mgr1 = new HazelcastClusterManager(clientNode1); - HazelcastClusterManager mgr2 = new HazelcastClusterManager(clientNode2); - VertxOptions options1 = new VertxOptions().setClusterManager(mgr1).setClustered(true).setClusterHost("127.0.0.1"); - VertxOptions options2 = new VertxOptions().setClusterManager(mgr2).setClustered(true).setClusterHost("127.0.0.1"); - - AtomicReference vertx1 = new AtomicReference<>(); - AtomicReference vertx2 = new AtomicReference<>(); - - Vertx.clusteredVertx(options1, res -> { - assertTrue(res.succeeded()); - assertNotNull(mgr1.getHazelcastInstance()); - res.result().sharedData().getClusterWideMap("mymap1", ar -> { - ar.result().put("news", "hello", v -> { - vertx1.set(res.result()); - }); - }); - }); - - assertWaitUntil(() -> vertx1.get() != null); - - Vertx.clusteredVertx(options2, res -> { - assertTrue(res.succeeded()); - assertNotNull(mgr2.getHazelcastInstance()); - vertx2.set(res.result()); - res.result().sharedData().getClusterWideMap("mymap1", ar -> { - ar.result().get("news", r -> { - assertEquals("hello", r.result()); - testComplete(); - }); - }); - }); - - await(); - - vertx1.get().close(ar -> vertx1.set(null)); - vertx2.get().close(ar -> vertx2.set(null)); - - assertWaitUntil(() -> vertx1.get() == null && vertx2.get() == null); - - // be sure stopping vertx did not cause or require our custom hazelcast to shutdown - - assertTrue(clientNode1.getLifecycleService().isRunning()); - assertTrue(clientNode2.getLifecycleService().isRunning()); - - clientNode1.shutdown(); - clientNode2.shutdown(); - - assertTrue(dataNode1.getLifecycleService().isRunning()); - assertTrue(dataNode2.getLifecycleService().isRunning()); - - dataNode1.shutdown(); - dataNode2.shutdown(); - } - @AfterClass public static void afterTests() { System.clearProperty("hazelcast.client.max.no.heartbeat.seconds");