Skip to content

Commit

Permalink
Merge pull request #95 from tsegismont/issue/90
Browse files Browse the repository at this point in the history
Fixes cluster manager corruption after a Hazelcast partition merge
  • Loading branch information
tsegismont committed Sep 20, 2018
2 parents a0d59e8 + a5c0c68 commit 7e0932b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 94 deletions.
4 changes: 1 addition & 3 deletions src/main/asciidoc/index.adoc
Expand Up @@ -135,9 +135,7 @@ Notice that the custom Hazelcast instance need to be configured with:
</semaphore>
----

**IMPORTANT** Do not use Hazelcast clients or smart clients when using high-availability (HA, or fail-over) in your
cluster as they do not notify when they leave the cluster and you may lose data, or leave the cluster in an
inconsistent state. See https://github.com/vert-x3/vertx-hazelcast/issues/24 for more details.
**IMPORTANT** Hazelcast clients or smart clients are not supported.

**IMPORTANT** Make sure Hazelcast is started before and shut down after Vert.x.
Also, the Hazelcast shutdown hook should be disabled (see xml example, or via system property).
Expand Down
Expand Up @@ -54,6 +54,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);

private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";

// Hazelcast config file
private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
Expand All @@ -77,7 +78,7 @@ public class HazelcastClusterManager implements ClusterManager, MembershipListen
private String membershipListenerId;
private String lifecycleListenerId;
private boolean customHazelcastCluster;
private Set<Member> members = new HashSet<>();
private Set<String> nodeIds = new HashSet<>();
// Guarded by this
private Set<HazelcastAsyncMultiMap> multimaps = Collections.newSetFromMap(new WeakHashMap<>(1));

Expand Down Expand Up @@ -134,7 +135,9 @@ public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
hazelcast = Hazelcast.newHazelcastInstance(conf);
}

