From 65f74b72867d48f3cfb260b00dcc5a7076296f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ufuk=20Y=C4=B1lmaz?= Date: Tue, 26 Jul 2022 11:32:43 +0300 Subject: [PATCH 1/6] Remember previously joined member addresses in TcpIpJoiner --- .../hazelcast/cluster/impl/TcpIpJoiner.java | 46 +++++++- .../cluster/impl/MembershipManager.java | 11 +- .../com/hazelcast/cluster/TcpIpJoinTest.java | 107 ++++++++++++++++++ 3 files changed, 162 insertions(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java index 62f59410e9b9..8a99d33cefd4 100644 --- a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java +++ b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java @@ -21,6 +21,7 @@ import com.hazelcast.config.JoinConfig; import com.hazelcast.config.NetworkConfig; import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.core.Member; import com.hazelcast.instance.Node; import com.hazelcast.internal.cluster.impl.AbstractJoiner; import com.hazelcast.internal.cluster.impl.SplitBrainJoinMessage; @@ -42,6 +43,8 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -55,12 +58,27 @@ public class TcpIpJoiner extends AbstractJoiner { + /** + * Specifies how long the address of a member that has previously joined the cluster + * will be retained in the knownMemberAddresses after the member leaves the cluster. + */ + public static final String PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP = + "hazelcast.previously.joined.member.address.retention.duration.seconds"; + // Selected this default value same as default value of missing-cp-member-auto-removal-seconds + private static final int DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS = 14400; private static final long JOIN_RETRY_WAIT_TIME = 1000L; private static final int MASTERSHIP_CLAIM_TIMEOUT = 10; private final int maxPortTryCount; private volatile boolean claimingMastership; private final JoinConfig joinConfig; + private final long previouslyJoinedMemberAddressRetentionDuration; + + /** + * We register the member addresses to this map which are known with the + * member list update/when a new member joins the cluster + */ + private final ConcurrentMap knownMemberAddresses = new ConcurrentHashMap<>(); public TcpIpJoiner(Node node) { super(node); @@ -71,6 +89,9 @@ public TcpIpJoiner(Node node) { } maxPortTryCount = tryCount; joinConfig = getActiveMemberNetworkConfig(config).getJoin(); + previouslyJoinedMemberAddressRetentionDuration = TimeUnit.SECONDS.toMillis( + Integer.getInteger(PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP, + DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS)); } public boolean isClaimingMastership() { @@ -366,7 +387,8 @@ protected Collection
getPossibleAddresses() { } } } - + cleanupKnownMemberAddresses(); + possibleAddresses.addAll(knownMemberAddresses.keySet()); possibleAddresses.remove(node.getThisAddress()); return possibleAddresses; } @@ -419,6 +441,18 @@ public static Collection getConfigurationMembers(TcpIpConfig tcpIpConfig return possibleMembers; } + public void onMemberAdded(Member member) { + if (!member.localMember()) { + knownMemberAddresses.put(member.getAddress(), Long.MAX_VALUE); + } + } + public void onMemberRemoved(Member member) { + if (!member.localMember()) { + knownMemberAddresses.put(member.getAddress(), Clock.currentTimeMillis()); + } + } + + @Override public void searchForOtherClusters() { final Collection
possibleAddresses; @@ -446,6 +480,16 @@ public void searchForOtherClusters() { } } + private void cleanupKnownMemberAddresses() { + long currentTime = Clock.currentTimeMillis(); + knownMemberAddresses.values().removeIf(memberLeftTime -> + (currentTime - memberLeftTime) >= previouslyJoinedMemberAddressRetentionDuration); + } + + // only used in tests + public ConcurrentMap getKnownMemberAddresses() { + return knownMemberAddresses; + } @Override public String getType() { return "tcp-ip"; diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java index d27d309f2da5..20bf4c5f4546 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java @@ -17,6 +17,8 @@ package com.hazelcast.internal.cluster.impl; import com.hazelcast.cluster.ClusterState; +import com.hazelcast.cluster.Joiner; +import com.hazelcast.cluster.impl.TcpIpJoiner; import com.hazelcast.core.Member; import com.hazelcast.core.MembershipEvent; import com.hazelcast.hotrestart.InternalHotRestartService; @@ -755,6 +757,10 @@ void onMemberRemove(MemberImpl deadMember) { node.getPartitionService().memberRemoved(deadMember); nodeEngine.onMemberLeft(deadMember); node.getNodeExtension().onMemberListChange(); + Joiner joiner = node.getJoiner(); + if (joiner != null && joiner.getClass() == TcpIpJoiner.class) { + ((TcpIpJoiner) joiner).onMemberRemoved(deadMember); + } } void sendMembershipEvents(Collection currentMembers, Collection newMembers, boolean sortMembers) { @@ -764,7 +770,10 @@ void sendMembershipEvents(Collection currentMembers, Collection rememberedAddresses = tcpJoiner1.getKnownMemberAddresses().keySet(); + assertContains(rememberedAddresses, hz2Address); + assertNotContains(rememberedAddresses, hz3Address); + } + }, 20); + } + + private static void overrideAddressRetentionDuration(TcpIpJoiner tcpIpJoiner, long addressRetentionDuration) throws Exception { + Field privateField = TcpIpJoiner.class.getDeclaredField("previouslyJoinedMemberAddressRetentionDuration"); + privateField.setAccessible(true); + privateField.set(tcpIpJoiner, addressRetentionDuration); + } } From 57846cddfa0fab4ebac955e4b86670fbf818d434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ufuk=20Y=C4=B1lmaz?= Date: Wed, 27 Jul 2022 15:16:53 +0300 Subject: [PATCH 2/6] Make it java 6 compatible --- .../java/com/hazelcast/cluster/impl/TcpIpJoiner.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java index 8a99d33cefd4..4b944a30b308 100644 --- a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java +++ b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -482,8 +483,13 @@ public void searchForOtherClusters() { private void cleanupKnownMemberAddresses() { long currentTime = Clock.currentTimeMillis(); - knownMemberAddresses.values().removeIf(memberLeftTime -> - (currentTime - memberLeftTime) >= previouslyJoinedMemberAddressRetentionDuration); + Iterator iterator = knownMemberAddresses.values().iterator(); + while (iterator.hasNext()) { + Long memberLeftTime = iterator.next(); + if (currentTime - memberLeftTime >= previouslyJoinedMemberAddressRetentionDuration) { + iterator.remove(); + } + } } // only used in tests From ff5152e3c16c51c6b82c86c85da73d2da139ddcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ufuk=20Y=C4=B1lmaz?= Date: Fri, 29 Jul 2022 15:12:30 +0300 Subject: [PATCH 3/6] Address review comments --- .../cluster/impl/MembershipManager.java | 4 +- .../com/hazelcast/cluster/TcpIpJoinTest.java | 68 ++++++------------- 2 files changed, 23 insertions(+), 49 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java index 20bf4c5f4546..4b0b41b36da3 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java @@ -758,7 +758,7 @@ void onMemberRemove(MemberImpl deadMember) { nodeEngine.onMemberLeft(deadMember); node.getNodeExtension().onMemberListChange(); Joiner joiner = node.getJoiner(); - if (joiner != null && joiner.getClass() == TcpIpJoiner.class) { + if (joiner instanceof TcpIpJoiner) { ((TcpIpJoiner) joiner).onMemberRemoved(deadMember); } } @@ -771,7 +771,7 @@ void sendMembershipEvents(Collection currentMembers, Collection Date: Fri, 29 Jul 2022 16:10:00 +0300 Subject: [PATCH 4/6] Nit followup fix --- .../src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java b/hazelcast/src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java index d60f8bedc1f8..6b6bcb4d3635 100644 --- a/hazelcast/src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cluster/TcpIpJoinTest.java @@ -295,7 +295,7 @@ private static Config prepareConfigWithTcpIpConfig(int port, int... otherKnownMe TcpIpConfig tcpIpConfig = join.getTcpIpConfig(); tcpIpConfig.setEnabled(true); for (int otherMemberPort : otherKnownMemberPorts) { - tcpIpConfig.setEnabled(true).addMember("127.0.0.1:" + otherMemberPort); + tcpIpConfig.addMember("127.0.0.1:" + otherMemberPort); } return config; } From 44138d618f2d814d3b66fbe099f7c74aca315c2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ufuk=20Y=C4=B1lmaz?= Date: Mon, 1 Aug 2022 12:12:35 +0300 Subject: [PATCH 5/6] Revert `joiner instanceof TcpIpJoiner` change --- .../hazelcast/internal/cluster/impl/MembershipManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java index 4b0b41b36da3..20bf4c5f4546 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/cluster/impl/MembershipManager.java @@ -758,7 +758,7 @@ void onMemberRemove(MemberImpl deadMember) { nodeEngine.onMemberLeft(deadMember); node.getNodeExtension().onMemberListChange(); Joiner joiner = node.getJoiner(); - if (joiner instanceof TcpIpJoiner) { + if (joiner != null && joiner.getClass() == TcpIpJoiner.class) { ((TcpIpJoiner) joiner).onMemberRemoved(deadMember); } } @@ -771,7 +771,7 @@ void sendMembershipEvents(Collection currentMembers, Collection Date: Thu, 11 Aug 2022 13:30:35 +0300 Subject: [PATCH 6/6] Pull upstream branch changes This change includes exposing member address retention duration property in GroupProperty. --- .../com/hazelcast/cluster/impl/TcpIpJoiner.java | 13 ++----------- .../com/hazelcast/spi/properties/GroupProperty.java | 10 ++++++++++ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java index 4b944a30b308..9c37717dcc7f 100644 --- a/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java +++ b/hazelcast/src/main/java/com/hazelcast/cluster/impl/TcpIpJoiner.java @@ -59,14 +59,6 @@ public class TcpIpJoiner extends AbstractJoiner { - /** - * Specifies how long the address of a member that has previously joined the cluster - * will be retained in the knownMemberAddresses after the member leaves the cluster. - */ - public static final String PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP = - "hazelcast.previously.joined.member.address.retention.duration.seconds"; - // Selected this default value same as default value of missing-cp-member-auto-removal-seconds - private static final int DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS = 14400; private static final long JOIN_RETRY_WAIT_TIME = 1000L; private static final int MASTERSHIP_CLAIM_TIMEOUT = 10; @@ -90,9 +82,8 @@ public TcpIpJoiner(Node node) { } maxPortTryCount = tryCount; joinConfig = getActiveMemberNetworkConfig(config).getJoin(); - previouslyJoinedMemberAddressRetentionDuration = TimeUnit.SECONDS.toMillis( - Integer.getInteger(PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_PROP, - DEFAULT_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION_IN_SECS)); + previouslyJoinedMemberAddressRetentionDuration = node.getProperties().getMillis( + GroupProperty.TCP_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION); } public boolean isClaimingMastership() { diff --git a/hazelcast/src/main/java/com/hazelcast/spi/properties/GroupProperty.java b/hazelcast/src/main/java/com/hazelcast/spi/properties/GroupProperty.java index d989e3e39826..348805347455 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/properties/GroupProperty.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/properties/GroupProperty.java @@ -559,6 +559,16 @@ public final class GroupProperty { public static final HazelcastProperty TCP_JOIN_PORT_TRY_COUNT = new HazelcastProperty("hazelcast.tcp.join.port.try.count", 3); + /** + * Specifies how long the address of a member that has previously joined the + * cluster will be retained/remembered in the TcpIpJoiner after it leaves the + * cluster. The remembered member addresses is used to discover other cluster + * by split-brain handler. + */ + public static final HazelcastProperty TCP_PREVIOUSLY_JOINED_MEMBER_ADDRESS_RETENTION_DURATION + = new HazelcastProperty("hazelcast.tcp.join.previously.joined.member.address.retention.seconds", + 14400, SECONDS); + public static final HazelcastProperty MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS = new HazelcastProperty("hazelcast.map.replica.scheduled.task.delay.seconds", 10, SECONDS);