Fixes vert-x3#90 Cluster manager is corrupted after merging of hazelcast partition

The original HZ member uuid is saved as a cluster member attribute.
Consequently, even if HZ changes it during partition merging, HAManager and ClusteredEventBus keep using the original uuid.

Note that when the member-added event is handled by existing cluster nodes, the member attribute may not have been saved yet.
In this case it is safe to fall back to the member uuid: at this stage, the member uuid and the member attribute are equal.
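
For illustration, a minimal standalone sketch of the pattern this commit applies (Hazelcast 3.x Member attribute APIs; the resolveNodeId helper name is made up for this sketch and is not part of the diff):

----
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;

public class NodeIdAttributeSketch {

  static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";

  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    // On join: pin the original uuid as a member attribute so that it
    // survives a partition merge, which may assign a new uuid.
    Member localMember = hz.getCluster().getLocalMember();
    localMember.setStringAttribute(NODE_ID_ATTRIBUTE, localMember.getUuid());

    for (Member member : hz.getCluster().getMembers()) {
      System.out.println(resolveNodeId(member));
    }
    hz.shutdown();
  }

  // Prefer the pinned attribute; fall back to the current uuid while the
  // attribute has not propagated yet (both values are equal at that point).
  static String resolveNodeId(Member member) {
    String nodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
    return nodeId != null ? nodeId : member.getUuid();
  }
}
----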
tsegismont committed Sep 19, 2018
1 parent f2587ef commit 6d148f8
Showing 3 changed files with 40 additions and 94 deletions.
src/main/asciidoc/index.adoc (4 changes: 1 addition & 3 deletions)
@@ -135,9 +135,7 @@ Notice that the custom Hazelcast instance needs to be configured with:
</semaphore>
----

-**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your
-cluster as they do not notify when they leave the cluster and you may loose data, or leave the cluster in an
-inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details.
+**IMPORTANT** Hazelcast clients or smart clients are not supported.

**IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x.
Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property).
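
For reference, a minimal sketch of disabling the shutdown hook programmatically (hazelcast.shutdownhook.enabled is a standard Hazelcast property; this snippet is illustrative and not part of the diff). The same property can be set on the command line with -Dhazelcast.shutdownhook.enabled=false:

----
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ShutdownHookSketch {
  public static void main(String[] args) {
    Config conf = new Config();
    // Disable the Hazelcast shutdown hook so Vert.x can be shut down first.
    conf.setProperty("hazelcast.shutdownhook.enabled", "false");
    HazelcastInstance hz = Hazelcast.newHazelcastInstance(conf);
    // ... hand hz to HazelcastClusterManager, shut down Vert.x, then hz.
  }
}
----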
src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java

@@ -54,6 +54,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListener, LifecycleListener {
  private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);

  private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
+ private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";

  // Hazelcast config file
  private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
@@ -77,7 +78,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListener, LifecycleListener {
  private String membershipListenerId;
  private String lifecycleListenerId;
  private boolean customHazelcastCluster;
- private Set<Member> members = new HashSet<>();
+ private Set<String> nodeIds = new HashSet<>();
  // Guarded by this
  private Set<HazelcastAsyncMultiMap> multimaps = Collections.newSetFromMap(new WeakHashMap<>(1));

@@ -134,7 +135,9 @@ public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
        hazelcast = Hazelcast.newHazelcastInstance(conf);
      }

-     nodeID = hazelcast.getLocalEndpoint().getUuid();
+     Member localMember = hazelcast.getCluster().getLocalMember();
+     nodeID = localMember.getUuid();
+     localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID);
      membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
      lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
      fut.complete();
@@ -171,12 +174,12 @@ public String getNodeID() {

  @Override
  public List<String> getNodes() {
-   Set<Member> members = hazelcast.getCluster().getMembers();
-   List<String> lMembers = new ArrayList<>();
-   for (Member member : members) {
-     lMembers.add(member.getUuid());
+   List<String> list = new ArrayList<>();
+   for (Member member : hazelcast.getCluster().getMembers()) {
+     String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+     list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
    }
-   return lMembers;
+   return list;
  }

  @Override
@@ -261,6 +264,11 @@ public void leave(Handler<AsyncResult<Void>> resultHandler) {
          Thread.currentThread().interrupt();
        }
      }
+
+     if (customHazelcastCluster) {
+       hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
+     }
+
    } catch (Throwable t) {
      fut.fail(t);
    }
@@ -275,12 +283,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) {
    if (!active) {
      return;
    }
+   Member member = membershipEvent.getMember();
+   String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+   if (memberNodeId == null) {
+     memberNodeId = member.getUuid();
+   }
    try {
      multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
      if (nodeListener != null) {
-       Member member = membershipEvent.getMember();
-       members.add(member);
-       nodeListener.nodeAdded(member.getUuid());
+       nodeIds.add(memberNodeId);
+       nodeListener.nodeAdded(memberNodeId);
      }
    } catch (Throwable t) {
      log.error("Failed to handle memberAdded", t);
@@ -292,12 +304,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) {
    if (!active) {
      return;
    }
+   Member member = membershipEvent.getMember();
+   String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+   if (memberNodeId == null) {
+     memberNodeId = member.getUuid();
+   }
    try {
      multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
      if (nodeListener != null) {
-       Member member = membershipEvent.getMember();
-       members.remove(member);
-       nodeListener.nodeLeft(member.getUuid());
+       nodeIds.remove(memberNodeId);
+       nodeListener.nodeLeft(memberNodeId);
      }
    } catch (Throwable t) {
      log.error("Failed to handle memberRemoved", t);
@@ -312,18 +328,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
    multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
    // Safeguard to make sure members list is OK after a partition merge
    if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
-     final Set<Member> currentMembers = hazelcast.getCluster().getMembers();
-     Set<Member> newMembers = new HashSet<>(currentMembers);
-     newMembers.removeAll(members);
-     Set<Member> removedMembers = new HashSet<>(members);
-     removedMembers.removeAll(currentMembers);
-     for(Member m : newMembers) {
-       nodeListener.nodeAdded(m.getUuid());
+     final List<String> currentNodes = getNodes();
+     Set<String> newNodes = new HashSet<>(currentNodes);
+     newNodes.removeAll(nodeIds);
+     Set<String> removedMembers = new HashSet<>(nodeIds);
+     removedMembers.removeAll(currentNodes);
+     for (String nodeId : newNodes) {
+       nodeListener.nodeAdded(nodeId);
      }
-     for(Member m : removedMembers) {
-       nodeListener.nodeLeft(m.getUuid());
+     for (String nodeId : removedMembers) {
+       nodeListener.nodeLeft(nodeId);
      }
-     members.retainAll(currentMembers);
+     nodeIds.retainAll(currentNodes);
    }
  }

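To make the reconciliation in stateChanged concrete, a toy example of the set arithmetic used above (the node id strings are made up):

----
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeDiffSketch {
  public static void main(String[] args) {
    Set<String> nodeIds = new HashSet<>(Arrays.asList("a", "b", "c")); // known before the merge
    List<String> currentNodes = Arrays.asList("a", "c", "d");          // reported after the merge

    Set<String> newNodes = new HashSet<>(currentNodes);
    newNodes.removeAll(nodeIds);           // [d] -> nodeAdded is fired
    Set<String> removedNodes = new HashSet<>(nodeIds);
    removedNodes.removeAll(currentNodes);  // [b] -> nodeLeft is fired

    System.out.println("added=" + newNodes + " removed=" + removedNodes);
  }
}
----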
@@ -16,8 +16,6 @@

  package io.vertx.core;

- import com.hazelcast.client.HazelcastClient;
- import com.hazelcast.client.config.ClientConfig;
  import com.hazelcast.config.Config;
  import com.hazelcast.config.GroupConfig;
  import com.hazelcast.core.*;
@@ -257,72 +255,6 @@ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
    assertWaitUntil(() -> vertx1.get() == null);
  }

- @Test
- public void testSharedDataUsingCustomHazelcastClients() throws Exception {
-   HazelcastInstance dataNode1 = Hazelcast.newHazelcastInstance(createConfig());
-   HazelcastInstance dataNode2 = Hazelcast.newHazelcastInstance(createConfig());
-
-   ClientConfig clientConfig = new ClientConfig().setGroupConfig(new GroupConfig()
-     .setName(System.getProperty("vertx.hazelcast.test.group.name"))
-     .setPassword(System.getProperty("vertx.hazelcast.test.group.password")));
-
-   HazelcastInstance clientNode1 = HazelcastClient.newHazelcastClient(clientConfig);
-   HazelcastInstance clientNode2 = HazelcastClient.newHazelcastClient(clientConfig);
-
-   HazelcastClusterManager mgr1 = new HazelcastClusterManager(clientNode1);
-   HazelcastClusterManager mgr2 = new HazelcastClusterManager(clientNode2);
-   VertxOptions options1 = new VertxOptions().setClusterManager(mgr1).setClustered(true).setClusterHost("127.0.0.1");
-   VertxOptions options2 = new VertxOptions().setClusterManager(mgr2).setClustered(true).setClusterHost("127.0.0.1");
-
-   AtomicReference<Vertx> vertx1 = new AtomicReference<>();
-   AtomicReference<Vertx> vertx2 = new AtomicReference<>();
-
-   Vertx.clusteredVertx(options1, res -> {
-     assertTrue(res.succeeded());
-     assertNotNull(mgr1.getHazelcastInstance());
-     res.result().sharedData().getClusterWideMap("mymap1", ar -> {
-       ar.result().put("news", "hello", v -> {
-         vertx1.set(res.result());
-       });
-     });
-   });
-
-   assertWaitUntil(() -> vertx1.get() != null);
-
-   Vertx.clusteredVertx(options2, res -> {
-     assertTrue(res.succeeded());
-     assertNotNull(mgr2.getHazelcastInstance());
-     vertx2.set(res.result());
-     res.result().sharedData().getClusterWideMap("mymap1", ar -> {
-       ar.result().get("news", r -> {
-         assertEquals("hello", r.result());
-         testComplete();
-       });
-     });
-   });
-
-   await();
-
-   vertx1.get().close(ar -> vertx1.set(null));
-   vertx2.get().close(ar -> vertx2.set(null));
-
-   assertWaitUntil(() -> vertx1.get() == null && vertx2.get() == null);
-
-   // be sure stopping vertx did not cause or require our custom hazelcast to shutdown
-
-   assertTrue(clientNode1.getLifecycleService().isRunning());
-   assertTrue(clientNode2.getLifecycleService().isRunning());
-
-   clientNode1.shutdown();
-   clientNode2.shutdown();
-
-   assertTrue(dataNode1.getLifecycleService().isRunning());
-   assertTrue(dataNode2.getLifecycleService().isRunning());
-
-   dataNode1.shutdown();
-   dataNode2.shutdown();
- }
-
  @AfterClass
  public static void afterTests() {
    System.clearProperty("hazelcast.client.max.no.heartbeat.seconds");
