From 8b696b20cbd36b1f6f2d1d861744ce9c710164d3 Mon Sep 17 00:00:00 2001 From: sancar Date: Tue, 12 Apr 2022 11:12:15 +0300 Subject: [PATCH] Don't allow empty memberlist on cluster id changes [API-1215] (#20818) (#21178) * Dont allow empty memberlist on cluster id changes When cluster id changed we were resetting member list with an empty memberlist. It turned out that there are implementations that assumes it will never be empty. Following test failure is an example of it. https://github.com/hazelcast/hazelcast/issues/20264 Instead of clearing the memberlist on cluster id change, we are keeping the cluster id as part of the latest snapshot to be able to fire correct events. Note that clearing memberlist on cluster id change was introduced as a fix to https://github.com/hazelcast/hazelcast/pull/18245 So it is important not to break related ClientHotRestartTest fixes https://github.com/hazelcast/hazelcast/issues/20264 * Refactor ClusterService to make it unittestable Move getClient API out to reduce dependency to HazelcastInstanceImpl * Refactor: ClusterService to be able to unittest ClusterService does not depend on connection manager and client anymore. * Add unittest for ClientClusterServiceImpl Also revised the fix to cover blue/green case as well. ClientClusterService will not serve empty memberlist even when blue/green is happening. (cherry picked from commit 4d6fc11cac67c49ae354f92a3aeb4acb7a3b57a9) Add method to ClientClusterService for enterprise tests (#21169) (cherry picked from commit 1c769a2fc10383a48237ff5462511ed27ecd18c5) --- .../ClusterDiscoveryServiceBuilder.java | 16 +- .../HazelcastClientInstanceImpl.java | 24 +- .../impl/connection/AddressProvider.java | 7 + .../impl/connection/ClientConnection.java | 5 + .../connection/tcp/TcpClientConnection.java | 15 +- .../tcp/TcpClientConnectionManager.java | 51 +- .../client/impl/proxy/ClientClusterProxy.java | 5 +- .../client/impl/spi/ClientClusterService.java | 29 +- .../spi/impl/ClientClusterServiceImpl.java | 182 +++---- .../impl/spi/impl/DefaultAddressProvider.java | 22 +- .../TranslateToPublicAddressProvider.java | 49 +- .../impl/discovery/RemoteAddressProvider.java | 6 + .../ClientClusterViewListenerService.java | 15 +- .../ClientRegressionWithRealNetworkTest.java | 6 + .../ClientClusterServiceMemberListTest.java | 117 ---- ...pClientConnectionManagerTranslateTest.java | 5 + .../impl/ClientClusterServiceImplTest.java | 503 ++++++++++++++++++ .../spi/impl/DefaultAddressProviderTest.java | 10 +- .../TranslateToPublicAddressProviderTest.java | 119 ++--- .../discovery/RemoteAddressProviderTest.java | 2 +- .../statistics/ClientStatisticsTest.java | 6 +- .../client/test/TestHazelcastFactory.java | 6 + 22 files changed, 808 insertions(+), 392 deletions(-) delete mode 100644 hazelcast/src/test/java/com/hazelcast/client/cluster/ClientClusterServiceMemberListTest.java create mode 100644 hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java index e8430eb193c0..4bffb9a3d229 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java @@ -24,7 +24,9 @@ import com.hazelcast.client.config.impl.ClientAliasedDiscoveryConfigUtils; import com.hazelcast.client.impl.ClientExtension; import com.hazelcast.client.impl.connection.AddressProvider; +import com.hazelcast.client.impl.spi.ClientClusterService; import com.hazelcast.client.impl.spi.impl.DefaultAddressProvider; +import com.hazelcast.client.impl.spi.impl.TranslateToPublicAddressProvider; import com.hazelcast.client.impl.spi.impl.discovery.HazelcastCloudDiscovery; import com.hazelcast.client.impl.spi.impl.discovery.RemoteAddressProvider; import com.hazelcast.client.properties.ClientProperty; @@ -71,10 +73,12 @@ class ClusterDiscoveryServiceBuilder { private final Collection configs; private final LifecycleService lifecycleService; private final AddressProvider externalAddressProvider; + private final ClientClusterService clusterService; ClusterDiscoveryServiceBuilder(int configsTryCount, List configs, LoggingService loggingService, AddressProvider externalAddressProvider, HazelcastProperties properties, - ClientExtension clientExtension, LifecycleService lifecycleService) { + ClientExtension clientExtension, LifecycleService lifecycleService, + ClientClusterService clusterService) { this.configsTryCount = configsTryCount; this.configs = configs; this.loggingService = loggingService; @@ -82,6 +86,7 @@ class ClusterDiscoveryServiceBuilder { this.properties = properties; this.clientExtension = clientExtension; this.lifecycleService = lifecycleService; + this.clusterService = clusterService; } public ClusterDiscoveryService build() { @@ -104,7 +109,8 @@ public ClusterDiscoveryService build() { final SSLConfig sslConfig = networkConfig.getSSLConfig(); final SocketOptions socketOptions = networkConfig.getSocketOptions(); - contexts.add(new CandidateClusterContext(config.getClusterName(), provider, discoveryService, credentialsFactory, + contexts.add(new CandidateClusterContext(config.getClusterName(), provider, + discoveryService, credentialsFactory, interceptor, clientExtension.createChannelInitializer(sslConfig, socketOptions))); } return new ClusterDiscoveryService(unmodifiableList(contexts), configsTryCount, lifecycleService); @@ -145,7 +151,11 @@ private AddressProvider createAddressProvider(ClientConfig clientConfig, Discove } else if (networkConfig.getAddresses().isEmpty() && discoveryService != null) { return new RemoteAddressProvider(() -> discoverAddresses(discoveryService), usePublicAddress(clientConfig)); } - return new DefaultAddressProvider(networkConfig); + TranslateToPublicAddressProvider toPublicAddressProvider = new TranslateToPublicAddressProvider(networkConfig, + properties, + loggingService.getLogger(TranslateToPublicAddressProvider.class)); + clusterService.addMembershipListener(toPublicAddressProvider); + return new DefaultAddressProvider(networkConfig, toPublicAddressProvider); } private Map discoverAddresses(DiscoveryService discoveryService) { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java index 68015ce44a06..a2e9c08c44db 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java @@ -28,16 +28,17 @@ import com.hazelcast.client.cp.internal.CPSubsystemImpl; import com.hazelcast.client.cp.internal.session.ClientProxySessionManager; import com.hazelcast.client.impl.ClientExtension; +import com.hazelcast.client.impl.ClientImpl; import com.hazelcast.client.impl.client.DistributedObjectInfo; import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.impl.connection.ClientConnectionManager; import com.hazelcast.client.impl.connection.tcp.ClientICMPManager; import com.hazelcast.client.impl.connection.tcp.HeartbeatManager; +import com.hazelcast.client.impl.connection.tcp.TcpClientConnection; import com.hazelcast.client.impl.connection.tcp.TcpClientConnectionManager; import com.hazelcast.client.impl.protocol.ClientExceptionFactory; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.ClientGetDistributedObjectsCodec; -import com.hazelcast.client.impl.proxy.ClientClusterProxy; import com.hazelcast.client.impl.proxy.PartitionServiceProxy; import com.hazelcast.client.impl.spi.ClientClusterService; import com.hazelcast.client.impl.spi.ClientContext; @@ -138,6 +139,7 @@ import com.hazelcast.transaction.impl.xa.XAService; import javax.annotation.Nonnull; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.EventListener; @@ -167,6 +169,7 @@ import static com.hazelcast.internal.util.ExceptionUtil.rethrow; import static com.hazelcast.internal.util.Preconditions.checkNotNull; import static java.lang.System.currentTimeMillis; +import static java.util.Collections.unmodifiableSet; public class HazelcastClientInstanceImpl implements HazelcastInstance, SerializationServiceSupport { @@ -253,11 +256,11 @@ public HazelcastClientInstanceImpl(String instanceName, ClientConfig clientConfi loadBalancer = initLoadBalancer(config); transactionManager = new ClientTransactionManagerServiceImpl(this); partitionService = new ClientPartitionServiceImpl(this); + clusterService = new ClientClusterServiceImpl(loggingService.getLogger(ClientClusterService.class)); clusterDiscoveryService = initClusterDiscoveryService(externalAddressProvider); connectionManager = (TcpClientConnectionManager) clientConnectionManagerFactory.createConnectionManager(this); invocationService = new ClientInvocationServiceImpl(this); listenerService = new ClientListenerServiceImpl(this); - clusterService = new ClientClusterServiceImpl(this); clientClusterViewListenerService = new ClientClusterViewListenerService(this); userContext.putAll(config.getUserContext()); diagnostics = initDiagnostics(); @@ -295,7 +298,7 @@ private ClusterDiscoveryService initClusterDiscoveryService(AddressProvider exte configs = clientFailoverConfig.getClientConfigs(); } ClusterDiscoveryServiceBuilder builder = new ClusterDiscoveryServiceBuilder(tryCount, configs, loggingService, - externalAddressProvider, properties, clientExtension, getLifecycleService()); + externalAddressProvider, properties, clientExtension, getLifecycleService(), clusterService); return builder.build(); } @@ -551,13 +554,16 @@ public ClientICacheManager getCacheManager() { @Nonnull @Override public Cluster getCluster() { - return new ClientClusterProxy(clusterService); + return clusterService.getCluster(); } @Nonnull @Override public Client getLocalEndpoint() { - return clusterService.getLocalClient(); + TcpClientConnection connection = (TcpClientConnection) connectionManager.getRandomConnection(); + InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null; + UUID clientUuid = connectionManager.getClientUuid(); + return new ClientImpl(clientUuid, inetSocketAddress, instanceName, unmodifiableSet(config.getLabels())); } @Nonnull @@ -863,8 +869,8 @@ public void onClusterChange() { logger.info("Resetting local state of the client, because of a cluster change."); dispose(onClusterChangeDisposables); - //clear the member lists - clusterService.reset(); + //reset the member list version + clusterService.onClusterChange(); //clear partition service partitionService.reset(); //close all the connections, consequently waiting invocations get TargetDisconnectedException @@ -873,12 +879,12 @@ public void onClusterChange() { connectionManager.reset(); } - public void onClusterRestart() { + public void onClusterConnect() { ILogger logger = loggingService.getLogger(HazelcastInstance.class); logger.info("Clearing local state of the client, because of a cluster restart."); dispose(onClusterChangeDisposables); - clusterService.clearMemberList(); + clusterService.onClusterConnect(); } public void waitForInitialMembershipEvents() { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java index a6d750dc849e..b1732c48379e 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java @@ -18,6 +18,7 @@ import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; /** * Provides initial addresses for client to find and connect to a node & @@ -40,4 +41,10 @@ public interface AddressProvider { * @throws Exception when a remote service can not provide addressee */ Address translate(Address address) throws Exception; + + /* + * Implementations of this will handle returning the public address of the member if necessary. + * See {@link com.hazelcast.client.impl.spi.impl.DefaultAddressProvider#addressOf(Member)} + */ + Address translate(Member member) throws Exception; } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java index 1f38c5398c13..f4533bf67cca 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.nio.Connection; import java.util.Map; +import java.util.UUID; /** * The ClientConnection is connection that lives on the client side on behalf of a Java client. @@ -44,6 +45,10 @@ public interface ClientConnection extends Connection { void addEventHandler(long correlationId, EventHandler handler); + void setClusterUuid(UUID uuid); + + UUID getClusterUuid(); + // used in tests Map getEventHandlers(); diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java index a7dac06ad071..c4179a01c36b 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java @@ -77,6 +77,7 @@ public class TcpClientConnection implements ClientConnection { private volatile String closeReason; private String connectedServerVersion; private volatile UUID remoteUuid; + private volatile UUID clusterUuid; public TcpClientConnection(HazelcastClientInstanceImpl client, int connectionId, Channel channel) { this.client = client; @@ -286,10 +287,6 @@ public void setConnectedServerVersion(String connectedServerVersion) { this.connectedServerVersion = connectedServerVersion; } - public String getConnectedServerVersion() { - return connectedServerVersion; - } - @Override public EventHandler getEventHandler(long correlationId) { return eventHandlerMap.get(correlationId); @@ -305,6 +302,16 @@ public void addEventHandler(long correlationId, EventHandler handler) { eventHandlerMap.put(correlationId, handler); } + @Override + public void setClusterUuid(UUID uuid) { + clusterUuid = uuid; + } + + @Override + public UUID getClusterUuid() { + return clusterUuid; + } + // used in tests @Override public Map getEventHandlers() { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java index 590f2f9a49b9..f1686d800f2b 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java @@ -44,12 +44,13 @@ import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; +import com.hazelcast.cluster.MembershipListener; import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.LifecycleEvent.LifecycleState; +import com.hazelcast.function.BiFunctionEx; import com.hazelcast.instance.BuildInfoProvider; -import com.hazelcast.instance.EndpointQualifier; -import com.hazelcast.instance.ProtocolType; import com.hazelcast.internal.networking.Channel; import com.hazelcast.internal.networking.ChannelErrorHandler; import com.hazelcast.internal.networking.nio.NioNetworking; @@ -119,13 +120,11 @@ /** * Implementation of {@link ClientConnectionManager}. */ -public class TcpClientConnectionManager implements ClientConnectionManager { +public class TcpClientConnectionManager implements ClientConnectionManager, MembershipListener { private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3; private static final int EXECUTOR_CORE_POOL_SIZE = 10; private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8; - private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER = - EndpointQualifier.resolve(ProtocolType.CLIENT, "public"); private static final int SQL_CONNECTION_RANDOM_ATTEMPTS = 10; protected final AtomicInteger connectionIdGen = new AtomicInteger(); @@ -727,29 +726,25 @@ protected TcpClientConnection createSocketConnection(Address target) { } private Address translate(Member member) { - if (client.getClientClusterService().translateToPublicAddress()) { - Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); - if (publicAddress != null) { - return publicAddress; - } - return member.getAddress(); - } - return translate(member.getAddress()); + return translate(member, AddressProvider::translate); + } + + private Address translate(Address address) { + return translate(address, AddressProvider::translate); } - private Address translate(Address target) { + private Address translate(T target, BiFunctionEx translateFunction) { CandidateClusterContext currentContext = clusterDiscoveryService.current(); AddressProvider addressProvider = currentContext.getAddressProvider(); try { - Address translatedAddress = addressProvider.translate(target); + Address translatedAddress = translateFunction.apply(addressProvider, target); if (translatedAddress == null) { throw new HazelcastException("Address Provider " + addressProvider.getClass() - + " could not translate address " + target); + + " could not translate " + target); } - return translatedAddress; } catch (Exception e) { - logger.warning("Failed to translate address " + target + " via address provider " + e.getMessage()); + logger.warning("Failed to translate " + target + " via address provider " + e.getMessage()); throw rethrow(e); } } @@ -896,9 +891,9 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection, boolean switchingToNextCluster) { synchronized (clientStateMutex) { checkAuthenticationResponse(connection, response); - connection.setConnectedServerVersion(response.serverHazelcastVersion); connection.setRemoteAddress(response.address); connection.setRemoteUuid(response.memberUuid); + connection.setClusterUuid(response.clusterId); TcpClientConnection existingConnection = activeConnections.get(response.memberUuid); if (existingConnection != null) { @@ -919,7 +914,7 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection, if (clusterIdChanged) { checkClientStateOnClusterIdChange(connection, switchingToNextCluster); logger.warning("Switching from current cluster: " + this.clusterId + " to new cluster: " + newClusterId); - client.onClusterRestart(); + client.onClusterConnect(); } checkClientState(connection, switchingToNextCluster); @@ -1193,4 +1188,20 @@ public void run() { } } } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + Member member = membershipEvent.getMember(); + Connection connection = getConnection(member.getUuid()); + if (connection != null) { + connection.close(null, + new TargetDisconnectedException("The client has closed the connection to this member," + + " after receiving a member left event from the cluster. " + connection)); + } + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java b/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java index c1965927b56a..1860cb8b7b60 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java @@ -17,11 +17,12 @@ package com.hazelcast.client.impl.proxy; import com.hazelcast.client.impl.spi.impl.ClientClusterServiceImpl; -import com.hazelcast.cluster.ClusterState; import com.hazelcast.cluster.Cluster; +import com.hazelcast.cluster.ClusterState; import com.hazelcast.cluster.Member; import com.hazelcast.cluster.MembershipListener; import com.hazelcast.hotrestart.HotRestartService; +import com.hazelcast.internal.util.Clock; import com.hazelcast.persistence.PersistenceService; import com.hazelcast.transaction.TransactionOptions; import com.hazelcast.version.Version; @@ -70,7 +71,7 @@ public Member getLocalMember() { @Override public long getClusterTime() { - return clusterService.getClusterTime(); + return Clock.currentTimeMillis(); } @Nonnull diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java index 1552de014f26..d4e4759c6fec 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java @@ -16,7 +16,6 @@ package com.hazelcast.client.impl.spi; -import com.hazelcast.client.Client; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; import com.hazelcast.cluster.MemberSelector; @@ -29,15 +28,10 @@ /** * Cluster service for Hazelcast clients. *

