Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.12.z] Remember previously joined member addresses in TcpIpJoiner #21860

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Member;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.impl.AbstractJoiner;
import com.hazelcast.internal.cluster.impl.SplitBrainJoinMessage;
Expand All @@ -40,8 +41,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand All @@ -55,12 +59,27 @@

public class TcpIpJoiner extends AbstractJoiner {

/**
* Name of the property that specifies how long, in seconds, the address of a member that has
* previously joined the cluster will be retained in the knownMemberAddresses after the member
* leaves the cluster.
*/
public static final String PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP =
"hazelcast.previously.joined.member.address.retention.duration.seconds";
// Default retention in seconds (14400 s = 4 hours); chosen to be the same as the default value
// of missing-cp-member-auto-removal-seconds
private static final int DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS = 14400;
// NOTE(review): presumably milliseconds - the usage is outside this view, confirm
private static final long JOIN_RETRY_WAIT_TIME = 1000L;
// NOTE(review): unit is not visible from here - confirm against the usage site
private static final int MASTERSHIP_CLAIM_TIMEOUT = 10;

private final int maxPortTryCount;
private volatile boolean claimingMastership;
private final JoinConfig joinConfig;
// Retention duration in milliseconds: the constructor converts the configured number of seconds
// with TimeUnit.SECONDS.toMillis, and the cleanup compares it against millisecond timestamps
private final long previouslyJoinedMemberAddressRetentionDuration;
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved

/**
* Addresses of the remote members this joiner has seen in the cluster, registered when a
* member is added and updated when it is removed.
* <p>
* The value is {@code Long.MAX_VALUE} while the member is part of the cluster, and the
* timestamp (milliseconds, from {@code Clock.currentTimeMillis()}) at which the member was
* removed afterwards. Entries whose removal timestamp is at least
* {@code previouslyJoinedMemberAddressRetentionDuration} old are dropped by
* {@code cleanupKnownMemberAddresses()}.
*/
private final ConcurrentMap<Address, Long> knownMemberAddresses = new ConcurrentHashMap<>();

public TcpIpJoiner(Node node) {
super(node);
Expand All @@ -71,6 +90,9 @@ public TcpIpJoiner(Node node) {
}
maxPortTryCount = tryCount;
joinConfig = getActiveMemberNetworkConfig(config).getJoin();
previouslyJoinedMemberAddressRetentionDuration = TimeUnit.SECONDS.toMillis(
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved
Integer.getInteger(PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP,
DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS));
}

public boolean isClaimingMastership() {
Expand Down Expand Up @@ -366,7 +388,8 @@ protected Collection<Address> getPossibleAddresses() {
}
}
}

cleanupKnownMemberAddresses();
possibleAddresses.addAll(knownMemberAddresses.keySet());
possibleAddresses.remove(node.getThisAddress());
return possibleAddresses;
}
Expand Down Expand Up @@ -419,6 +442,18 @@ public static Collection<String> getConfigurationMembers(TcpIpConfig tcpIpConfig
return possibleMembers;
}

/**
 * Records the address of a member that has been added to the cluster.
 * <p>
 * The address is stored with {@code Long.MAX_VALUE} as its value, which marks the member as
 * currently present. The local member is ignored.
 *
 * @param member the member that was added
 */
public void onMemberAdded(Member member) {
    if (member.localMember()) {
        return;
    }
    Address joinedAddress = member.getAddress();
    knownMemberAddresses.put(joinedAddress, Long.MAX_VALUE);
}
/**
 * Records the moment a member was removed from the cluster.
 * <p>
 * The member's address stays in the known addresses, but its value is replaced with the
 * current time in milliseconds. The local member is ignored.
 *
 * @param member the member that was removed
 */
public void onMemberRemoved(Member member) {
    if (member.localMember()) {
        return;
    }
    Address leftAddress = member.getAddress();
    long leftAtMillis = Clock.currentTimeMillis();
    knownMemberAddresses.put(leftAddress, leftAtMillis);
}


@Override
public void searchForOtherClusters() {
final Collection<Address> possibleAddresses;
Expand Down Expand Up @@ -446,6 +481,21 @@ public void searchForOtherClusters() {
}
}

