diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java index e8430eb193c0..4bffb9a3d229 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/ClusterDiscoveryServiceBuilder.java @@ -24,7 +24,9 @@ import com.hazelcast.client.config.impl.ClientAliasedDiscoveryConfigUtils; import com.hazelcast.client.impl.ClientExtension; import com.hazelcast.client.impl.connection.AddressProvider; +import com.hazelcast.client.impl.spi.ClientClusterService; import com.hazelcast.client.impl.spi.impl.DefaultAddressProvider; +import com.hazelcast.client.impl.spi.impl.TranslateToPublicAddressProvider; import com.hazelcast.client.impl.spi.impl.discovery.HazelcastCloudDiscovery; import com.hazelcast.client.impl.spi.impl.discovery.RemoteAddressProvider; import com.hazelcast.client.properties.ClientProperty; @@ -71,10 +73,12 @@ class ClusterDiscoveryServiceBuilder { private final Collection configs; private final LifecycleService lifecycleService; private final AddressProvider externalAddressProvider; + private final ClientClusterService clusterService; ClusterDiscoveryServiceBuilder(int configsTryCount, List configs, LoggingService loggingService, AddressProvider externalAddressProvider, HazelcastProperties properties, - ClientExtension clientExtension, LifecycleService lifecycleService) { + ClientExtension clientExtension, LifecycleService lifecycleService, + ClientClusterService clusterService) { this.configsTryCount = configsTryCount; this.configs = configs; this.loggingService = loggingService; @@ -82,6 +86,7 @@ class ClusterDiscoveryServiceBuilder { this.properties = properties; this.clientExtension = clientExtension; this.lifecycleService = lifecycleService; + this.clusterService = clusterService; } public ClusterDiscoveryService build() { @@ -104,7 +109,8 @@ public ClusterDiscoveryService build() { final SSLConfig sslConfig = networkConfig.getSSLConfig(); final SocketOptions socketOptions = networkConfig.getSocketOptions(); - contexts.add(new CandidateClusterContext(config.getClusterName(), provider, discoveryService, credentialsFactory, + contexts.add(new CandidateClusterContext(config.getClusterName(), provider, + discoveryService, credentialsFactory, interceptor, clientExtension.createChannelInitializer(sslConfig, socketOptions))); } return new ClusterDiscoveryService(unmodifiableList(contexts), configsTryCount, lifecycleService); @@ -145,7 +151,11 @@ private AddressProvider createAddressProvider(ClientConfig clientConfig, Discove } else if (networkConfig.getAddresses().isEmpty() && discoveryService != null) { return new RemoteAddressProvider(() -> discoverAddresses(discoveryService), usePublicAddress(clientConfig)); } - return new DefaultAddressProvider(networkConfig); + TranslateToPublicAddressProvider toPublicAddressProvider = new TranslateToPublicAddressProvider(networkConfig, + properties, + loggingService.getLogger(TranslateToPublicAddressProvider.class)); + clusterService.addMembershipListener(toPublicAddressProvider); + return new DefaultAddressProvider(networkConfig, toPublicAddressProvider); } private Map discoverAddresses(DiscoveryService discoveryService) { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java index 68015ce44a06..a2e9c08c44db 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java @@ -28,16 +28,17 @@ import com.hazelcast.client.cp.internal.CPSubsystemImpl; import com.hazelcast.client.cp.internal.session.ClientProxySessionManager; import com.hazelcast.client.impl.ClientExtension; +import com.hazelcast.client.impl.ClientImpl; import com.hazelcast.client.impl.client.DistributedObjectInfo; import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.impl.connection.ClientConnectionManager; import com.hazelcast.client.impl.connection.tcp.ClientICMPManager; import com.hazelcast.client.impl.connection.tcp.HeartbeatManager; +import com.hazelcast.client.impl.connection.tcp.TcpClientConnection; import com.hazelcast.client.impl.connection.tcp.TcpClientConnectionManager; import com.hazelcast.client.impl.protocol.ClientExceptionFactory; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.ClientGetDistributedObjectsCodec; -import com.hazelcast.client.impl.proxy.ClientClusterProxy; import com.hazelcast.client.impl.proxy.PartitionServiceProxy; import com.hazelcast.client.impl.spi.ClientClusterService; import com.hazelcast.client.impl.spi.ClientContext; @@ -138,6 +139,7 @@ import com.hazelcast.transaction.impl.xa.XAService; import javax.annotation.Nonnull; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.EventListener; @@ -167,6 +169,7 @@ import static com.hazelcast.internal.util.ExceptionUtil.rethrow; import static com.hazelcast.internal.util.Preconditions.checkNotNull; import static java.lang.System.currentTimeMillis; +import static java.util.Collections.unmodifiableSet; public class HazelcastClientInstanceImpl implements HazelcastInstance, SerializationServiceSupport { @@ -253,11 +256,11 @@ public HazelcastClientInstanceImpl(String instanceName, ClientConfig clientConfi loadBalancer = initLoadBalancer(config); transactionManager = new ClientTransactionManagerServiceImpl(this); partitionService = new ClientPartitionServiceImpl(this); + clusterService = new ClientClusterServiceImpl(loggingService.getLogger(ClientClusterService.class)); clusterDiscoveryService = initClusterDiscoveryService(externalAddressProvider); connectionManager = (TcpClientConnectionManager) clientConnectionManagerFactory.createConnectionManager(this); invocationService = new ClientInvocationServiceImpl(this); listenerService = new ClientListenerServiceImpl(this); - clusterService = new ClientClusterServiceImpl(this); clientClusterViewListenerService = new ClientClusterViewListenerService(this); userContext.putAll(config.getUserContext()); diagnostics = initDiagnostics(); @@ -295,7 +298,7 @@ private ClusterDiscoveryService initClusterDiscoveryService(AddressProvider exte configs = clientFailoverConfig.getClientConfigs(); } ClusterDiscoveryServiceBuilder builder = new ClusterDiscoveryServiceBuilder(tryCount, configs, loggingService, - externalAddressProvider, properties, clientExtension, getLifecycleService()); + externalAddressProvider, properties, clientExtension, getLifecycleService(), clusterService); return builder.build(); } @@ -551,13 +554,16 @@ public ClientICacheManager getCacheManager() { @Nonnull @Override public Cluster getCluster() { - return new ClientClusterProxy(clusterService); + return clusterService.getCluster(); } @Nonnull @Override public Client getLocalEndpoint() { - return clusterService.getLocalClient(); + TcpClientConnection connection = (TcpClientConnection) connectionManager.getRandomConnection(); + InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null; + UUID clientUuid = connectionManager.getClientUuid(); + return new ClientImpl(clientUuid, inetSocketAddress, instanceName, unmodifiableSet(config.getLabels())); } @Nonnull @@ -863,8 +869,8 @@ public void onClusterChange() { logger.info("Resetting local state of the client, because of a cluster change."); dispose(onClusterChangeDisposables); - //clear the member lists - clusterService.reset(); + //reset the member list version + clusterService.onClusterChange(); //clear partition service partitionService.reset(); //close all the connections, consequently waiting invocations get TargetDisconnectedException @@ -873,12 +879,12 @@ public void onClusterChange() { connectionManager.reset(); } - public void onClusterRestart() { + public void onClusterConnect() { ILogger logger = loggingService.getLogger(HazelcastInstance.class); logger.info("Clearing local state of the client, because of a cluster restart."); dispose(onClusterChangeDisposables); - clusterService.clearMemberList(); + clusterService.onClusterConnect(); } public void waitForInitialMembershipEvents() { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java index a6d750dc849e..b1732c48379e 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/AddressProvider.java @@ -18,6 +18,7 @@ import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; /** * Provides initial addresses for client to find and connect to a node & @@ -40,4 +41,10 @@ public interface AddressProvider { * @throws Exception when a remote service can not provide addressee */ Address translate(Address address) throws Exception; + + /* + * Implementations of this will handle returning the public address of the member if necessary. + * See {@link com.hazelcast.client.impl.spi.impl.DefaultAddressProvider#addressOf(Member)} + */ + Address translate(Member member) throws Exception; } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java index 1f38c5398c13..f4533bf67cca 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/ClientConnection.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.nio.Connection; import java.util.Map; +import java.util.UUID; /** * The ClientConnection is connection that lives on the client side on behalf of a Java client. @@ -44,6 +45,10 @@ public interface ClientConnection extends Connection { void addEventHandler(long correlationId, EventHandler handler); + void setClusterUuid(UUID uuid); + + UUID getClusterUuid(); + // used in tests Map getEventHandlers(); diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java index a7dac06ad071..c4179a01c36b 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnection.java @@ -77,6 +77,7 @@ public class TcpClientConnection implements ClientConnection { private volatile String closeReason; private String connectedServerVersion; private volatile UUID remoteUuid; + private volatile UUID clusterUuid; public TcpClientConnection(HazelcastClientInstanceImpl client, int connectionId, Channel channel) { this.client = client; @@ -286,10 +287,6 @@ public void setConnectedServerVersion(String connectedServerVersion) { this.connectedServerVersion = connectedServerVersion; } - public String getConnectedServerVersion() { - return connectedServerVersion; - } - @Override public EventHandler getEventHandler(long correlationId) { return eventHandlerMap.get(correlationId); @@ -305,6 +302,16 @@ public void addEventHandler(long correlationId, EventHandler handler) { eventHandlerMap.put(correlationId, handler); } + @Override + public void setClusterUuid(UUID uuid) { + clusterUuid = uuid; + } + + @Override + public UUID getClusterUuid() { + return clusterUuid; + } + // used in tests @Override public Map getEventHandlers() { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java index 590f2f9a49b9..f1686d800f2b 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java @@ -44,12 +44,13 @@ import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; +import com.hazelcast.cluster.MembershipListener; import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.LifecycleEvent.LifecycleState; +import com.hazelcast.function.BiFunctionEx; import com.hazelcast.instance.BuildInfoProvider; -import com.hazelcast.instance.EndpointQualifier; -import com.hazelcast.instance.ProtocolType; import com.hazelcast.internal.networking.Channel; import com.hazelcast.internal.networking.ChannelErrorHandler; import com.hazelcast.internal.networking.nio.NioNetworking; @@ -119,13 +120,11 @@ /** * Implementation of {@link ClientConnectionManager}. */ -public class TcpClientConnectionManager implements ClientConnectionManager { +public class TcpClientConnectionManager implements ClientConnectionManager, MembershipListener { private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3; private static final int EXECUTOR_CORE_POOL_SIZE = 10; private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8; - private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER = - EndpointQualifier.resolve(ProtocolType.CLIENT, "public"); private static final int SQL_CONNECTION_RANDOM_ATTEMPTS = 10; protected final AtomicInteger connectionIdGen = new AtomicInteger(); @@ -727,29 +726,25 @@ protected TcpClientConnection createSocketConnection(Address target) { } private Address translate(Member member) { - if (client.getClientClusterService().translateToPublicAddress()) { - Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); - if (publicAddress != null) { - return publicAddress; - } - return member.getAddress(); - } - return translate(member.getAddress()); + return translate(member, AddressProvider::translate); + } + + private Address translate(Address address) { + return translate(address, AddressProvider::translate); } - private Address translate(Address target) { + private Address translate(T target, BiFunctionEx translateFunction) { CandidateClusterContext currentContext = clusterDiscoveryService.current(); AddressProvider addressProvider = currentContext.getAddressProvider(); try { - Address translatedAddress = addressProvider.translate(target); + Address translatedAddress = translateFunction.apply(addressProvider, target); if (translatedAddress == null) { throw new HazelcastException("Address Provider " + addressProvider.getClass() - + " could not translate address " + target); + + " could not translate " + target); } - return translatedAddress; } catch (Exception e) { - logger.warning("Failed to translate address " + target + " via address provider " + e.getMessage()); + logger.warning("Failed to translate " + target + " via address provider " + e.getMessage()); throw rethrow(e); } } @@ -896,9 +891,9 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection, boolean switchingToNextCluster) { synchronized (clientStateMutex) { checkAuthenticationResponse(connection, response); - connection.setConnectedServerVersion(response.serverHazelcastVersion); connection.setRemoteAddress(response.address); connection.setRemoteUuid(response.memberUuid); + connection.setClusterUuid(response.clusterId); TcpClientConnection existingConnection = activeConnections.get(response.memberUuid); if (existingConnection != null) { @@ -919,7 +914,7 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection, if (clusterIdChanged) { checkClientStateOnClusterIdChange(connection, switchingToNextCluster); logger.warning("Switching from current cluster: " + this.clusterId + " to new cluster: " + newClusterId); - client.onClusterRestart(); + client.onClusterConnect(); } checkClientState(connection, switchingToNextCluster); @@ -1193,4 +1188,20 @@ public void run() { } } } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + Member member = membershipEvent.getMember(); + Connection connection = getConnection(member.getUuid()); + if (connection != null) { + connection.close(null, + new TargetDisconnectedException("The client has closed the connection to this member," + + " after receiving a member left event from the cluster. " + connection)); + } + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java b/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java index c1965927b56a..1860cb8b7b60 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientClusterProxy.java @@ -17,11 +17,12 @@ package com.hazelcast.client.impl.proxy; import com.hazelcast.client.impl.spi.impl.ClientClusterServiceImpl; -import com.hazelcast.cluster.ClusterState; import com.hazelcast.cluster.Cluster; +import com.hazelcast.cluster.ClusterState; import com.hazelcast.cluster.Member; import com.hazelcast.cluster.MembershipListener; import com.hazelcast.hotrestart.HotRestartService; +import com.hazelcast.internal.util.Clock; import com.hazelcast.persistence.PersistenceService; import com.hazelcast.transaction.TransactionOptions; import com.hazelcast.version.Version; @@ -70,7 +71,7 @@ public Member getLocalMember() { @Override public long getClusterTime() { - return clusterService.getClusterTime(); + return Clock.currentTimeMillis(); } @Nonnull diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java index 1552de014f26..d4e4759c6fec 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/ClientClusterService.java @@ -16,7 +16,6 @@ package com.hazelcast.client.impl.spi; -import com.hazelcast.client.Client; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Member; import com.hazelcast.cluster.MemberSelector; @@ -29,15 +28,10 @@ /** * Cluster service for Hazelcast clients. *

- * Allows to retrieve Hazelcast members of the cluster, e.g. by their {@link Address} or UUID. + * Allows retrieving Hazelcast members of the cluster, e.g. by their {@link Address} or UUID. */ public interface ClientClusterService { - /** - * @return The client interface representing the local client. - */ - Client getLocalClient(); - /** * Gets the member with the given UUID. * @@ -68,27 +62,6 @@ public interface ClientClusterService { */ Member getMasterMember(); - /** - * Gets the current number of members. - * - * @return The current number of members. - */ - int getSize(); - - /** - * Returns the cluster-time. - * - * @return The cluster-time. - */ - long getClusterTime(); - - /** - * Returns {@code true} if member internal address should be translated into its public address. - * - * @return true if member address should be translated into its public address. - */ - boolean translateToPublicAddress(); - /** * @param listener The listener to be registered. * @return The registration ID diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java index c696f1d38761..2cbf72c00dd5 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImpl.java @@ -16,13 +16,8 @@ package com.hazelcast.client.impl.spi.impl; -import com.hazelcast.client.Client; -import com.hazelcast.client.impl.ClientImpl; -import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl; -import com.hazelcast.client.impl.connection.ClientConnectionManager; -import com.hazelcast.client.impl.connection.tcp.TcpClientConnection; +import com.hazelcast.client.impl.proxy.ClientClusterProxy; import com.hazelcast.client.impl.spi.ClientClusterService; -import com.hazelcast.client.impl.spi.ClientPartitionService; import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Cluster; import com.hazelcast.cluster.InitialMembershipEvent; @@ -35,15 +30,11 @@ import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.internal.cluster.impl.MemberSelectingCollection; -import com.hazelcast.internal.nio.Connection; -import com.hazelcast.internal.util.Clock; import com.hazelcast.internal.util.ExceptionUtil; import com.hazelcast.internal.util.UuidUtil; import com.hazelcast.logging.ILogger; -import com.hazelcast.spi.exception.TargetDisconnectedException; import javax.annotation.Nonnull; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.EventListener; import java.util.HashSet; @@ -63,48 +54,47 @@ import static com.hazelcast.instance.EndpointQualifier.CLIENT; import static com.hazelcast.instance.EndpointQualifier.MEMBER; import static com.hazelcast.internal.util.Preconditions.checkNotNull; -import static java.util.Collections.EMPTY_SET; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableSet; /** - * The {@link ClientClusterService} and {@link ClientPartitionService} implementation. + * Responsible for + * - keeping track of the cluster members and serving them to other services + * - firing membership events based on the incoming MemberListSnapshot events. */ -public class ClientClusterServiceImpl - implements ClientClusterService { +public class ClientClusterServiceImpl implements ClientClusterService { + /** + * Initial list version is used at the start and also after cluster has changed with blue-green deployment feature. + * In both cases, we need to fire InitialMembershipEvent. + */ + public static final int INITIAL_MEMBER_LIST_VERSION = -1; private static final int INITIAL_MEMBERS_TIMEOUT_SECONDS = 120; - - private static final MemberListSnapshot EMPTY_SNAPSHOT = new MemberListSnapshot(-1, new LinkedHashMap<>()); - private final HazelcastClientInstanceImpl client; - - private final AtomicReference memberListSnapshot = new AtomicReference<>(EMPTY_SNAPSHOT); + private final AtomicReference memberListSnapshot = + new AtomicReference<>(new MemberListSnapshot(INITIAL_MEMBER_LIST_VERSION, new LinkedHashMap<>(), null)); private final ConcurrentMap listeners = new ConcurrentHashMap<>(); - private final Set labels; private final ILogger logger; - private final ClientConnectionManager connectionManager; private final Object clusterViewLock = new Object(); - private final TranslateToPublicAddressProvider translateToPublicAddress; //read and written under clusterViewLock private CountDownLatch initialListFetchedLatch = new CountDownLatch(1); - private static final class MemberListSnapshot { private final int version; private final LinkedHashMap members; + private final UUID clusterUuid; - private MemberListSnapshot(int version, LinkedHashMap members) { + private MemberListSnapshot(int version, LinkedHashMap members, UUID clusterUuid) { this.version = version; this.members = members; + this.clusterUuid = clusterUuid; } } - public ClientClusterServiceImpl(HazelcastClientInstanceImpl client) { - this.client = client; - labels = unmodifiableSet(client.getClientConfig().getLabels()); - logger = client.getLoggingService().getLogger(ClientClusterService.class); - connectionManager = client.getConnectionManager(); - translateToPublicAddress = new TranslateToPublicAddressProvider(client.getClientConfig().getNetworkConfig(), - client.getProperties(), logger); + public ClientClusterServiceImpl(ILogger logger) { + this.logger = logger; + } + + public Cluster getCluster() { + return new ClientClusterProxy(this); } @Override @@ -134,30 +124,6 @@ public Member getMasterMember() { return null; } - @Override - public int getSize() { - return getMemberList().size(); - } - - @Override - public long getClusterTime() { - return Clock.currentTimeMillis(); - } - - @Override - public boolean translateToPublicAddress() { - return translateToPublicAddress.get(); - } - - @Override - public Client getLocalClient() { - final ClientConnectionManager cm = client.getConnectionManager(); - final TcpClientConnection connection = (TcpClientConnection) cm.getRandomConnection(); - InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null; - UUID clientUuid = cm.getClientUuid(); - return new ClientImpl(clientUuid, inetSocketAddress, client.getName(), labels); - } - @Nonnull @Override public UUID addMembershipListener(@Nonnull MembershipListener listener) { @@ -166,7 +132,7 @@ public UUID addMembershipListener(@Nonnull MembershipListener listener) { synchronized (clusterViewLock) { UUID id = addMembershipListenerWithoutInit(listener); if (listener instanceof InitialMembershipListener) { - Cluster cluster = client.getCluster(); + Cluster cluster = getCluster(); Collection members = memberListSnapshot.get().members.values(); //if members are empty,it means initial event did not arrive yet //it will be redirected to listeners when it arrives see #handleInitialMembershipEvent @@ -208,61 +174,47 @@ public void waitInitialMemberListFetched() { } } - public void clearMemberListVersion() { + public void onClusterConnect() { synchronized (clusterViewLock) { if (logger.isFineEnabled()) { logger.fine("Resetting the member list version "); } MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); - // This check is necessary so that when handling auth response, it will not - // intervene with client failover logic - if (clusterViewSnapshot != EMPTY_SNAPSHOT) { - memberListSnapshot.set(new MemberListSnapshot(0, clusterViewSnapshot.members)); - } - } - } - - /** - * Clears the member list and fires member removed event for members in the list. - */ - public void clearMemberList() { - List events = null; - synchronized (clusterViewLock) { - if (logger.isFineEnabled()) { - logger.fine("Resetting the member list "); + // This check is necessary so in order not to override changing cluster information when: + // - registering cluster view listener back to the new cluster. + // - on authentication response when cluster uuid change is detected. + if (clusterViewSnapshot.version != INITIAL_MEMBER_LIST_VERSION) { + memberListSnapshot.set(new MemberListSnapshot(0, + clusterViewSnapshot.members, + clusterViewSnapshot.clusterUuid)); } - MemberListSnapshot clusterViewSnapshot = this.memberListSnapshot.get(); - // This check is necessary so that when handling auth response, it will not - // intervene with client failover logic - if (clusterViewSnapshot != EMPTY_SNAPSHOT) { - Collection prevMembers = clusterViewSnapshot.members.values(); - this.memberListSnapshot.set(new MemberListSnapshot(0, new LinkedHashMap<>())); - events = detectMembershipEvents(prevMembers, EMPTY_SET); - } - - } - if (events != null) { - fireEvents(events); } } - public void reset() { + public void onClusterChange() { synchronized (clusterViewLock) { if (logger.isFineEnabled()) { logger.fine("Resetting the cluster snapshot"); } initialListFetchedLatch = new CountDownLatch(1); - memberListSnapshot.set(EMPTY_SNAPSHOT); + MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); + memberListSnapshot.set(new MemberListSnapshot(INITIAL_MEMBER_LIST_VERSION, + clusterViewSnapshot.members, + clusterViewSnapshot.clusterUuid)); } } - private void applyInitialState(int version, Collection memberInfos) { - MemberListSnapshot snapshot = createSnapshot(version, memberInfos); - translateToPublicAddress.refresh(client.getClusterDiscoveryService().current().getAddressProvider(), memberInfos); + //public for tests on enterprise + public int getMemberListVersion() { + return memberListSnapshot.get().version; + } + + private void applyInitialState(int version, Collection memberInfos, UUID clusterUuid) { + MemberListSnapshot snapshot = createSnapshot(version, memberInfos, clusterUuid); memberListSnapshot.set(snapshot); logger.info(membersString(snapshot)); Set members = toUnmodifiableHasSet(snapshot.members.values()); - InitialMembershipEvent event = new InitialMembershipEvent(client.getCluster(), members); + InitialMembershipEvent event = new InitialMembershipEvent(getCluster(), members); for (MembershipListener listener : listeners.values()) { if (listener instanceof InitialMembershipListener) { ((InitialMembershipListener) listener).init(event); @@ -270,7 +222,7 @@ private void applyInitialState(int version, Collection memberInfos) } } - private MemberListSnapshot createSnapshot(int memberListVersion, Collection memberInfos) { + private MemberListSnapshot createSnapshot(int memberListVersion, Collection memberInfos, UUID clusterUuid) { LinkedHashMap newMembers = new LinkedHashMap<>(); for (MemberInfo memberInfo : memberInfos) { MemberImpl.Builder memberBuilder; @@ -288,35 +240,38 @@ private MemberListSnapshot createSnapshot(int memberListVersion, Collection toUnmodifiableHasSet(Collection members) { return unmodifiableSet(new HashSet<>(members)); } - private List detectMembershipEvents(Collection prevMembers, Set currentMembers) { + private List detectMembershipEvents(Collection prevMembers, + Set currentMembers, + UUID clusterUuid) { List newMembers = new LinkedList<>(); Set deadMembers = new HashSet<>(prevMembers); - for (Member member : currentMembers) { - if (!deadMembers.remove(member)) { - newMembers.add(member); + + if (clusterUuid.equals(memberListSnapshot.get().clusterUuid)) { + for (Member member : currentMembers) { + if (!deadMembers.remove(member)) { + newMembers.add(member); + } } + } else { + // if cluster uuid is not same, then we will not try to match the current members to previous members + // As a result all previous members are dead and all current members are new members + newMembers.addAll(currentMembers); } List events = new LinkedList<>(); for (Member member : deadMembers) { - events.add(new MembershipEvent(client.getCluster(), member, MembershipEvent.MEMBER_REMOVED, currentMembers)); - Connection connection = connectionManager.getConnection(member.getUuid()); - if (connection != null) { - connection.close(null, - new TargetDisconnectedException("The client has closed the connection to this member," - + " after receiving a member left event from the cluster. " + connection)); - } + events.add(new MembershipEvent(getCluster(), member, MembershipEvent.MEMBER_REMOVED, currentMembers)); } for (Member member : newMembers) { - events.add(new MembershipEvent(client.getCluster(), member, MembershipEvent.MEMBER_ADDED, currentMembers)); + events.add(new MembershipEvent(getCluster(), member, MembershipEvent.MEMBER_ADDED, currentMembers)); } if (events.size() != 0) { @@ -340,19 +295,19 @@ private String membersString(MemberListSnapshot snapshot) { return sb.toString(); } - public void handleMembersViewEvent(int memberListVersion, Collection memberInfos) { + public void handleMembersViewEvent(int memberListVersion, Collection memberInfos, UUID clusterUuid) { if (logger.isFinestEnabled()) { - MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos); + MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos, clusterUuid); logger.finest("Handling new snapshot with membership version: " + memberListVersion + ", membersString " + membersString(snapshot)); } MemberListSnapshot clusterViewSnapshot = memberListSnapshot.get(); - if (clusterViewSnapshot == EMPTY_SNAPSHOT) { + if (clusterViewSnapshot.version == INITIAL_MEMBER_LIST_VERSION) { synchronized (clusterViewLock) { clusterViewSnapshot = memberListSnapshot.get(); - if (clusterViewSnapshot == EMPTY_SNAPSHOT) { - //this means this is the first time client connected to cluster - applyInitialState(memberListVersion, memberInfos); + if (clusterViewSnapshot.version == INITIAL_MEMBER_LIST_VERSION) { + //this means this is the first time client connected to cluster/cluster has changed(blue/green) + applyInitialState(memberListVersion, memberInfos, clusterUuid); initialListFetchedLatch.countDown(); return; } @@ -365,10 +320,11 @@ public void handleMembersViewEvent(int memberListVersion, Collection clusterViewSnapshot = memberListSnapshot.get(); if (memberListVersion > clusterViewSnapshot.version) { Collection prevMembers = clusterViewSnapshot.members.values(); - MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos); + UUID previousClusterUuid = clusterViewSnapshot.clusterUuid; + MemberListSnapshot snapshot = createSnapshot(memberListVersion, memberInfos, clusterUuid); memberListSnapshot.set(snapshot); Set currentMembers = toUnmodifiableHasSet(snapshot.members.values()); - events = detectMembershipEvents(prevMembers, currentMembers); + events = detectMembershipEvents(prevMembers, currentMembers, previousClusterUuid); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java index 779ec4a98cca..1584f4e06941 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProvider.java @@ -21,8 +21,12 @@ import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.client.util.AddressHelper; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; +import com.hazelcast.instance.EndpointQualifier; +import com.hazelcast.instance.ProtocolType; import java.util.List; +import java.util.function.BooleanSupplier; /** * Default address provider of Hazelcast. @@ -31,10 +35,15 @@ */ public class DefaultAddressProvider implements AddressProvider { + private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER = + EndpointQualifier.resolve(ProtocolType.CLIENT, "public"); private final ClientNetworkConfig networkConfig; + private final BooleanSupplier translateToPublicAddressSupplier; - public DefaultAddressProvider(ClientNetworkConfig networkConfig) { + public DefaultAddressProvider(ClientNetworkConfig networkConfig, + BooleanSupplier translateToPublicAddressSupplier) { this.networkConfig = networkConfig; + this.translateToPublicAddressSupplier = translateToPublicAddressSupplier; } @Override @@ -57,4 +66,15 @@ public Addresses loadAddresses() { public Address translate(Address address) { return address; } + + @Override + public Address translate(Member member) { + if (translateToPublicAddressSupplier.getAsBoolean()) { + Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); + if (publicAddress != null) { + return publicAddress; + } + } + return member.getAddress(); + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java index cda147c41477..e89f07b6b9f0 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProvider.java @@ -17,13 +17,15 @@ package com.hazelcast.client.impl.spi.impl; import com.hazelcast.client.config.ClientNetworkConfig; -import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.InitialMembershipListener; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; import com.hazelcast.config.SSLConfig; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.instance.ProtocolType; -import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.internal.util.AddressUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.properties.HazelcastProperties; @@ -38,9 +40,10 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; -class TranslateToPublicAddressProvider { +public class TranslateToPublicAddressProvider implements InitialMembershipListener, BooleanSupplier { private static final int REACHABLE_ADDRESS_TIMEOUT_MILLIS = 1000; private static final int NON_REACHABLE_ADDRESS_TIMEOUT_MILLIS = 3000; private static final int REACHABLE_CHECK_NUMBER = 3; @@ -52,21 +55,13 @@ class TranslateToPublicAddressProvider { private volatile boolean translateToPublicAddress; - TranslateToPublicAddressProvider(ClientNetworkConfig config, HazelcastProperties properties, ILogger logger) { + public TranslateToPublicAddressProvider(ClientNetworkConfig config, HazelcastProperties properties, ILogger logger) { this.config = config; this.properties = properties; this.logger = logger; } - void refresh(AddressProvider addressProvider, Collection members) { - translateToPublicAddress = resolve(addressProvider, members); - } - - private boolean resolve(AddressProvider addressProvider, Collection members) { - if (!(addressProvider instanceof DefaultAddressProvider)) { - return false; - } - + private boolean resolve(Collection members) { // Default value of DISCOVERY_SPI_PUBLIC_IP_ENABLED is `null` intentionally. // If DISCOVERY_SPI_PUBLIC_IP_ENABLED is not set to true/false, we don't know the intention of the user, // we will try to decide if we should use private/public address automatically in that case. @@ -104,7 +99,7 @@ private boolean resolve(AddressProvider addressProvider, Collection * If any member has its internal/private address the same as configured in ClientConfig, then it means that the client is * able to connect to members via configured address. No need to use make any address translation. */ - boolean memberInternalAddressAsDefinedInClientConfig(Collection members) { + boolean memberInternalAddressAsDefinedInClientConfig(Collection members) { List addresses = config.getAddresses(); List resolvedHosts = addresses.stream().map(s -> { try { @@ -128,15 +123,15 @@ boolean memberInternalAddressAsDefinedInClientConfig(Collection memb *

* We check only limited number of random members to reduce the slowdown of the startup. */ - private boolean membersReachableOnlyViaPublicAddress(Collection members) { - List shuffledList = new ArrayList<>(members); + private boolean membersReachableOnlyViaPublicAddress(Collection members) { + List shuffledList = new ArrayList<>(members); Collections.shuffle(shuffledList); - Iterator iter = shuffledList.iterator(); + Iterator iter = shuffledList.iterator(); for (int i = 0; i < REACHABLE_CHECK_NUMBER; i++) { if (!iter.hasNext()) { iter = shuffledList.iterator(); } - MemberInfo member = iter.next(); + Member member = iter.next(); Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER); Address internalAddress = member.getAddress(); if (publicAddress == null) { @@ -178,7 +173,23 @@ private boolean isReachable(Address address, int timeoutMs) { return true; } - boolean get() { + @Override + public void init(InitialMembershipEvent event) { + translateToPublicAddress = resolve(event.getMembers()); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + + @Override + public boolean getAsBoolean() { return translateToPublicAddress; } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java index 4234d066219c..d904f8df9211 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProvider.java @@ -19,6 +19,7 @@ import com.hazelcast.client.impl.connection.AddressProvider; import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; import java.util.HashMap; import java.util.Map; @@ -61,4 +62,9 @@ public Address translate(Address address) throws Exception { return privateToPublic.get(address); } + + @Override + public Address translate(Member member) throws Exception { + return translate(member.getAddress()); + } } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java index 6af2fc6cfc37..1f7ee721d203 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientClusterViewListenerService.java @@ -17,6 +17,7 @@ package com.hazelcast.client.impl.spi.impl.listener; import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl; +import com.hazelcast.client.impl.connection.ClientConnection; import com.hazelcast.client.impl.connection.ClientConnectionManager; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.ClientAddClusterViewListenerCodec; @@ -64,15 +65,15 @@ public void start() { private final class ClusterViewListenerHandler extends ClientAddClusterViewListenerCodec.AbstractEventHandler implements EventHandler { - private final Connection connection; + private final ClientConnection connection; - private ClusterViewListenerHandler(Connection connection) { + private ClusterViewListenerHandler(ClientConnection connection) { this.connection = connection; } @Override public void beforeListenerRegister(Connection connection) { - clusterService.clearMemberListVersion(); + clusterService.onClusterConnect(); if (logger.isFinestEnabled()) { logger.finest("Register attempt of ClusterViewListenerHandler to " + connection); } @@ -87,7 +88,7 @@ public void onListenerRegister(Connection connection) { @Override public void handleMembersViewEvent(int memberListVersion, Collection memberInfos) { - clusterService.handleMembersViewEvent(memberListVersion, memberInfos); + clusterService.handleMembersViewEvent(memberListVersion, memberInfos, connection.getClusterUuid()); } @Override @@ -98,7 +99,7 @@ public void handlePartitionsViewEvent(int version, Collection { - final ClientClusterService clusterService = getClientClusterService(client); - final Collection members = clusterService.getMembers(LITE_MEMBER_SELECTOR); - verifyMembers(members, singletonList(liteInstance)); - - assertEquals(1, clusterService.getMembers(LITE_MEMBER_SELECTOR).size()); - }); - } - - @Test - public void testDataMembers() { - assertTrueEventually(() -> { - final ClientClusterService clusterService = getClientClusterService(client); - final Collection members = clusterService.getMembers(DATA_MEMBER_SELECTOR); - verifyMembers(members, asList(dataInstance, dataInstance2)); - - assertEquals(2, clusterService.getMembers(DATA_MEMBER_SELECTOR).size()); - }); - } - - @Test - public void testMemberListOrderConsistentWithServer() { - Set membersFromClient = client.getCluster().getMembers(); - Set membersFromServer = dataInstance.getCluster().getMembers(); - assertArrayEquals(membersFromClient.toArray(), membersFromServer.toArray()); - } - - private void verifyMembers(Collection membersToCheck, Collection membersToExpect) { - for (HazelcastInstance instance : membersToExpect) { - assertContains(membersToCheck, getLocalMember(instance)); - } - - assertEquals(membersToExpect.size(), membersToCheck.size()); - } - - private Member getLocalMember(HazelcastInstance instance) { - return getNode(instance).getLocalMember(); - } - - private ClientClusterService getClientClusterService(HazelcastInstance client) { - return ClientTestUtil.getHazelcastClientInstanceImpl(client).getClientClusterService(); - } -} diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java index d50d90713fed..12a080dfb086 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManagerTranslateTest.java @@ -202,6 +202,11 @@ public Address translate(Address address) { return null; } + @Override + public Address translate(Member member) { + return member.getAddress(); + } + @Override public Addresses loadAddresses() { try { diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java new file mode 100644 index 000000000000..d16c174b9540 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/ClientClusterServiceImplTest.java @@ -0,0 +1,503 @@ +/* + * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hazelcast.client.impl.spi.impl; + +import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.InitialMembershipListener; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.MembershipEvent; +import com.hazelcast.cluster.MembershipListener; +import com.hazelcast.instance.BuildInfoProvider; +import com.hazelcast.internal.cluster.MemberInfo; +import com.hazelcast.logging.ILogger; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelJVMTest; +import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.version.MemberVersion; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.annotation.Nonnull; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelJVMTest.class}) +public class ClientClusterServiceImplTest extends HazelcastTestSupport { + + private static final MemberVersion VERSION = MemberVersion.of(BuildInfoProvider.getBuildInfo().getVersion()); + + @Test + public void testMemberAdded() { + LinkedList members = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + members.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + }); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + MemberInfo memberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(2, asList(member, memberInfo), clusterUuid); + assertCollection(members, Collections.singleton(memberInfo.toMember())); + assertEquals(2, clusterService.getMemberList().size()); + } + + @Test + public void testMemberRemoved() { + LinkedList members = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo memberInfo = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(memberInfo), clusterUuid); + clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + members.add(membershipEvent.getMember()); + } + }); + clusterService.handleMembersViewEvent(2, Collections.emptyList(), clusterUuid); + assertCollection(members, Collections.singleton(memberInfo.toMember())); + assertEquals(0, clusterService.getMemberList().size()); + } + + @Test + public void testInitialMembershipListener_AfterInitialListArrives() { + AtomicInteger initialEventCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), clusterUuid); + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + } + }); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testInitialMembershipListener_BeforeInitialListArrives() { + AtomicInteger initialEventCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + } + }); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testFireOnlyIncrementalEvents_AfterClusterRestart() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo removedMemberInfo = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(removedMemberInfo), UUID.randomUUID()); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on cluster restart + clusterService.onClusterConnect(); + + MemberInfo addedMemberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(1, asList(addedMemberInfo), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + assertCollection(addedMembers, Collections.singleton(addedMemberInfo.toMember())); + assertCollection(removedMembers, Collections.singleton(removedMemberInfo.toMember())); + assertEquals(1, initialEventCount.get()); + } + + @Test + public void testFireOnlyInitialEvent_AfterClusterChange() { + AtomicInteger initialEventCount = new AtomicInteger(); + AtomicInteger addedCount = new AtomicInteger(); + AtomicInteger removedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedCount.incrementAndGet(); + } + }); + + //called on cluster change + clusterService.onClusterChange(); + + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(0, addedCount.get()); + assertEquals(0, removedCount.get()); + assertEquals(2, initialEventCount.get()); + } + + @Test + public void testDontFire_WhenReconnectToSameCluster() { + AtomicInteger initialEventCount = new AtomicInteger(); + AtomicInteger addedCount = new AtomicInteger(); + AtomicInteger removedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + List memberList = asList(member("127.0.0.1")); + UUID clusterUuid = UUID.randomUUID(); + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedCount.incrementAndGet(); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(0, addedCount.get()); + assertEquals(0, removedCount.get()); + assertEquals(1, initialEventCount.get()); + } + + + @Test + /* + Related to HotRestart where members keep their uuid's same but addresses changes. + */ + public void testFireEvents_WhenAddressOfTheMembersChanges() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID member1uuid = UUID.randomUUID(); + UUID member2uuid = UUID.randomUUID(); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo removedMember1 = member("127.0.0.1", member1uuid); + MemberInfo removedMember2 = member("127.0.0.2", member2uuid); + clusterService.handleMembersViewEvent(1, + asList(removedMember1, removedMember2), + clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + MemberInfo addedMember1 = member("127.0.0.1", member2uuid); + MemberInfo addedMember2 = member("127.0.0.2", member1uuid); + clusterService.handleMembersViewEvent(1, + asList(addedMember1, addedMember2), + clusterUuid); + assertEquals(2, clusterService.getMemberList().size()); + assertCollection(addedMembers, Arrays.asList(addedMember1.toMember(), addedMember2.toMember())); + assertCollection(removedMembers, Arrays.asList(removedMember1.toMember(), removedMember2.toMember())); + assertEquals(1, initialEventCount.get()); + } + + @Test + /* + Related to HotRestart where members keep their uuid's and addresses same. + */ + public void testFireEvents_WhenAddressAndUuidsDoesNotChange() { + AtomicInteger initialEventCount = new AtomicInteger(); + LinkedList addedMembers = new LinkedList<>(); + LinkedList removedMembers = new LinkedList<>(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID clusterUuid = UUID.randomUUID(); + MemberInfo member1 = member("127.0.0.1"); + MemberInfo member2 = member("127.0.0.2"); + List memberList = asList(member1, member2); + clusterService.handleMembersViewEvent(1, memberList, clusterUuid); + + clusterService.addMembershipListener(new InitialMembershipListener() { + @Override + public void init(InitialMembershipEvent event) { + initialEventCount.incrementAndGet(); + } + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + removedMembers.add(membershipEvent.getMember()); + } + }); + + //called on reconnect to same cluster when registering the listener back + clusterService.onClusterConnect(); + + clusterService.handleMembersViewEvent(1, memberList, UUID.randomUUID()); + assertEquals(2, clusterService.getMemberList().size()); + assertCollection(addedMembers, Arrays.asList(member1.toMember(), member2.toMember())); + assertCollection(removedMembers, Arrays.asList(member1.toMember(), member2.toMember())); + assertEquals(1, initialEventCount.get()); + + } + + @Test + public void testDontServeEmptyMemberList_DuringClusterRestart() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + //called on cluster restart + clusterService.onClusterConnect(); + assertEquals(1, clusterService.getMemberList().size()); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.2")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + } + + @Test + public void testDontServeEmptyMemberList_DuringClusterChange() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.1")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + //called on cluster change + clusterService.onClusterChange(); + assertEquals(1, clusterService.getMemberList().size()); + assertEquals(ClientClusterServiceImpl.INITIAL_MEMBER_LIST_VERSION, clusterService.getMemberListVersion()); + clusterService.handleMembersViewEvent(1, asList(member("127.0.0.2")), UUID.randomUUID()); + assertEquals(1, clusterService.getMemberList().size()); + } + + @Nonnull + private static MemberInfo member(String host) { + try { + return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), false, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Nonnull + private static MemberInfo liteMember(String host) { + try { + return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), true, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Nonnull + private static MemberInfo member(String host, UUID uuid) { + try { + return new MemberInfo(new Address(host, 5701), uuid, emptyMap(), false, VERSION); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testListenersFromConfigWorking() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + LinkedList addedMembers = new LinkedList<>(); + clusterService.start(singleton(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedMembers.add(membershipEvent.getMember()); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + })); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + MemberInfo addedMemberInfo = member("127.0.0.2"); + clusterService.handleMembersViewEvent(2, asList(member, addedMemberInfo), clusterUuid); + assertCollection(addedMembers, Collections.singleton(addedMemberInfo.toMember())); + } + + @Test + public void testRemoveListener() { + AtomicInteger addedCount = new AtomicInteger(); + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + UUID listenerUuid = clusterService.addMembershipListener(new MembershipListener() { + @Override + public void memberAdded(MembershipEvent membershipEvent) { + addedCount.incrementAndGet(); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + + } + }); + assertTrue(clusterService.removeMembershipListener(listenerUuid)); + MemberInfo member = member("127.0.0.1"); + UUID clusterUuid = UUID.randomUUID(); + // triggers initial event + clusterService.handleMembersViewEvent(1, asList(member), clusterUuid); + // triggers member added + clusterService.handleMembersViewEvent(2, asList(member, member("127.0.0.2")), clusterUuid); + // we have removed the listener. No event should be fired to our listener + assertEquals(0, addedCount.get()); + } + + @Test + public void testRemoveNonExistingListener() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + assertFalse(clusterService.removeMembershipListener(UUID.randomUUID())); + } + + @Test + public void testGetMasterMember() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(masterMember, member("127.0.0.2"), + member("127.0.0.3")), UUID.randomUUID()); + assertEquals(masterMember.toMember(), clusterService.getMasterMember()); + } + + @Test + public void testGetMember() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + UUID member2Uuid = UUID.randomUUID(); + MemberInfo member2 = member("127.0.0.2", member2Uuid); + clusterService.handleMembersViewEvent(1, asList(masterMember, member2), UUID.randomUUID()); + assertEquals(member2.toMember(), clusterService.getMember(member2Uuid)); + } + + @Test + public void testGetMembers() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + MemberInfo liteMember = liteMember("127.0.0.2"); + MemberInfo dataMember = member("127.0.0.3"); + clusterService.handleMembersViewEvent(1, asList(masterMember, liteMember, + dataMember), UUID.randomUUID()); + assertCollection(Arrays.asList(masterMember.toMember(), liteMember.toMember(), dataMember.toMember()), clusterService.getMemberList()); + assertCollection(Arrays.asList(liteMember.toMember()), clusterService.getMembers(Member::isLiteMember)); + assertCollection(Arrays.asList(masterMember.toMember(), dataMember.toMember()), clusterService.getMembers(member -> !member.isLiteMember())); + } + + @Test + public void testWaitInitialMembership() { + ClientClusterServiceImpl clusterService = new ClientClusterServiceImpl(mock(ILogger.class)); + MemberInfo masterMember = member("127.0.0.1"); + clusterService.handleMembersViewEvent(1, asList(masterMember, liteMember("127.0.0.2"), + member("127.0.0.3")), UUID.randomUUID()); + clusterService.waitInitialMemberListFetched(); + } +} diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java index 4a47dedef7ec..640e8549a88b 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/DefaultAddressProviderTest.java @@ -40,7 +40,7 @@ public class DefaultAddressProviderTest { @Test public void whenNoAddresses() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("127.0.0.1", 5701)); @@ -51,7 +51,7 @@ public void whenNoAddresses() throws UnknownHostException { public void whenExplicitNoPortAddress() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress("10.0.0.1"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5701)); @@ -63,7 +63,7 @@ public void whenExplicitPorts() throws UnknownHostException { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress("10.0.0.1:5703"); config.addAddress("10.0.0.1:5702"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5703), new Address("10.0.0.1", 5702)); @@ -76,7 +76,7 @@ public void whenMix() throws UnknownHostException { config.addAddress("10.0.0.1:5701"); config.addAddress("10.0.0.1:5702"); config.addAddress("10.0.0.2"); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimary(addresses, new Address("10.0.0.1", 5701), @@ -89,7 +89,7 @@ public void whenMix() throws UnknownHostException { public void whenBogusAddress() { ClientNetworkConfig config = new ClientNetworkConfig(); config.addAddress(UUID.randomUUID().toString()); - DefaultAddressProvider provider = new DefaultAddressProvider(config); + DefaultAddressProvider provider = new DefaultAddressProvider(config, () -> false); Addresses addresses = provider.loadAddresses(); assertPrimaryEmpty(addresses); diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java index 2aaf3168b006..5cd03b4f9e0c 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/TranslateToPublicAddressProviderTest.java @@ -17,15 +17,16 @@ package com.hazelcast.client.impl.spi.impl; import com.hazelcast.client.config.ClientConfig; -import com.hazelcast.client.impl.connection.AddressProvider; -import com.hazelcast.client.impl.connection.Addresses; import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Cluster; +import com.hazelcast.cluster.InitialMembershipEvent; +import com.hazelcast.cluster.Member; +import com.hazelcast.cluster.impl.MemberImpl; import com.hazelcast.core.Hazelcast; import com.hazelcast.instance.BuildInfoProvider; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.instance.ProtocolType; -import com.hazelcast.internal.cluster.MemberInfo; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.properties.HazelcastProperties; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -38,12 +39,15 @@ import javax.annotation.Nonnull; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.UUID; import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; +import static java.util.Collections.emptySet; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -62,19 +66,6 @@ public void teardown() { Hazelcast.shutdownAll(); } - @Test - public void nonDefaultAddressProvider() { - // given - TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); - - // when - translateProvider.refresh(new TestAddressProvider(), null); - boolean result = translateProvider.get(); - - // then - assertFalse(result); - } - @Test public void propertyTrue() { // given @@ -82,8 +73,8 @@ public void propertyTrue() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), null); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), emptySet())); + boolean result = translateProvider.getAsBoolean(); // then assertTrue(result); @@ -96,8 +87,8 @@ public void propertyFalse() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), null); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), emptySet())); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -110,8 +101,9 @@ public void memberInternalAddressAsDefinedInClientConfig() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("192.168.0.1"), member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(asList(member("192.168.0.1"), member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -124,8 +116,9 @@ public void memberInternalAddressAsDefinedInClientConfig_memberUsesHostName() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("localhost"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("localhost"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -138,8 +131,9 @@ public void memberInternalAddressAsDefinedInClientConfig_clientUsesHostname() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -152,8 +146,9 @@ public void memberInternalAddressAsDefinedInClientConfig_clientUsesHostnameAndIp TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -165,8 +160,9 @@ public void membersWithoutPublicAddresses() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member("127.0.0.1"))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member("127.0.0.1"))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -179,9 +175,10 @@ public void membersReachableViaInternalAddress() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), - asList(member(REACHABLE_HOST, UNREACHABLE_HOST), member(REACHABLE_HOST, UNREACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(asList(member(REACHABLE_HOST, UNREACHABLE_HOST), + member(REACHABLE_HOST, UNREACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -193,8 +190,9 @@ public void membersUnreachable() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member(UNREACHABLE_HOST, UNREACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member(UNREACHABLE_HOST, UNREACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertFalse(result); @@ -207,8 +205,9 @@ public void membersReachableOnlyViaPublicAddress() { TranslateToPublicAddressProvider translateProvider = createTranslateProvider(); // when - translateProvider.refresh(defaultAddressProvider(), asList(member(UNREACHABLE_HOST, REACHABLE_HOST))); - boolean result = translateProvider.get(); + translateProvider.init(new InitialMembershipEvent(mock(Cluster.class), + new HashSet<>(Collections.singletonList(member(UNREACHABLE_HOST, REACHABLE_HOST))))); + boolean result = translateProvider.getAsBoolean(); // then assertTrue(result); @@ -221,40 +220,40 @@ private TranslateToPublicAddressProvider createTranslateProvider() { } @Nonnull - private DefaultAddressProvider defaultAddressProvider() { - return new DefaultAddressProvider(config.getNetworkConfig()); - } - - @Nonnull - private MemberInfo member(String host) { + private Member member(String host) { try { - return new MemberInfo(new Address(host, 5701), UUID.randomUUID(), emptyMap(), false, VERSION); + MemberImpl.Builder memberBuilder; + memberBuilder = new MemberImpl.Builder(new Address(host, 5701)); + return memberBuilder.version(VERSION) + .uuid(UUID.randomUUID()) + .attributes(emptyMap()) + .liteMember(false).build(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } @Nonnull - private MemberInfo member(String host, String publicHost) { + private Member member(String host, String publicHost) { + try { + MemberImpl.Builder memberBuilder; + Address internalAddress = new Address(host, 5701); Address publicAddress = new Address(publicHost, 5701); - return new MemberInfo(internalAddress, UUID.randomUUID(), emptyMap(), false, VERSION, - singletonMap(EndpointQualifier.resolve(ProtocolType.CLIENT, "public"), publicAddress)); + + Map addressMap = new HashMap<>(); + addressMap.put(EndpointQualifier.resolve(ProtocolType.CLIENT, "public"), publicAddress); + addressMap.put(EndpointQualifier.MEMBER, internalAddress); + + memberBuilder = new MemberImpl.Builder(addressMap); + return memberBuilder.version(VERSION) + .uuid(UUID.randomUUID()) + .attributes(emptyMap()) + .liteMember(false).build(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } - private class TestAddressProvider implements AddressProvider { - @Override - public Address translate(Address address) { - return address; - } - - @Override - public Addresses loadAddresses() { - return new Addresses(emptyList()); - } - } } diff --git a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java index 0d19a93db641..61874046a4c9 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/impl/spi/impl/discovery/RemoteAddressProviderTest.java @@ -72,7 +72,7 @@ public void testLoadAddresses_whenExceptionIsThrown() throws Exception { @Test public void testTranslate_whenAddressIsNull_thenReturnNull() throws Exception { RemoteAddressProvider provider = new RemoteAddressProvider(() -> expectedAddresses, true); - Address actual = provider.translate(null); + Address actual = provider.translate((Address) null); assertNull(actual); } diff --git a/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java b/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java index cbdde211d615..1692a922764d 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java +++ b/hazelcast/src/test/java/com/hazelcast/client/statistics/ClientStatisticsTest.java @@ -202,8 +202,8 @@ public void testStatisticsTwoClients() { assertNotNull(clientStatistics); assertEquals(2, clientStatistics.size()); List expectedUUIDs = new ArrayList<>(2); - expectedUUIDs.add(client1.getClientClusterService().getLocalClient().getUuid()); - expectedUUIDs.add(client2.getClientClusterService().getLocalClient().getUuid()); + expectedUUIDs.add(client1.getLocalEndpoint().getUuid()); + expectedUUIDs.add(client2.getLocalEndpoint().getUuid()); for (Map.Entry clientEntry : clientStatistics.entrySet()) { assertTrue(expectedUUIDs.contains(clientEntry.getKey())); String clientAttributes = clientEntry.getValue().clientAttributes(); @@ -299,7 +299,7 @@ private static Map getStats(HazelcastClientInstanceImpl client, assertEquals("clientStatistics.size() should be 1", 1, clientStatistics.size()); Set> entries = clientStatistics.entrySet(); Map.Entry statEntry = entries.iterator().next(); - assertEquals(client.getClientClusterService().getLocalClient().getUuid(), statEntry.getKey()); + assertEquals(client.getLocalEndpoint().getUuid(), statEntry.getKey()); return parseClientAttributeValue(statEntry.getValue().clientAttributes()); } diff --git a/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java b/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java index 700c8bf6aac2..afe03d9289e3 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java +++ b/hazelcast/src/test/java/com/hazelcast/client/test/TestHazelcastFactory.java @@ -27,6 +27,7 @@ import com.hazelcast.client.properties.ClientProperty; import com.hazelcast.client.util.AddressHelper; import com.hazelcast.cluster.Address; +import com.hazelcast.cluster.Member; import com.hazelcast.config.DiscoveryStrategyConfig; import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.core.HazelcastInstance; @@ -150,6 +151,11 @@ public Addresses loadAddresses() { public Address translate(Address address) { return address; } + + @Override + public Address translate(Member member) { + return member.getAddress(); + } }; }