- * Allows to retrieve Hazelcast members of the cluster, e.g. by their {@link Address} or UUID. + * Allows retrieving Hazelcast members of the cluster, e.g. by their {@link Address} or UUID. */ public interface ClientClusterService { - /** - * @return The client interface representing the local client. - */ - Client getLocalClient(); - /** * Gets the member with the given UUID. * @@ -68,27 +62,6 @@ public interface ClientClusterService { */ Member getMasterMember(); - /** - * Gets the current number of members. - * - * @return The current number of members. - */ - int getSize(); - - /** - * Returns the cluster-time. - * - * @return The cluster-time. - */ - long getClusterTime(); - - /** - * Returns {@code true} if member internal address should be translated into its public address. - * - * @return true if member address should be translated into its public address. - */ - boolean translateToPublicAddress(); - /** * @param listener The listener to be registered. * @return The registration ID diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java index c696f1d38761..2cbf72c00dd5 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java @@ -16,13 +16,8 @@ package com.hazelcast.client.impl.spi.impl; -import com.hazelcast.client.Client; -import com.hazelcast.client.impl.ClientImpl; -import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl; -import com.hazelcast.client.impl.connection.ClientConnectionManager; -import com.hazelcast.client.impl.connection.tcp.TcpClientConnection; +import com.hazelcast.client.impl.proxy.ClientClusterProxy; import com.hazelcast.client.impl.spi.ClientClusterService; -import com.hazelcast.client.impl.spi.ClientPartitionService; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Cluster; import com.hazelcast.cluster.InitialMembershipEvent; @@ -35,15 +30,11 @@ import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.internal.cluster.impl.MemberSelectingCollection; -import com.hazelcast.internal.nio.Connection; -import com.hazelcast.internal.util.Clock; import com.hazelcast.internal.util.ExceptionUtil; import com.hazelcast.internal.util.UuidUtil; import com.hazelcast.logging.ILogger; -import com.hazelcast.spi.exception.TargetDisconnectedException; import javax.annotation.Nonnull; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.EventListener; import java.util.HashSet; @@ -63,48 +54,47 @@ import static com.hazelcast.instance.EndpointQualifier.CLIENT; import static com.hazelcast.instance.EndpointQualifier.MEMBER; import static com.hazelcast.internal.util.Preconditions.checkNotNull; -import static java.util.Collections.EMPTY_SET; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableSet; /** - * The {@link ClientClusterService} and {@link ClientPartitionService} implementation. + * Responsible for + * - keeping track of the cluster members and serving them to other services + * - firing membership events based on the incoming MemberListSnapshot events. */ -public class ClientClusterServiceImpl - implements ClientClusterService { +public class ClientClusterServiceImpl implements ClientClusterService { + /** + * Initial list version is used at the start and also after cluster has changed with blue-green deployment feature. + * In both cases, we need to fire InitialMembershipEvent. + */ + public static final int INITIAL_MEMBER_LIST_VERSION = -1; private static final int INITIAL_MEMBERS_TIMEOUT_SECONDS = 120; - - private static final MemberListSnapshot EMPTY_SNAPSHOT = new MemberListSnapshot(-1, new LinkedHashMap<>()); - private final HazelcastClientInstanceImpl client; - - private final AtomicReference memberListSnapshot = new AtomicReference<>(EMPTY_SNAPSHOT); + private final AtomicReference memberListSnapshot = + new AtomicReference<>(new MemberListSnapshot(INITIAL_MEMBER_LIST_VERSION, new LinkedHashMap<>(), null)); private final ConcurrentMap listeners = new ConcurrentHashMap<>(); - private final Set labels; private final ILogger logger; - private final ClientConnectionManager connectionManager; private final Object clusterViewLock = new Object(); - private final TranslateToPublicAddressProvider translateToPublicAddress; //read and written under clusterViewLock private CountDownLatch initialListFetchedLatch = new CountDownLatch(1); - private static final class MemberListSnapshot { private final int version; private final LinkedHashMap members; + private final UUID clusterUuid; - private MemberListSnapshot(int version, LinkedHashMap members) { + private MemberListSnapshot(int version, LinkedHashMap members, UUID clusterUuid) { this.version = version; this.members = members; + this.clusterUuid = clusterUuid; } } - public ClientClusterServiceImpl(HazelcastClientInstanceImpl client) { - this.client = client; - labels = unmodifiableSet(client.getClientConfig().getLabels()); - logger = client.getLoggingService().getLogger(ClientClusterService.class); - connectionManager = client.getConnectionManager(); - translateToPublicAddress = new TranslateToPublicAddressProvider(client.getClientConfig().getNetworkConfig(), - client.getProperties(), logger); + public ClientClusterServiceImpl(ILogger logger) { + this.logger = logger; + } + + public Cluster getCluster() { + return new ClientClusterProxy(this); } @Override @@ -134,30 +124,6 @@ public Member getMasterMember() { return null; } - @Override - public int getSize() { - return getMemberList().size(); - } - - @Override - public long getClusterTime() { - return Clock.currentTimeMillis(); - } - - @Override - public boolean translateToPublicAddress() { - return translateToPublicAddress.get(); - } - - @Override - public Client getLocalClient() { - final ClientConnectionManager cm = client.getConnectionManager(); - final TcpClientConnection connection = (TcpClientConnection) cm.getRandomConnection(); - InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null; - UUID clientUuid = cm.getClientUuid(); - return new ClientImpl(clientUuid, inetSocketAddress, client.getName(), labels); - } - @Nonnull @Override public UUID addMembershipListener(@Nonnull MembershipListener listener) { @@ -166,7 +132,7 @@ public UUID addMembershipListener(@Nonnull MembershipListener listener) { synchronized (clusterViewLock) { UUID id = addMembershipListenerWithoutInit(listener); if (listener instanceof InitialMembershipListener) { - Cluster cluster = client.getCluster(); + Cluster cluster = getCluster(); Collection members = memberListSnapshot.get().members.values(); //if members are empty,it means initial event did not arrive yet //it will be redirected to listeners when it arrives see #handleInitialMembershipEvent @@ -208,61 +174,47 @@ public void waitInitialMemberListFetched() { } } - public void clearMemberListVersion() { + public void onClusterConnect() { synchronized (clusterViewLock) { if (logger.isFineEnabled()) { logger.fine("Resetting the member list version "); } MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); - // This check is necessary so that when handling auth response, it will not - // intervene with client failover logic - if (clusterViewSnapshot != EMPTY_SNAPSHOT) { - memberListSnapshot.set(new MemberListSnapshot(0, clusterViewSnapshot.members)); - } - } - } - - /** - * Clears the member list and fires member removed event for members in the list. - */ - public void clearMemberList() { - List events = null; - synchronized (clusterViewLock) { - if (logger.isFineEnabled()) { - logger.fine("Resetting the member list "); + // This check is necessary so in order not to override changing cluster information when: + // - registering cluster view listener back to the new cluster. + // - on authentication response when cluster uuid change is detected. + if (clusterViewSnapshot.version != INITIAL_MEMBER_LIST_VERSION) { + memberListSnapshot.set(new MemberListSnapshot(0, + clusterViewSnapshot.members, + clusterViewSnapshot.clusterUuid)); } - MemberListSnapshot clusterViewSnapshot = this.memberListSnapshot.get(); - // This check is necessary so that when handling auth response, it will not - // intervene with client failover logic - if (clusterViewSnapshot != EMPTY_SNAPSHOT) { - Collection prevMembers = clusterViewSnapshot.members.values(); - this.memberListSnapshot.set(new MemberListSnapshot(0, new LinkedHashMap<>())); - events = detectMembershipEvents(prevMembers, EMPTY_SET); - } - - } - if (events != null) { - fireEvents(events); } } - public void reset() { + public void onClusterChange() { synchronized (clusterViewLock) { if (logger.isFineEnabled()) { logger.fine("Resetting the cluster snapshot"); } initialListFetchedLatch = new CountDownLatch(1); - memberListSnapshot.set(EMPTY_SNAPSHOT); + MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); + memberListSnapshot.set(new MemberListSnapshot(INITIAL_MEMBER_LIST_VERSION, + clusterViewSnapshot.members, + clusterViewSnapshot.clusterUuid)); } } - private void applyInitialState(int version, Collection memberInfos) { - MemberListSnapshot snapshot = createSnapshot(version, memberInfos); - translateToPublicAddress.refresh(client.getClusterDiscoveryService().current().getAddressProvider(), memberInfos); + //public for tests on enterprise + public int getMemberListVersion() { + return memberListSnapshot.get().version; + } + + private void applyInitialState(int version, Collection memberInfos, UUID clusterUuid) { + MemberListSnapshot snapshot = createSnapshot(version, memberInfos, clusterUuid); memberListSnapshot.set(snapshot); logger.info(membersString(snapshot)); Set members = toUnmodifiableHasSet(snapshot.members.values()); - InitialMembershipEvent event = new InitialMembershipEvent(client.getCluster(), members); + InitialMembershipEvent event = new InitialMembershipEvent(getCluster(), members); for (MembershipListener listener : listeners.values()) { if (listener instanceof InitialMembershipListener) { ((InitialMembershipListener) listener).init(event); @@ -270,7 +222,7 @@ private void applyInitialState(int version, Collection memberInfos) } } - private MemberListSnapshot createSnapshot(int memberListVersion, Collection memberInfos) { + private MemberListSnapshot createSnapshot(int memberListVersion, Collection memberInfos, UUID clusterUuid) { LinkedHashMap newMembers = new LinkedHashMap<>(); for (MemberInfo memberInfo : memberInfos) { MemberImpl.Builder memberBuilder; @@ -288,35 +240,38 @@ private MemberListSnapshot createSnapshot(int memberListVersion, Collection toUnmodifiableHasSet(Collection members) { return unmodifiableSet(new HashSet<>(members)); } - private List detectMembershipEvents(Collection prevMembers, Set currentMembers) { + private List detectMembershipEvents(Collection prevMembers, + Set currentMembers, + UUID clusterUuid) { List newMembers = new LinkedList<>(); Set deadMembers = new HashSet<>(prevMembers); - for (Member member : currentMembers) { - if (!deadMembers.remove(member)) { - newMembers.add(member); + + if (clusterUuid.equals(memberListSnapshot.get().clusterUuid)) { + for (Member member : currentMembers) { + if (!deadMembers.remove(member)) { + newMembers.add(member); + } } + } else { + // if cluster uuid is not same, then we will not try to match the current members to previous members + // As a result all previous members are dead and all current members are new members + newMembers.addAll(currentMembers); } List events = new LinkedList<>(); for (Member member : deadMembers) { - events.add(new MembershipEvent(client.getCluster(), member, MembershipEvent.MEMBER_REMOVED, currentMembers)); - Connection connection = connectionManager.getConnection(member.getUuid()); - if (connection != null) { - connection.close(null, - new TargetDisconnectedException("The client has closed the connection to this member," - + " after receiving a member left event from the cluster. " + connection)); - } + events.add(new MembershipEvent(getCluster(), member, MembershipEvent.MEMBER_REMOVED, currentMembers)); } for (Member member : newMembers) { - events.add(new MembershipEvent(client.getCluster(), member, MembershipEvent.MEMBER_ADDED, currentMembers)); + events.add(new MembershipEvent(getCluster(), member, MembershipEvent.MEMBER_ADDED, currentMembers)); } if (events.size() != 0) { @@ -340,19 +295,19 @@ private String membersString(MemberListSnapshot snapshot) { return sb.toString(); } - public void handleMembersViewEvent(int memberListVersion, Collection memberInfos) { + public void handleMembersViewEvent(int memberListVersion, Collection memberInfos, UUID clusterUuid) { if (logger.isFinestEnabled()) { - MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos); + MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos, clusterUuid); logger.finest("Handling new snapshot with membership version: " + memberListVersion + ", membersString " + membersString(snapshot)); } MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); - if (clusterViewSnapshot == EMPTY_SNAPSHOT) { + if (clusterViewSnapshot.version == INITIAL_MEMBER_LIST_VERSION) { synchronized (clusterViewLock) { clusterViewSnapshot = memberListSnapshot.get(); - if (clusterViewSnapshot == EMPTY_SNAPSHOT) { - //this means this is the first time client connected to cluster - applyInitialState(memberListVersion, memberInfos); + if (clusterViewSnapshot.version == INITIAL_MEMBER_LIST_VERSION) { + //this means this is the first time client connected to cluster/cluster has changed(blue/green) + applyInitialState(memberListVersion, memberInfos, clusterUuid); initialListFetchedLatch.countDown(); return; } @@ -365,10 +320,11 @@ public void handleMembersViewEvent(int memberListVersion, Collection clusterViewSnapshot = memberListSnapshot.get(); if (memberListVersion > clusterViewSnapshot.version) { Collection prevMembers = clusterViewSnapshot.members.values(); - MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos); + UUID previousClusterUuid = clusterViewSnapshot.clusterUuid; + MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos, clusterUuid); memberListSnapshot.set(snapshot); Set currentMembers = toUnmodifiableHasSet(snapshot.members.values()); - events = detectMembershipEvents(prevMembers, currentMembers); + events = detectMembershipEvents(prevMembers, currentMembers, previousClusterUuid); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java index 779ec4a98cca..1584f4e06941 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java @@ -21,8 +21,12 @@ import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.client.util.AddressHelper; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; +import com.hazelcast.instance.EndpointQualifier; +import com.hazelcast.instance.ProtocolType; import java.util.List; +import java.util.function.BooleanSupplier; /** * Default address provider of Hazelcast. @@ -31,10 +35,15 @@ */ public class DefaultAddressProvider implements AddressProvider { + private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER = + EndpointQualifier.resolve(ProtocolType.CLIENT, "public"); private final ClientNetworkConfig networkConfig; + private final BooleanSupplier translateToPublicAddressSupplier; - public DefaultAddressProvider(ClientNetworkConfig networkConfig) { + public DefaultAddressProvider(ClientNetworkConfig networkConfig, + BooleanSupplier translateToPublicAddressSupplier) { this.networkConfig = networkConfig; + this.translateToPublicAddressSupplier = translateToPublicAddressSupplier; } @Override @@ -57,4 +66,15 @@ public Addresses loadAddresses() { public Address translate(Address address) { return address; } + + @Override + public Address translate(Member member) { + if (translateToPublicAddressSupplier.getAsBoolean()) { + Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); + if (publicAddress != null) { + return publicAddress; + } + } + return member.getAddress(); + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java index cda147c41477..e89f07b6b9f0 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java @@ -17,13 +17,15 @@ package com.hazelcast.client.impl.spi.impl; import com.hazelcast.client.config.ClientNetworkConfig; -import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.InitialMembershipListener; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; import com.hazelcast.config.SSLConfig; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.instance.ProtocolType; -import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.internal.util.AddressUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.properties.HazelcastProperties; @@ -38,9 +40,10 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; -class TranslateToPublicAddressProvider { +public class TranslateToPublicAddressProvider implements InitialMembershipListener, BooleanSupplier { private static final int REACHABLE_ADDRESS_TIMEOUT_MILLIS = 1000; private static final int NON_REACHABLE_ADDRESS_TIMEOUT_MILLIS = 3000; private static final int REACHABLE_CHECK_NUMBER = 3; @@ -52,21 +55,13 @@ class TranslateToPublicAddressProvider { private volatile boolean translateToPublicAddress; - TranslateToPublicAddressProvider(ClientNetworkConfig config, HazelcastProperties properties, ILogger logger) { + public TranslateToPublicAddressProvider(ClientNetworkConfig config, HazelcastProperties properties, ILogger logger) { this.config = config; this.properties = properties; this.logger = logger; } - void refresh(AddressProvider addressProvider, Collection members) { - translateToPublicAddress = resolve(addressProvider, members); - } - - private boolean resolve(AddressProvider addressProvider, Collection members) { - if (!(addressProvider instanceof DefaultAddressProvider)) { - return false; - } - + private boolean resolve(Collection members) { // Default value of DISCOVERY_SPI_PUBLIC_IP_ENABLED is `null` intentionally. // If DISCOVERY_SPI_PUBLIC_IP_ENABLED is not set to true/false, we don't know the intention of the user, // we will try to decide if we should use private/public address automatically in that case. @@ -104,7 +99,7 @@ private boolean resolve(AddressProvider addressProvider, Collection * If any member has its internal/private address the same as configured in ClientConfig, then it means that the client is * able to connect to members via configured address. No need to use make any address translation. */ - boolean memberInternalAddressAsDefinedInClientConfig(Collection members) { + boolean memberInternalAddressAsDefinedInClientConfig(Collection members) { List addresses = config.getAddresses(); List resolvedHosts = addresses.stream().map(s -> { try { @@ -128,15 +123,15 @@ boolean memberInternalAddressAsDefinedInClientConfig(Collection memb *

* We check only limited number of random members to reduce the slowdown of the startup. */ - private boolean membersReachableOnlyViaPublicAddress(Collection members) { - List shuffledList = new ArrayList<>(members); + private boolean membersReachableOnlyViaPublicAddress(Collection members) { + List shuffledList = new ArrayList<>(members); Collections.shuffle(shuffledList); - Iterator iter = shuffledList.iterator(); + Iterator iter = shuffledList.iterator(); for (int i = 0; i < REACHABLE_CHECK_NUMBER; i++) { if (!iter.hasNext()) { iter = shuffledList.iterator(); } - MemberInfo member = iter.next(); + Member member = iter.next(); Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); Address internalAddress = member.getAddress(); if (publicAddress == null) { @@ -178,7 +173,23 @@ private boolean isReachable(Address address, int timeoutMs) { return true; } - boolean get() { + @Override + public void init(InitialMembershipEvent event) { + translateToPublicAddress = resolve(event.getMembers()); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + + @Override + public boolean getAsBoolean() { return translateToPublicAddress; } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java index 4234d066219c..d904f8df9211 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java @@ -19,6 +19,7 @@ import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; import java.util.HashMap; import java.util.Map; @@ -61,4 +62,9 @@ public Address translate(Address address) throws Exception { return privateToPublic.get(address); } + + @Override + public Address translate(Member member) throws Exception { + return translate(member.getAddress()); + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java index 6af2fc6cfc37..1f7ee721d203 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java @@ -17,6 +17,7 @@ package com.hazelcast.client.impl.spi.impl.listener; import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl; +import com.hazelcast.client.impl.connection.ClientConnection; import com.hazelcast.client.impl.connection.ClientConnectionManager; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.ClientAddClusterViewListenerCodec; @@ -64,15 +65,15 @@ public void start() { private final class ClusterViewListenerHandler extends ClientAddClusterViewListenerCodec.AbstractEventHandler implements EventHandler { - private final Connection connection; + private final ClientConnection connection; - private ClusterViewListenerHandler(Connection connection) { + private ClusterViewListenerHandler(ClientConnection connection) { this.connection = connection; } @Override public void beforeListenerRegister(Connection connection) { - clusterService.clearMemberListVersion(); + clusterService.onClusterConnect(); if (logger.isFinestEnabled()) { logger.finest("Register attempt of ClusterViewListenerHandler to " + connection); } @@ -87,7 +88,7 @@ public void onListenerRegister(Connection connection) { @Override public void handleMembersViewEvent(int memberListVersion, Collection memberInfos) { - clusterService.handleMembersViewEvent(memberListVersion, memberInfos); + clusterService.handleMembersViewEvent(memberListVersion, memberInfos, connection.getClusterUuid()); } @Override @@ -98,7 +99,7 @@ public void handlePartitionsViewEvent(int version, Collection { - final ClientClusterService clusterService = getClientClusterService(client); - final Collection members = clusterService.getMembers(LITE_MEMBER_SELECTOR); - verifyMembers(members, singletonList(liteInstance)); - - assertEquals(1, clusterService.getMembers(LITE_MEMBER_SELECTOR).size()); - }); - } - - @Test - public void testDataMembers() { - assertTrueEventually(() -> { - final ClientClusterService clusterService = getClientClusterService(client); - final Collection members = clusterService.getMembers(DATA_MEMBER_SELECTOR); - verifyMembers(members, asList(dataInstance, dataInstance2)); - - assertEquals(2, clusterService.getMembers(DATA_MEMBER_SELECTOR).size()); - }); - } - - @Test - public void testMemberListOrderConsistentWithServer() { - Set membersFromClient = client.getCluster().getMembers(); - Set membersFromServer = dataInstance.getCluster().getMembers(); - assertArrayEquals(membersFromClient.toArray(), membersFromServer.toArray()); - } - - private void verifyMembers(Collection membersToCheck, Collection membersToExpect) { - for (HazelcastInstance instance : membersToExpect) { - assertContains(membersToCheck, getLocalMember(instance)); - } - - assertEquals(membersToExpect.size(), membersToCheck.size()); - } - - private Member getLocalMember(HazelcastInstance instance) { - return getNode(instance).getLocalMember(); - } - - private ClientClusterService getClientClusterService(HazelcastInstance client) { - return ClientTestUtil.getHazelcastClientInstanceImpl(client).getClientClusterService(); - } -} diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java index d50d90713fed..12a080dfb086 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java @@ -202,6 +202,11 @@ public Address translate(Address address) { return null; } + @Override + public Address translate(Member member) { + return member.getAddress(); + } + @Override public Addresses loadAddresses() { try { diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java new file mode 100644 index 000000000000..d16c174b9540 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java @@ -0,0 +1,503 @@ +/* + * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hazelcast.client.impl.spi.impl; + +import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.InitialMembershipListener; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; +import com.hazelcast.cluster.MembershipListener; +import com.hazelcast.instance.BuildInfoProvider; +import com.hazelcast.internal.cluster.MemberInfo; +import com.hazelcast.logging.ILogger; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelJVMTest; +import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.version.MemberVersion; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.annotation.Nonnull; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelJVMTest.class}) +public class ClientClusterServiceImplTest extends HazelcastTestSupport { + + private static final MemberVersion VERSION = MemberVersion.of(BuildInfoProvider.getBuildInfo().getVersion()); + + @Test + public void testMemberAdded() { + LinkedList members = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + members.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + }); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + MemberInfo memberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(2, asList(member, memberInfo), clusterUuid); + assertCollection(members, Collections.singleton(memberInfo.toMember())); + assertEquals(2, clusterService.getMemberList().size()); + } + + @Test + public void testMemberRemoved() { + LinkedList members = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo memberInfo = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(memberInfo), clusterUuid); + clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + members.add(membershipEvent.getMember()); + } + }); + clusterService.handleMembersViewEvent(2, Collections.emptyList(), clusterUuid); + assertCollection(members, Collections.singleton(memberInfo.toMember())); + assertEquals(0, clusterService.getMemberList().size()); + } + + @Test + public void testInitialMembershipListener_AfterInitialListArrives() { + AtomicInteger initialEventCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), clusterUuid); + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + } + }); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testInitialMembershipListener_BeforeInitialListArrives() { + AtomicInteger initialEventCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + } + }); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testFireOnlyIncrementalEvents_AfterClusterRestart() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo removedMemberInfo = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(removedMemberInfo), UUID.randomUUID()); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on cluster restart + clusterService.onClusterConnect(); + + MemberInfo addedMemberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(1, asList(addedMemberInfo), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + assertCollection(addedMembers, Collections.singleton(addedMemberInfo.toMember())); + assertCollection(removedMembers, Collections.singleton(removedMemberInfo.toMember())); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testFireOnlyInitialEvent_AfterClusterChange() { + AtomicInteger initialEventCount = new AtomicInteger(); + AtomicInteger addedCount = new AtomicInteger(); + AtomicInteger removedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedCount.incrementAndGet(); + } + }); + + //called on cluster change + clusterService.onClusterChange(); + + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(0, addedCount.get()); + assertEquals(0, removedCount.get()); + assertEquals(2, initialEventCount.get()); + } + + @Test + public void testDontFire_WhenReconnectToSameCluster() { + AtomicInteger initialEventCount = new AtomicInteger(); + AtomicInteger addedCount = new AtomicInteger(); + AtomicInteger removedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + List memberList = asList(member("127.0.0.1")); + UUID clusterUuid = UUID.randomUUID(); + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedCount.incrementAndGet(); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(0, addedCount.get()); + assertEquals(0, removedCount.get()); + assertEquals(1, initialEventCount.get()); + } + + + @Test + /* + Related to HotRestart where members keep their uuid's same but addresses changes. + */ + public void testFireEvents_WhenAddressOfTheMembersChanges() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID member1uuid = UUID.randomUUID(); + UUID member2uuid = UUID.randomUUID(); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo removedMember1 = member("127.0.0.1", member1uuid); + MemberInfo removedMember2 = member("127.0.0.2", member2uuid); + clusterService.handleMembersViewEvent(1, + asList(removedMember1, removedMember2), + clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + MemberInfo addedMember1 = member("127.0.0.1", member2uuid); + MemberInfo addedMember2 = member("127.0.0.2", member1uuid); + clusterService.handleMembersViewEvent(1, + asList(addedMember1, addedMember2), + clusterUuid); + assertEquals(2, clusterService.getMemberList().size()); + assertCollection(addedMembers, Arrays.asList(addedMember1.toMember(), addedMember2.toMember())); + assertCollection(removedMembers, Arrays.asList(removedMember1.toMember(), removedMember2.toMember())); + assertEquals(1, initialEventCount.get()); + } + + @Test + /* + Related to HotRestart where members keep their uuid's and addresses same. + */ + public void testFireEvents_WhenAddressAndUuidsDoesNotChange() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo member1 = member("127.0.0.1"); + MemberInfo member2 = member("127.0.0.2"); + List memberList = asList(member1, member2); + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + clusterService.handleMembersViewEvent(1, memberList, UUID.randomUUID()); + assertEquals(2, clusterService.getMemberList().size()); + assertCollection(addedMembers, Arrays.asList(member1.toMember(), member2.toMember())); + assertCollection(removedMembers, Arrays.asList(member1.toMember(), member2.toMember())); + assertEquals(1, initialEventCount.get()); + + } + + @Test + public void testDontServeEmptyMemberList_DuringClusterRestart() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + //called on cluster restart + clusterService.onClusterConnect(); + assertEquals(1, clusterService.getMemberList().size()); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.2")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + } + + @Test + public void testDontServeEmptyMemberList_DuringClusterChange() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + //called on cluster change + clusterService.onClusterChange(); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(ClientClusterServiceImpl.INITIAL_MEMBER_LIST_VERSION, clusterService.getMemberListVersion()); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.2")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + } + + @Nonnull + private static MemberInfo member(String host) { + try { + return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), false, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Nonnull + private static MemberInfo liteMember(String host) { + try { + return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), true, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Nonnull + private static MemberInfo member(String host, UUID uuid) { + try { + return new MemberInfo(new Address(host, 5701), uuid, emptyMap(), false, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testListenersFromConfigWorking() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + LinkedList addedMembers = new LinkedList<>(); + clusterService.start(singleton(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + })); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + MemberInfo addedMemberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(2, asList(member, addedMemberInfo), clusterUuid); + assertCollection(addedMembers, Collections.singleton(addedMemberInfo.toMember())); + } + + @Test + public void testRemoveListener() { + AtomicInteger addedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID listenerUuid = clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + }); + assertTrue(clusterService.removeMembershipListener(listenerUuid)); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + clusterService.handleMembersViewEvent(2, asList(member, member("127.0.0.2")), clusterUuid); + // we have removed the listener. No event should be fired to our listener + assertEquals(0, addedCount.get()); + } + + @Test + public void testRemoveNonExistingListener() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + assertFalse(clusterService.removeMembershipListener(UUID.randomUUID())); + } + + @Test + public void testGetMasterMember() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(masterMember, member("127.0.0.2"), + member("127.0.0.3")), UUID.randomUUID()); + assertEquals(masterMember.toMember(), clusterService.getMasterMember()); + } + + @Test + public void testGetMember() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + UUID member2Uuid = UUID.randomUUID(); + MemberInfo member2 = member("127.0.0.2", member2Uuid); + clusterService.handleMembersViewEvent(1, asList(masterMember, member2), UUID.randomUUID()); + assertEquals(member2.toMember(), clusterService.getMember(member2Uuid)); + } + + @Test + public void testGetMembers() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + MemberInfo liteMember = liteMember("127.0.0.2"); + MemberInfo dataMember = member("127.0.0.3"); + clusterService.handleMembersViewEvent(1, asList(masterMember, liteMember, + dataMember), UUID.randomUUID()); + assertCollection(Arrays.asList(masterMember.toMember(), liteMember.toMember(), dataMember.toMember()), clusterService.getMemberList()); + assertCollection(Arrays.asList(liteMember.toMember()), clusterService.getMembers(Member::isLiteMember)); + assertCollection(Arrays.asList(masterMember.toMember(), dataMember.toMember()), clusterService.getMembers(member -> !member.isLiteMember())); + } + + @Test + public void testWaitInitialMembership() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(masterMember, liteMember("127.0.0.2"), + member("127.0.0.3")), UUID.randomUUID()); + clusterService.waitInitialMemberListFetched(); + } +} diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java index 4a47dedef7ec..640e8549a88b 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java @@ -40,7 +40,7 @@ public class DefaultAddressProviderTest { @Test public void whenNoAddresses() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("127.0.0.1", 5701)); @@ -51,7 +51,7 @@ public void whenNoAddresses() throws UnknownHostException { public void whenExplicitNoPortAddress() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress("10.0.0.1"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5701)); @@ -63,7 +63,7 @@ public void whenExplicitPorts() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress("10.0.0.1:5703"); config.addAddress("10.0.0.1:5702"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5703), new Address("10.0.0.1", 5702)); @@ -76,7 +76,7 @@ public void whenMix() throws UnknownHostException { config.addAddress("10.0.0.1:5701"); config.addAddress("10.0.0.1:5702"); config.addAddress("10.0.0.2"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5701), @@ -89,7 +89,7 @@ public void whenMix() throws UnknownHostException { public void whenBogusAddress() { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress(UUID.randomUUID().toString()); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimaryEmpty(addresses); diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java index 2aaf3168b006..5cd03b4f9e0c 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java @@ -17,15 +17,16 @@ package com.hazelcast.client.impl.spi.impl; import com.hazelcast.client.config.ClientConfig; -import com.hazelcast.client.impl.connection.AddressProvider; -import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Cluster; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.impl.MemberImpl; import com.hazelcast.core.Hazelcast; import com.hazelcast.instance.BuildInfoProvider; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.instance.ProtocolType; -import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.properties.HazelcastProperties; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -38,12 +39,15 @@ import javax.annotation.Nonnull; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.UUID; import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; +import static java.util.Collections.emptySet; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -62,19 +66,6 @@ public void teardown() { Hazelcast.shutdownAll(); } - @Test - public void nonDefaultAddressProvider() { - // given - TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); - - // when - translateProvider.refresh(new TestAddressProvider(), null); - boolean result = translateProvider.get(); - - // then - assertFalse(result); - } - @Test public void propertyTrue() { // given @@ -82,8 +73,8 @@ public void propertyTrue() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), null); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), emptySet())); + boolean result = translateProvider.getAsBoolean(); // then assertTrue(result); @@ -96,8 +87,8 @@ public void propertyFalse() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), null); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), emptySet())); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -110,8 +101,9 @@ public void memberInternalAddressAsDefinedInClientConfig() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("192.168.0.1"), member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(asList(member("192.168.0.1"), member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -124,8 +116,9 @@ public void memberInternalAddressAsDefinedInClientConfig_memberUsesHostName() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("localhost"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("localhost"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -138,8 +131,9 @@ public void memberInternalAddressAsDefinedInClientConfig_clientUsesHostname() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -152,8 +146,9 @@ public void memberInternalAddressAsDefinedInClientConfig_clientUsesHostnameAndIp TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -165,8 +160,9 @@ public void membersWithoutPublicAddresses() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -179,9 +175,10 @@ public void membersReachableViaInternalAddress() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), - asList(member(REACHABLE_HOST, UNREACHABLE_HOST), member(REACHABLE_HOST, UNREACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(asList(member(REACHABLE_HOST, UNREACHABLE_HOST), + member(REACHABLE_HOST, UNREACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -193,8 +190,9 @@ public void membersUnreachable() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member(UNREACHABLE_HOST, UNREACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member(UNREACHABLE_HOST, UNREACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -207,8 +205,9 @@ public void membersReachableOnlyViaPublicAddress() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member(UNREACHABLE_HOST, REACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member(UNREACHABLE_HOST, REACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertTrue(result); @@ -221,40 +220,40 @@ private TranslateToPublicAddressProvider createTranslateProvider() { } @Nonnull - private DefaultAddressProvider defaultAddressProvider() { - return new DefaultAddressProvider(config.getNetworkConfig()); - } - - @Nonnull - private MemberInfo member(String host) { + private Member member(String host) { try { - return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), false, VERSION); + MemberImpl.Builder memberBuilder; + memberBuilder = new MemberImpl.Builder(new Address(host, 5701)); + return memberBuilder.version(VERSION) + .uuid(UUID.randomUUID()) + .attributes(emptyMap()) + .liteMember(false).build(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } @Nonnull - private MemberInfo member(String host, String publicHost) { + private Member member(String host, String publicHost) { + try { + MemberImpl.Builder memberBuilder; + Address internalAddress = new Address(host, 5701); Address publicAddress = new Address(publicHost, 5701); - return new MemberInfo(internalAddress, UUID.randomUUID(), emptyMap(), false, VERSION, - singletonMap(EndpointQualifier.resolve(ProtocolType.CLIENT, "public"), publicAddress)); + + Map addressMap = new HashMap<>(); + addressMap.put(EndpointQualifier.resolve(ProtocolType.CLIENT, "public"), publicAddress); + addressMap.put(EndpointQualifier.MEMBER, internalAddress); + + memberBuilder = new MemberImpl.Builder(addressMap); + return memberBuilder.version(VERSION) + .uuid(UUID.randomUUID()) + .attributes(emptyMap()) + .liteMember(false).build(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } - private class TestAddressProvider implements AddressProvider { - @Override - public Address translate(Address address) { - return address; - } - - @Override - public Addresses loadAddresses() { - return new Addresses(emptyList()); - } - } } diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java index 0d19a93db641..61874046a4c9 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java @@ -72,7 +72,7 @@ public void testLoadAddresses_whenExceptionIsThrown() throws Exception { @Test public void testTranslate_whenAddressIsNull_thenReturnNull() throws Exception { RemoteAddressProvider provider = new RemoteAddressProvider(() -> expectedAddresses, true); - Address actual = provider.translate(null); + Address actual = provider.translate((Address) null); assertNull(actual); } diff --git a/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java b/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java index cbdde211d615..1692a922764d 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java @@ -202,8 +202,8 @@ public void testStatisticsTwoClients() { assertNotNull(clientStatistics); assertEquals(2, clientStatistics.size()); List expectedUUIDs = new ArrayList<>(2); - expectedUUIDs.add(client1.getClientClusterService().getLocalClient().getUuid()); - expectedUUIDs.add(client2.getClientClusterService().getLocalClient().getUuid()); + expectedUUIDs.add(client1.getLocalEndpoint().getUuid()); + expectedUUIDs.add(client2.getLocalEndpoint().getUuid()); for (Map.Entry clientEntry : clientStatistics.entrySet()) { assertTrue(expectedUUIDs.contains(clientEntry.getKey())); String clientAttributes = clientEntry.getValue().clientAttributes(); @@ -299,7 +299,7 @@ private static Map getStats(HazelcastClientInstanceImpl client, assertEquals("clientStatistics.size() should be 1", 1, clientStatistics.size()); Set> entries = clientStatistics.entrySet(); Map.Entry statEntry = entries.iterator().next(); - assertEquals(client.getClientClusterService().getLocalClient().getUuid(), statEntry.getKey()); + assertEquals(client.getLocalEndpoint().getUuid(), statEntry.getKey()); return parseClientAttributeValue(statEntry.getValue().clientAttributes()); } diff --git a/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java b/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java index 700c8bf6aac2..afe03d9289e3 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java +++ b/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java @@ -27,6 +27,7 @@ import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.client.util.AddressHelper; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; import com.hazelcast.config.DiscoveryStrategyConfig; import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.core.HazelcastInstance; @@ -150,6 +151,11 @@ public Addresses loadAddresses() { public Address translate(Address address) { return address; } + + @Override + public Address translate(Member member) { + return member.getAddress(); + } }; }