/**
 * Drops the addresses of members that were removed from the cluster at least
 * {@code previouslyJoinedMemberAddressRetentionDuration} milliseconds ago.
 * <p>
 * Entries are removed with the conditional {@code remove(key, value)}, so an address is only
 * dropped if its value is still the expired removal timestamp that was just inspected. If the
 * member is added again concurrently (its value is reset to {@code Long.MAX_VALUE}), the
 * entry is left untouched instead of being deleted by key.
 */
private void cleanupKnownMemberAddresses() {
    long currentTime = Clock.currentTimeMillis();
    Iterator<Address> iterator = knownMemberAddresses.keySet().iterator();
    while (iterator.hasNext()) {
        Address address = iterator.next();
        Long memberLeftTime = knownMemberAddresses.get(address);
        if (memberLeftTime == null) {
            // the entry vanished between iteration and lookup; nothing left to clean up
            continue;
        }
        // Long.MAX_VALUE (member still present) yields a large negative difference here,
        // so such entries are never treated as expired
        if (currentTime - memberLeftTime >= previouslyJoinedMemberAddressRetentionDuration) {
            // atomic compare-and-remove: no-op if the value changed since it was read
            knownMemberAddresses.remove(address, memberLeftTime);
        }
    }
}

/**
 * Returns the live map of known member addresses (not a copy).
 * <p>
 * Only used in tests.
 *
 * @return the map from member address to its presence marker / removal timestamp
 */
public ConcurrentMap<Address, Long> getKnownMemberAddresses() {
    return this.knownMemberAddresses;
}
@Override
public String getType() {
return "tcp-ip";
Expand Down
Expand Up @@ -17,6 +17,8 @@
package com.hazelcast.internal.cluster.impl;

import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Joiner;
import com.hazelcast.cluster.impl.TcpIpJoiner;
import com.hazelcast.core.Member;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.hotrestart.InternalHotRestartService;
Expand Down Expand Up @@ -755,6 +757,10 @@ void onMemberRemove(MemberImpl deadMember) {
node.getPartitionService().memberRemoved(deadMember);
nodeEngine.onMemberLeft(deadMember);
node.getNodeExtension().onMemberListChange();
Joiner joiner = node.getJoiner();
if (joiner instanceof TcpIpJoiner) {
((TcpIpJoiner) joiner).onMemberRemoved(deadMember);
}
}

void sendMembershipEvents(Collection<MemberImpl> currentMembers, Collection<MemberImpl> newMembers, boolean sortMembers) {
Expand All @@ -764,7 +770,10 @@ void sendMembershipEvents(Collection<MemberImpl> currentMembers, Collection<Memb
// sync calls
node.getPartitionService().memberAdded(newMember);
node.getNodeExtension().onMemberListChange();

Joiner joiner = node.getJoiner();
if (joiner instanceof TcpIpJoiner) {
((TcpIpJoiner) joiner).onMemberAdded(newMember);
}
// async events
eventMembers.add(newMember);
if (sortMembers) {
Expand Down
81 changes: 81 additions & 0 deletions hazelcast/src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.cluster;

import com.hazelcast.cluster.impl.TcpIpJoiner;
import com.hazelcast.config.Config;
import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.JoinConfig;
Expand All @@ -25,7 +26,10 @@
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.HazelcastInstanceFactory;
import com.hazelcast.instance.TestUtil;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
Expand All @@ -35,6 +39,10 @@
import org.junit.runner.RunWith;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
Expand Down Expand Up @@ -224,4 +232,77 @@ public void test_whenIncompatibleJoiners() throws Exception {

assertIncompatible(config1, config2);
}

/**
 * Verifies that the joiner of the first member learns the addresses of the members that join
 * it, and still holds them for 15 seconds after those members have shut down.
 */
@Test
public void test_tcpIpJoinerRemembersJoinedMemberAddresses() {
    HazelcastInstance hz1 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(5701));
    HazelcastInstance hz2 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(8899, 5701));
    HazelcastInstance hz3 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(9099, 5701));

    final Address hz2Address = hz2.getCluster().getLocalMember().getAddress();
    final Address hz3Address = hz3.getCluster().getLocalMember().getAddress();
    final TcpIpJoiner tcpJoiner1 = (TcpIpJoiner) TestUtil.getNode(hz1).getJoiner();

    // the same check is applied before and after the two members leave
    AssertTask bothAddressesAreKnown = new AssertTask() {
        @Override
        public void run() {
            assertContainsAll(tcpJoiner1.getKnownMemberAddresses().keySet(), Arrays.asList(hz2Address, hz3Address));
        }
    };

    assertTrueEventually(bothAddressesAreKnown);

    hz2.shutdown();
    hz3.shutdown();

    assertTrueAllTheTime(bothAddressesAreKnown, 15);
}

/**
 * Verifies that the address of a member that left the cluster is forgotten once the address
 * retention duration (overridden to 10 seconds here) has passed, while the address of a
 * member that is still present is kept.
 */
@Test
public void test_tcpIpJoinerCleanupAddressesAfterAddressRetentionPeriodIsPassed() throws Exception {
    HazelcastInstance hz1 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(5701));
    final TcpIpJoiner tcpJoiner1 = (TcpIpJoiner) TestUtil.getNode(hz1).getJoiner();
    overrideAddressRetentionDuration(tcpJoiner1, TimeUnit.SECONDS.toMillis(10));
    HazelcastInstance hz2 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(8899, 5701));
    HazelcastInstance hz3 = Hazelcast.newHazelcastInstance(prepareConfigWithTcpIpConfig(9099, 5701));
    final Address hz2Address = hz2.getCluster().getLocalMember().getAddress();
    final Address hz3Address = hz3.getCluster().getLocalMember().getAddress();
    assertTrueEventually(new AssertTask() {
        @Override
        public void run() {
            assertContainsAll(tcpJoiner1.getKnownMemberAddresses().keySet(), Arrays.asList(hz2Address, hz3Address));
        }
    });
    hz3.shutdown();
    // The timeout below must be greater than the SUM of previouslyJoinedMemberAddressRetentionDuration (10s)
    // and hazelcast.merge.next.run.delay.seconds (10s): expired addresses are only cleaned up when the
    // possible addresses are collected, which presumably happens once per merge run, so the worst-case
    // latency is retention + one merge period. 20s would leave no slack; 30s does.
    assertTrueEventually(new AssertTask() {
        @Override
        public void run() {
            Set<Address> rememberedAddresses = tcpJoiner1.getKnownMemberAddresses().keySet();
            assertContains(rememberedAddresses, hz2Address);
            assertNotContains(rememberedAddresses, hz3Address);
        }
    }, 30);
}