nodeID = hazelcast.getLocalEndpoint().getUuid();
Member localMember = hazelcast.getCluster().getLocalMember();
nodeID = localMember.getUuid();
localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID);
membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
fut.complete();
Expand Down Expand Up @@ -171,12 +174,12 @@ public String getNodeID() {

@Override
public List<String> getNodes() {
Set<Member> members = hazelcast.getCluster().getMembers();
List<String> lMembers = new ArrayList<>();
for (Member member : members) {
lMembers.add(member.getUuid());
List<String> list = new ArrayList<>();
for (Member member : hazelcast.getCluster().getMembers()) {
String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
}
return lMembers;
return list;
}

@Override
Expand Down Expand Up @@ -261,6 +264,11 @@ public void leave(Handler<AsyncResult<Void>> resultHandler) {
Thread.currentThread().interrupt();
}
}

if (customHazelcastCluster) {
hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
}

} catch (Throwable t) {
fut.fail(t);
}
Expand All @@ -275,12 +283,16 @@ public synchronized void memberAdded(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
Member member = membershipEvent.getMember();
members.add(member);
nodeListener.nodeAdded(member.getUuid());
nodeIds.add(memberNodeId);
nodeListener.nodeAdded(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberAdded", t);
Expand All @@ -292,12 +304,16 @@ public synchronized void memberRemoved(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
Member member = membershipEvent.getMember();
members.remove(member);
nodeListener.nodeLeft(member.getUuid());
nodeIds.remove(memberNodeId);
nodeListener.nodeLeft(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberRemoved", t);
Expand All @@ -312,18 +328,18 @@ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
// Safeguard to make sure members list is OK after a partition merge
if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
final Set<Member> currentMembers = hazelcast.getCluster().getMembers();
Set<Member> newMembers = new HashSet<>(currentMembers);
newMembers.removeAll(members);
Set<Member> removedMembers = new HashSet<>(members);
removedMembers.removeAll(currentMembers);
for(Member m : newMembers) {
nodeListener.nodeAdded(m.getUuid());
final List<String> currentNodes = getNodes();
Set<String> newNodes = new HashSet<>(currentNodes);
newNodes.removeAll(nodeIds);
Set<String> removedMembers = new HashSet<>(nodeIds);
removedMembers.removeAll(currentNodes);
for (String nodeId : newNodes) {
nodeListener.nodeAdded(nodeId);
}
for(Member m : removedMembers) {
nodeListener.nodeLeft(m.getUuid());
for (String nodeId : removedMembers) {
nodeListener.nodeLeft(nodeId);
}
members.retainAll(currentMembers);
nodeIds.retainAll(currentNodes);
}
}

Expand Down
Expand Up @@ -16,8 +16,6 @@

package io.vertx.core;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.GroupConfig;
import com.hazelcast.core.*;
Expand Down Expand Up @@ -257,72 +255,6 @@ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
assertWaitUntil(() -> vertx1.get() == null);
}

// Verifies that two clustered Vert.x instances backed by custom Hazelcast *client*
// instances can share data through a cluster-wide map, and that closing Vert.x does
// not shut down the externally managed Hazelcast client or data-node instances.
@Test
public void testSharedDataUsingCustomHazelcastClients() throws Exception {
// Two full Hazelcast members hold the actual cluster data; the clients below attach to them.
HazelcastInstance dataNode1 = Hazelcast.newHazelcastInstance(createConfig());
HazelcastInstance dataNode2 = Hazelcast.newHazelcastInstance(createConfig());

// Group name/password come from system properties (presumably set by the test
// harness — TODO confirm) so the clients join the same group as the data nodes.
ClientConfig clientConfig = new ClientConfig().setGroupConfig(new GroupConfig()
.setName(System.getProperty("vertx.hazelcast.test.group.name"))
.setPassword(System.getProperty("vertx.hazelcast.test.group.password")));

HazelcastInstance clientNode1 = HazelcastClient.newHazelcastClient(clientConfig);
HazelcastInstance clientNode2 = HazelcastClient.newHazelcastClient(clientConfig);

// Each Vert.x instance gets its own cluster manager wrapping one client instance.
HazelcastClusterManager mgr1 = new HazelcastClusterManager(clientNode1);
HazelcastClusterManager mgr2 = new HazelcastClusterManager(clientNode2);
VertxOptions options1 = new VertxOptions().setClusterManager(mgr1).setClustered(true).setClusterHost("127.0.0.1");
VertxOptions options2 = new VertxOptions().setClusterManager(mgr2).setClustered(true).setClusterHost("127.0.0.1");

AtomicReference<Vertx> vertx1 = new AtomicReference<>();
AtomicReference<Vertx> vertx2 = new AtomicReference<>();

// First node: put a value into the cluster-wide map, then publish the Vertx handle
// only after the put completes, so the reader below cannot race the writer.
Vertx.clusteredVertx(options1, res -> {
assertTrue(res.succeeded());
assertNotNull(mgr1.getHazelcastInstance());
res.result().sharedData().getClusterWideMap("mymap1", ar -> {
ar.result().put("news", "hello", v -> {
vertx1.set(res.result());
});
});
});

// Block until node 1 has finished writing before starting node 2.
assertWaitUntil(() -> vertx1.get() != null);

// Second node: read back the value written by node 1 through the shared map.
Vertx.clusteredVertx(options2, res -> {
assertTrue(res.succeeded());
assertNotNull(mgr2.getHazelcastInstance());
vertx2.set(res.result());
res.result().sharedData().getClusterWideMap("mymap1", ar -> {
ar.result().get("news", r -> {
assertEquals("hello", r.result());
testComplete();
});
});
});

await();

// Close both Vert.x instances; the close handlers clear the references so we can
// wait for shutdown to finish.
vertx1.get().close(ar -> vertx1.set(null));
vertx2.get().close(ar -> vertx2.set(null));

assertWaitUntil(() -> vertx1.get() == null && vertx2.get() == null);

// be sure stopping vertx did not cause or require our custom hazelcast to shutdown

assertTrue(clientNode1.getLifecycleService().isRunning());
assertTrue(clientNode2.getLifecycleService().isRunning());

clientNode1.shutdown();
clientNode2.shutdown();

// Shutting down the clients must leave the data members untouched.
assertTrue(dataNode1.getLifecycleService().isRunning());
assertTrue(dataNode2.getLifecycleService().isRunning());

dataNode1.shutdown();
dataNode2.shutdown();
}

@AfterClass
public static void afterTests() {
System.clearProperty("hazelcast.client.max.no.heartbeat.seconds");
Expand Down

0 comments on commit 7e0932b

Please sign in to comment.