diff --git a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java index 62f59410e9b9..9c37717dcc7f 100644 --- a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java +++ b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java @@ -21,6 +21,7 @@ import com.hazelcast.config.JoinConfig; import com.hazelcast.config.NetworkConfig; import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Member; import com.hazelcast.instance.Node; import com.hazelcast.internal.cluster.impl.AbstractJoiner; import com.hazelcast.internal.cluster.impl.SplitBrainJoinMessage; @@ -40,8 +41,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -61,6 +65,13 @@ public class TcpIpJoiner extends AbstractJoiner { private final int maxPortTryCount; private volatile boolean claimingMastership; private final JoinConfig joinConfig; + private final long previouslyJoinedMemberAddressRetentionDuration; + + /** + * We register the member addresses to this map which are known with the + * member list update/when a new member joins the cluster + */ + private final ConcurrentMap knownMemberAddresses = new ConcurrentHashMap<>(); public TcpIpJoiner(Node node) { super(node); @@ -71,6 +82,8 @@ public TcpIpJoiner(Node node) { } maxPortTryCount = tryCount; joinConfig = getActiveMemberNetworkConfig(config).getJoin(); + previouslyJoinedMemberAddressRetentionDuration = node.getProperties().getMillis( + GroupProperty.TCP_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION); } public boolean isClaimingMastership() { @@ -366,7 +379,8 @@ protected Collection
getPossibleAddresses() { } } } - + cleanupKnownMemberAddresses(); + possibleAddresses.addAll(knownMemberAddresses.keySet()); possibleAddresses.remove(node.getThisAddress()); return possibleAddresses; } @@ -419,6 +433,18 @@ public static Collection getConfigurationMembers(TcpIpConfig tcpIpConfig return possibleMembers; } + public void onMemberAdded(Member member) { + if (!member.localMember()) { + knownMemberAddresses.put(member.getAddress(), Long.MAX_VALUE); + } + } + public void onMemberRemoved(Member member) { + if (!member.localMember()) { + knownMemberAddresses.put(member.getAddress(), Clock.currentTimeMillis()); + } + } + + @Override public void searchForOtherClusters() { final Collection
possibleAddresses; @@ -446,6 +472,21 @@ public void searchForOtherClusters() { } } + private void cleanupKnownMemberAddresses() { + long currentTime = Clock.currentTimeMillis(); + Iterator iterator = knownMemberAddresses.values().iterator(); + while (iterator.hasNext()) { + Long memberLeftTime = iterator.next(); + if (currentTime - memberLeftTime >= previouslyJoinedMemberAddressRetentionDuration) { + iterator.remove(); + } + } + } + + // only used in tests + public ConcurrentMap getKnownMemberAddresses() { + return knownMemberAddresses; + } @Override public String getType() { return "tcp-ip"; diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java index d27d309f2da5..20bf4c5f4546 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java @@ -17,6 +17,8 @@ package com.hazelcast.internal.cluster.impl; import com.hazelcast.cluster.ClusterState; +import com.hazelcast.cluster.Joiner; +import com.hazelcast.cluster.impl.TcpIpJoiner; import com.hazelcast.core.Member; import com.hazelcast.core.MembershipEvent; import com.hazelcast.hotrestart.InternalHotRestartService; @@ -755,6 +757,10 @@ void onMemberRemove(MemberImpl deadMember) { node.getPartitionService().memberRemoved(deadMember); nodeEngine.onMemberLeft(deadMember); node.getNodeExtension().onMemberListChange(); + Joiner joiner = node.getJoiner(); + if (joiner != null && joiner.getClass() == TcpIpJoiner.class) { + ((TcpIpJoiner) joiner).onMemberRemoved(deadMember); + } } void sendMembershipEvents(Collection currentMembers, Collection newMembers, boolean sortMembers) { @@ -764,7 +770,10 @@ void sendMembershipEvents(Collection currentMembers, Collection rememberedAddresses = tcpJoiner1.getKnownMemberAddresses().keySet(); + assertContains(rememberedAddresses, hz2Address); + assertNotContains(rememberedAddresses, hz3Address); + } + }, 20); + } + + private static Config prepareConfigWithTcpIpConfig(int port, int... otherKnownMemberPorts) { + Config config = smallInstanceConfig() + .setProperty("hazelcast.merge.first.run.delay.seconds", "10") + .setProperty("hazelcast.merge.next.run.delay.seconds", "10"); + JoinConfig join = config.getNetworkConfig().setPort(port).getJoin(); + join.getMulticastConfig().setEnabled(false); + TcpIpConfig tcpIpConfig = join.getTcpIpConfig(); + tcpIpConfig.setEnabled(true); + for (int otherMemberPort : otherKnownMemberPorts) { + tcpIpConfig.addMember("127.0.0.1:" + otherMemberPort); + } + return config; + } + + private static void overrideAddressRetentionDuration(TcpIpJoiner tcpIpJoiner, long addressRetentionDuration) throws Exception { + Field privateField = TcpIpJoiner.class.getDeclaredField("previouslyJoinedMemberAddressRetentionDuration"); + privateField.setAccessible(true); + privateField.set(tcpIpJoiner, addressRetentionDuration); + } }