From 6d148f8c5d84ab05cde3597232e8c34841901363 Mon Sep 17 00:00:00 2001
From: Thomas Segismont
Date: Wed, 19 Sep 2018 16:53:53 +0200
Subject: [PATCH] Fixes #90 Cluster manager is corrupted after merging of
 hazelcast partition

The original HZ member uuid is saved as a cluster member attribute.
Consequently, even if HZ changes it during partition merging, HAManager
and ClusteredEventBus will keep using the original uuid.

Note that when the member-added event is handled by existing cluster
nodes, the member attribute may not be saved yet. In this case, it is
safe to use the member uuid (at this stage, the member uuid and the
member attribute are equal).
---
 src/main/asciidoc/index.adoc                  |  4 +-
 .../hazelcast/HazelcastClusterManager.java    | 62 ++++++++++-------
 ...ogrammaticHazelcastClusterManagerTest.java | 68 -------------------
 3 files changed, 40 insertions(+), 94 deletions(-)

diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc
index adc2d272..38be41f8 100644
--- a/src/main/asciidoc/index.adoc
+++ b/src/main/asciidoc/index.adoc
@@ -135,9 +135,7 @@ Notice that the custom Hazelcast instance need to be configured with:
 </properties>
 ----
 
-**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your
-cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an
-inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details.
+**IMPORTANT** Hazelcast clients or smart clients are not supported.
 
 **IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x.
 Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property).
diff --git a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java
index 2da9ee24..322d2540 100644
--- a/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java
+++ b/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java
@@ -54,6 +54,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
   private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);
 
   private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
+  private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";
 
   // Hazelcast config file
   private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
@@ -77,7 +78,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
   private String membershipListenerId;
   private String lifecycleListenerId;
   private boolean customHazelcastCluster;
-  private Set<Member> members = new HashSet<>();
+  private Set<String> nodeIds = new HashSet<>();
 
   // Guarded by this
   private Set<HazelcastAsyncMultiMap> multimaps = Collections.newSetFromMap(new WeakHashMap<>(1));
@@ -134,7 +135,9 @@ public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
           hazelcast = Hazelcast.newHazelcastInstance(conf);
         }
 
-        nodeID = hazelcast.getLocalEndpoint().getUuid();
+        Member localMember = hazelcast.getCluster().getLocalMember();
+        nodeID = localMember.getUuid();
+        localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID);
         membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
         lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
         fut.complete();
@@ -171,12 +174,12 @@ public String getNodeID() {
 
   @Override
   public List<String> getNodes() {
-    Set<Member> members = hazelcast.getCluster().getMembers();
-    List<String> lMembers = new ArrayList<>();
-    for (Member member : members) {
-      lMembers.add(member.getUuid());
+    List<String> list = new ArrayList<>();
+    for (Member member : hazelcast.getCluster().getMembers()) {
+      String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+      list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
     }
-    return lMembers;
+    return list;
   }
 
   @Override
@@ -261,6 +264,11 @@ public void leave(Handler<AsyncResult<Void>> resultHandler) {
             Thread.currentThread().interrupt();
           }
         }
+
+        if (customHazelcastCluster) {
+          hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
+        }
+
       } catch (Throwable t) {
         fut.fail(t);
       }
@@ -275,12 +283,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) {
     if (!active) {
       return;
     }
+    Member member = membershipEvent.getMember();
+    String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+    if (memberNodeId == null) {
+      memberNodeId = member.getUuid();
+    }
     try {
       multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
       if (nodeListener != null) {
-        Member member = membershipEvent.getMember();
-        members.add(member);
-        nodeListener.nodeAdded(member.getUuid());
+        nodeIds.add(memberNodeId);
+        nodeListener.nodeAdded(memberNodeId);
       }
     } catch (Throwable t) {
       log.error("Failed to handle memberAdded", t);
@@ -292,12 +304,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) {
     if (!active) {
       return;
     }
+    Member member = membershipEvent.getMember();
+    String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+    if (memberNodeId == null) {
+      memberNodeId = member.getUuid();
+    }
     try {
       multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
       if (nodeListener != null) {
-        Member member = membershipEvent.getMember();
-        members.remove(member);
-        nodeListener.nodeLeft(member.getUuid());
+        nodeIds.remove(memberNodeId);
+        nodeListener.nodeLeft(memberNodeId);
       }
     } catch (Throwable t) {
       log.error("Failed to handle memberRemoved", t);
@@ -312,18 +328,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
     multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
     // Safeguard to make sure members list is OK after a partition merge
     if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
-      final Set<Member> currentMembers = hazelcast.getCluster().getMembers();
-      Set<Member> newMembers = new HashSet<>(currentMembers);
-      newMembers.removeAll(members);
-      Set<Member> removedMembers = new HashSet<>(members);
-      removedMembers.removeAll(currentMembers);
-      for(Member m : newMembers) {
-        nodeListener.nodeAdded(m.getUuid());
+      final List<String> currentNodes = getNodes();
+      Set<String> newNodes = new HashSet<>(currentNodes);
+      newNodes.removeAll(nodeIds);
+      Set<String> removedMembers = new HashSet<>(nodeIds);
+      removedMembers.removeAll(currentNodes);
+      for (String nodeId : newNodes) {
+        nodeListener.nodeAdded(nodeId);
       }
-      for(Member m : removedMembers) {
-        nodeListener.nodeLeft(m.getUuid());
+      for (String nodeId : removedMembers) {
+        nodeListener.nodeLeft(nodeId);
       }
-      members.retainAll(currentMembers);
+      nodeIds.retainAll(currentNodes);
     }
   }
 
diff --git a/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java b/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java
index 0e99e07a..f5037d1e 100644
--- a/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java
+++ b/src/test/java/io/vertx/core/ProgrammaticHazelcastClusterManagerTest.java
@@ -16,8 +16,6 @@
 
 package io.vertx.core;
 
-import com.hazelcast.client.HazelcastClient;
-import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.config.Config;
 import com.hazelcast.config.GroupConfig;
 import com.hazelcast.core.*;
@@ -257,72 +255,6 @@ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
     assertWaitUntil(() -> vertx1.get() == null);
   }
 
-  @Test
-  public void testSharedDataUsingCustomHazelcastClients() throws Exception {
-    HazelcastInstance dataNode1 = Hazelcast.newHazelcastInstance(createConfig());
-    HazelcastInstance dataNode2 = Hazelcast.newHazelcastInstance(createConfig());
-
-    ClientConfig clientConfig = new ClientConfig().setGroupConfig(new GroupConfig()
-      .setName(System.getProperty("vertx.hazelcast.test.group.name"))
-      .setPassword(System.getProperty("vertx.hazelcast.test.group.password")));
-
-    HazelcastInstance clientNode1 = HazelcastClient.newHazelcastClient(clientConfig);
-    HazelcastInstance clientNode2 = HazelcastClient.newHazelcastClient(clientConfig);
-
-    HazelcastClusterManager mgr1 = new HazelcastClusterManager(clientNode1);
-    HazelcastClusterManager mgr2 = new HazelcastClusterManager(clientNode2);
-    VertxOptions options1 = new VertxOptions().setClusterManager(mgr1).setClustered(true).setClusterHost("127.0.0.1");
-    VertxOptions options2 = new VertxOptions().setClusterManager(mgr2).setClustered(true).setClusterHost("127.0.0.1");
-
-    AtomicReference<Vertx> vertx1 = new AtomicReference<>();
-    AtomicReference<Vertx> vertx2 = new AtomicReference<>();
-
-    Vertx.clusteredVertx(options1, res -> {
-      assertTrue(res.succeeded());
-      assertNotNull(mgr1.getHazelcastInstance());
-      res.result().sharedData().getClusterWideMap("mymap1", ar -> {
-        ar.result().put("news", "hello", v -> {
-          vertx1.set(res.result());
-        });
-      });
-    });
-
-    assertWaitUntil(() -> vertx1.get() != null);
-
-    Vertx.clusteredVertx(options2, res -> {
-      assertTrue(res.succeeded());
-      assertNotNull(mgr2.getHazelcastInstance());
-      vertx2.set(res.result());
-      res.result().sharedData().getClusterWideMap("mymap1", ar -> {
-        ar.result().get("news", r -> {
-          assertEquals("hello", r.result());
-          testComplete();
-        });
-      });
-    });
-
-    await();
-
-    vertx1.get().close(ar -> vertx1.set(null));
-    vertx2.get().close(ar -> vertx2.set(null));
-
-    assertWaitUntil(() -> vertx1.get() == null && vertx2.get() == null);
-
-    // be sure stopping vertx did not cause or require our custom hazelcast to shutdown
-
-    assertTrue(clientNode1.getLifecycleService().isRunning());
-    assertTrue(clientNode2.getLifecycleService().isRunning());
-
-    clientNode1.shutdown();
-    clientNode2.shutdown();
-
-    assertTrue(dataNode1.getLifecycleService().isRunning());
-    assertTrue(dataNode2.getLifecycleService().isRunning());
-
-    dataNode1.shutdown();
-    dataNode2.shutdown();
-  }
-
   @AfterClass
   public static void afterTests() {
     System.clearProperty("hazelcast.client.max.no.heartbeat.seconds");
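
Note: the snippet below is not part of the original patch. It is a minimal
standalone sketch of the technique applied above, pinning the original member
uuid as a Hazelcast member attribute at join time and resolving node ids with a
fallback to the current uuid. It assumes the Hazelcast 3.x Member attribute API
(setStringAttribute/getStringAttribute) already used by the patch; the class
name NodeIdExample and the main method are illustrative only.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;

public class NodeIdExample {

  private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";

  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    // Save the original uuid once, at join time; the attribute survives
    // partition merges even if Hazelcast later assigns the member a new uuid.
    Member localMember = hz.getCluster().getLocalMember();
    localMember.setStringAttribute(NODE_ID_ATTRIBUTE, localMember.getUuid());

    // Resolve node ids from the attribute, falling back to the current uuid
    // for members whose attribute has not been replicated yet.
    for (Member member : hz.getCluster().getMembers()) {
      String nodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
      System.out.println(nodeId != null ? nodeId : member.getUuid());
    }

    hz.shutdown();
  }
}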