/**
 * Builds a member config that listens on the given port and uses the TCP/IP joiner
 * (multicast disabled), with both split-brain merge delays set to 10 seconds.
 *
 * @param port                  the port this member should bind to
 * @param otherKnownMemberPorts ports on 127.0.0.1 to add as configured TCP/IP members
 * @return the prepared config
 */
private static Config prepareConfigWithTcpIpConfig(int port, int... otherKnownMemberPorts) {
    Config config = smallInstanceConfig();
    config.setProperty("hazelcast.merge.first.run.delay.seconds", "10");
    config.setProperty("hazelcast.merge.next.run.delay.seconds", "10");

    config.getNetworkConfig().setPort(port);
    JoinConfig joinConfig = config.getNetworkConfig().getJoin();
    joinConfig.getMulticastConfig().setEnabled(false);

    TcpIpConfig tcpIp = joinConfig.getTcpIpConfig();
    tcpIp.setEnabled(true);
    for (int knownPort : otherKnownMemberPorts) {
        String memberAddress = "127.0.0.1:" + knownPort;
        tcpIp.addMember(memberAddress);
    }
    return config;
}

private static void overrideAddressRetentionDuration(TcpIpJoiner tcpIpJoiner, long addressRetentionDuration) throws Exception {
Field privateField = TcpIpJoiner.class.getDeclaredField("previouslyJoinedMemberAddressRetentionDuration");
TomaszGaweda marked this conversation as resolved.
Show resolved Hide resolved
privateField.setAccessible(true);
privateField.set(tcpIpJoiner, addressRetentionDuration);
}
}