Skip to content

Commit

Permalink
Don't allow empty memberlist on cluster id changes [API-1215] (#20818) (
Browse files Browse the repository at this point in the history
#21178)

* Don't allow empty memberlist on cluster id changes

When the cluster id changed, we were resetting the member list
to an empty memberlist. It turned out that there are implementations
that assume it will never be empty.

The following test failure is an example of this.
#20264

Instead of clearing the memberlist on cluster id change, we
are keeping the cluster id as part of the latest snapshot
to be able to fire correct events.

Note that clearing the memberlist on cluster id change was
introduced as a fix for #18245,
so it is important not to break the related ClientHotRestartTest.

fixes #20264

* Refactor ClusterService to make it unittestable

Move the getClient API out to reduce the dependency on
HazelcastInstanceImpl

* Refactor: ClusterService to be able to unittest

ClusterService no longer depends on the connection manager
or the client.

* Add unittest for ClientClusterServiceImpl

Also revised the fix to cover blue/green case as well.
ClientClusterService will not serve empty memberlist even when
blue/green is happening.

(cherry picked from commit 4d6fc11)

Add method to ClientClusterService for enterprise tests (#21169)

(cherry picked from commit 1c769a2)
  • Loading branch information
sancar committed Apr 12, 2022
1 parent 8da93f0 commit 8b696b2
Show file tree
Hide file tree
Showing 22 changed files with 808 additions and 392 deletions.
Expand Up @@ -24,7 +24,9 @@
import com.hazelcast.client.config.impl.ClientAliasedDiscoveryConfigUtils;
import com.hazelcast.client.impl.ClientExtension;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.spi.ClientClusterService;
import com.hazelcast.client.impl.spi.impl.DefaultAddressProvider;
import com.hazelcast.client.impl.spi.impl.TranslateToPublicAddressProvider;
import com.hazelcast.client.impl.spi.impl.discovery.HazelcastCloudDiscovery;
import com.hazelcast.client.impl.spi.impl.discovery.RemoteAddressProvider;
import com.hazelcast.client.properties.ClientProperty;
Expand Down Expand Up @@ -71,17 +73,20 @@ class ClusterDiscoveryServiceBuilder {
private final Collection<ClientConfig> configs;
private final LifecycleService lifecycleService;
private final AddressProvider externalAddressProvider;
private final ClientClusterService clusterService;

ClusterDiscoveryServiceBuilder(int configsTryCount, List<ClientConfig> configs, LoggingService loggingService,
AddressProvider externalAddressProvider, HazelcastProperties properties,
ClientExtension clientExtension, LifecycleService lifecycleService) {
ClientExtension clientExtension, LifecycleService lifecycleService,
ClientClusterService clusterService) {
this.configsTryCount = configsTryCount;
this.configs = configs;
this.loggingService = loggingService;
this.externalAddressProvider = externalAddressProvider;
this.properties = properties;
this.clientExtension = clientExtension;
this.lifecycleService = lifecycleService;
this.clusterService = clusterService;
}

public ClusterDiscoveryService build() {
Expand All @@ -104,7 +109,8 @@ public ClusterDiscoveryService build() {

final SSLConfig sslConfig = networkConfig.getSSLConfig();
final SocketOptions socketOptions = networkConfig.getSocketOptions();
contexts.add(new CandidateClusterContext(config.getClusterName(), provider, discoveryService, credentialsFactory,
contexts.add(new CandidateClusterContext(config.getClusterName(), provider,
discoveryService, credentialsFactory,
interceptor, clientExtension.createChannelInitializer(sslConfig, socketOptions)));
}
return new ClusterDiscoveryService(unmodifiableList(contexts), configsTryCount, lifecycleService);
Expand Down Expand Up @@ -145,7 +151,11 @@ private AddressProvider createAddressProvider(ClientConfig clientConfig, Discove
} else if (networkConfig.getAddresses().isEmpty() && discoveryService != null) {
return new RemoteAddressProvider(() -> discoverAddresses(discoveryService), usePublicAddress(clientConfig));
}
return new DefaultAddressProvider(networkConfig);
TranslateToPublicAddressProvider toPublicAddressProvider = new TranslateToPublicAddressProvider(networkConfig,
properties,
loggingService.getLogger(TranslateToPublicAddressProvider.class));
clusterService.addMembershipListener(toPublicAddressProvider);
return new DefaultAddressProvider(networkConfig, toPublicAddressProvider);
}

private Map<Address, Address> discoverAddresses(DiscoveryService discoveryService) {
Expand Down
Expand Up @@ -28,16 +28,17 @@
import com.hazelcast.client.cp.internal.CPSubsystemImpl;
import com.hazelcast.client.cp.internal.session.ClientProxySessionManager;
import com.hazelcast.client.impl.ClientExtension;
import com.hazelcast.client.impl.ClientImpl;
import com.hazelcast.client.impl.client.DistributedObjectInfo;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.connection.tcp.ClientICMPManager;
import com.hazelcast.client.impl.connection.tcp.HeartbeatManager;
import com.hazelcast.client.impl.connection.tcp.TcpClientConnection;
import com.hazelcast.client.impl.connection.tcp.TcpClientConnectionManager;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientGetDistributedObjectsCodec;
import com.hazelcast.client.impl.proxy.ClientClusterProxy;
import com.hazelcast.client.impl.proxy.PartitionServiceProxy;
import com.hazelcast.client.impl.spi.ClientClusterService;
import com.hazelcast.client.impl.spi.ClientContext;
Expand Down Expand Up @@ -138,6 +139,7 @@
import com.hazelcast.transaction.impl.xa.XAService;

import javax.annotation.Nonnull;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
Expand Down Expand Up @@ -167,6 +169,7 @@
import static com.hazelcast.internal.util.ExceptionUtil.rethrow;
import static com.hazelcast.internal.util.Preconditions.checkNotNull;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.unmodifiableSet;

public class HazelcastClientInstanceImpl implements HazelcastInstance, SerializationServiceSupport {

Expand Down Expand Up @@ -253,11 +256,11 @@ public HazelcastClientInstanceImpl(String instanceName, ClientConfig clientConfi
loadBalancer = initLoadBalancer(config);
transactionManager = new ClientTransactionManagerServiceImpl(this);
partitionService = new ClientPartitionServiceImpl(this);
clusterService = new ClientClusterServiceImpl(loggingService.getLogger(ClientClusterService.class));
clusterDiscoveryService = initClusterDiscoveryService(externalAddressProvider);
connectionManager = (TcpClientConnectionManager) clientConnectionManagerFactory.createConnectionManager(this);
invocationService = new ClientInvocationServiceImpl(this);
listenerService = new ClientListenerServiceImpl(this);
clusterService = new ClientClusterServiceImpl(this);
clientClusterViewListenerService = new ClientClusterViewListenerService(this);
userContext.putAll(config.getUserContext());
diagnostics = initDiagnostics();
Expand Down Expand Up @@ -295,7 +298,7 @@ private ClusterDiscoveryService initClusterDiscoveryService(AddressProvider exte
configs = clientFailoverConfig.getClientConfigs();
}
ClusterDiscoveryServiceBuilder builder = new ClusterDiscoveryServiceBuilder(tryCount, configs, loggingService,
externalAddressProvider, properties, clientExtension, getLifecycleService());
externalAddressProvider, properties, clientExtension, getLifecycleService(), clusterService);
return builder.build();
}

Expand Down Expand Up @@ -551,13 +554,16 @@ public ClientICacheManager getCacheManager() {
@Nonnull
@Override
public Cluster getCluster() {
return new ClientClusterProxy(clusterService);
return clusterService.getCluster();
}

@Nonnull
@Override
public Client getLocalEndpoint() {
return clusterService.getLocalClient();
TcpClientConnection connection = (TcpClientConnection) connectionManager.getRandomConnection();
InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null;
UUID clientUuid = connectionManager.getClientUuid();
return new ClientImpl(clientUuid, inetSocketAddress, instanceName, unmodifiableSet(config.getLabels()));
}

@Nonnull
Expand Down Expand Up @@ -863,8 +869,8 @@ public void onClusterChange() {
logger.info("Resetting local state of the client, because of a cluster change.");

dispose(onClusterChangeDisposables);
//clear the member lists
clusterService.reset();
//reset the member list version
clusterService.onClusterChange();
//clear partition service
partitionService.reset();
//close all the connections, consequently waiting invocations get TargetDisconnectedException
Expand All @@ -873,12 +879,12 @@ public void onClusterChange() {
connectionManager.reset();
}

public void onClusterRestart() {
public void onClusterConnect() {
ILogger logger = loggingService.getLogger(HazelcastInstance.class);
logger.info("Clearing local state of the client, because of a cluster restart.");

dispose(onClusterChangeDisposables);
clusterService.clearMemberList();
clusterService.onClusterConnect();
}

public void waitForInitialMembershipEvents() {
Expand Down
Expand Up @@ -18,6 +18,7 @@


import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;

/**
* Provides initial addresses for client to find and connect to a node &
Expand All @@ -40,4 +41,10 @@ public interface AddressProvider {
* @throws Exception when a remote service cannot provide addresses
*/
Address translate(Address address) throws Exception;

/**
* Translates the given member to an address.
* Implementations of this will handle returning the public address of the member if necessary.
* See {@link com.hazelcast.client.impl.spi.impl.DefaultAddressProvider#addressOf(Member)}
*
* @param member the member to translate
* @return the translated address; the connection manager treats a {@code null} result as a translation failure
* @throws Exception if the translation fails (exact conditions depend on the implementation)
*/
Address translate(Member member) throws Exception;
}
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.internal.nio.Connection;

import java.util.Map;
import java.util.UUID;

/**
* The ClientConnection is a connection that lives on the client side on behalf of a Java client.
Expand All @@ -44,6 +45,10 @@ public interface ClientConnection extends Connection {

void addEventHandler(long correlationId, EventHandler handler);

/**
* Records the cluster UUID for this connection.
*
* @param uuid the cluster UUID to associate with this connection
*/
void setClusterUuid(UUID uuid);

/**
* @return the cluster UUID recorded for this connection via {@link #setClusterUuid(UUID)}
*/
UUID getClusterUuid();

// used in tests
Map<Long, EventHandler> getEventHandlers();

Expand Down
Expand Up @@ -77,6 +77,7 @@ public class TcpClientConnection implements ClientConnection {
private volatile String closeReason;
private String connectedServerVersion;
private volatile UUID remoteUuid;
private volatile UUID clusterUuid;

public TcpClientConnection(HazelcastClientInstanceImpl client, int connectionId, Channel channel) {
this.client = client;
Expand Down Expand Up @@ -286,10 +287,6 @@ public void setConnectedServerVersion(String connectedServerVersion) {
this.connectedServerVersion = connectedServerVersion;
}

public String getConnectedServerVersion() {
return connectedServerVersion;
}

@Override
public EventHandler getEventHandler(long correlationId) {
return eventHandlerMap.get(correlationId);
Expand All @@ -305,6 +302,16 @@ public void addEventHandler(long correlationId, EventHandler handler) {
eventHandlerMap.put(correlationId, handler);
}

/**
 * Stores the given cluster UUID in this connection's volatile {@code clusterUuid} field.
 *
 * @param uuid the cluster UUID to remember for this connection
 */
@Override
public void setClusterUuid(UUID uuid) {
    this.clusterUuid = uuid;
}

/**
 * Reads the cluster UUID held in this connection's volatile {@code clusterUuid} field.
 *
 * @return the value last stored by {@link #setClusterUuid(UUID)}
 */
@Override
public UUID getClusterUuid() {
    return this.clusterUuid;
}

// used in tests
@Override
public Map<Long, EventHandler> getEventHandlers() {
Expand Down
Expand Up @@ -44,12 +44,13 @@
import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.LifecycleEvent.LifecycleState;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.ProtocolType;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.nio.NioNetworking;
Expand Down Expand Up @@ -119,13 +120,11 @@
/**
* Implementation of {@link ClientConnectionManager}.
*/
public class TcpClientConnectionManager implements ClientConnectionManager {
public class TcpClientConnectionManager implements ClientConnectionManager, MembershipListener {

private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3;
private static final int EXECUTOR_CORE_POOL_SIZE = 10;
private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8;
private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER =
EndpointQualifier.resolve(ProtocolType.CLIENT, "public");
private static final int SQL_CONNECTION_RANDOM_ATTEMPTS = 10;

protected final AtomicInteger connectionIdGen = new AtomicInteger();
Expand Down Expand Up @@ -727,29 +726,25 @@ protected TcpClientConnection createSocketConnection(Address target) {
}

private Address translate(Member member) {
if (client.getClientClusterService().translateToPublicAddress()) {
Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER);
if (publicAddress != null) {
return publicAddress;
}
return member.getAddress();
}
return translate(member.getAddress());
return translate(member, AddressProvider::translate);
}

/**
* Translates the given address by delegating to the generic {@code translate} helper,
* passing {@code AddressProvider::translate} as the translation function.
*
* @param address the address to translate
* @return the translated address as returned by the generic helper
*/
private Address translate(Address address) {
return translate(address, AddressProvider::translate);
}

private Address translate(Address target) {
private <T> Address translate(T target, BiFunctionEx<AddressProvider, T, Address> translateFunction) {
CandidateClusterContext currentContext = clusterDiscoveryService.current();
AddressProvider addressProvider = currentContext.getAddressProvider();
try {
Address translatedAddress = addressProvider.translate(target);
Address translatedAddress = translateFunction.apply(addressProvider, target);
if (translatedAddress == null) {
throw new HazelcastException("Address Provider " + addressProvider.getClass()
+ " could not translate address " + target);
+ " could not translate " + target);
}

return translatedAddress;
} catch (Exception e) {
logger.warning("Failed to translate address " + target + " via address provider " + e.getMessage());
logger.warning("Failed to translate " + target + " via address provider " + e.getMessage());
throw rethrow(e);
}
}
Expand Down Expand Up @@ -896,9 +891,9 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
boolean switchingToNextCluster) {
synchronized (clientStateMutex) {
checkAuthenticationResponse(connection, response);
connection.setConnectedServerVersion(response.serverHazelcastVersion);
connection.setRemoteAddress(response.address);
connection.setRemoteUuid(response.memberUuid);
connection.setClusterUuid(response.clusterId);

TcpClientConnection existingConnection = activeConnections.get(response.memberUuid);
if (existingConnection != null) {
Expand All @@ -919,7 +914,7 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
if (clusterIdChanged) {
checkClientStateOnClusterIdChange(connection, switchingToNextCluster);
logger.warning("Switching from current cluster: " + this.clusterId + " to new cluster: " + newClusterId);
client.onClusterRestart();
client.onClusterConnect();
}
checkClientState(connection, switchingToNextCluster);

Expand Down Expand Up @@ -1193,4 +1188,20 @@ public void run() {
}
}
}

/**
* {@link MembershipListener} callback for member additions.
* The connection manager takes no action here; only member removals are handled.
*
* @param membershipEvent the membership event (unused)
*/
@Override
public void memberAdded(MembershipEvent membershipEvent) {
// no-op: nothing to do when a member is added
}

/**
 * {@link MembershipListener} callback for member removals.
 * Looks up the connection registered under the removed member's UUID and, if one exists,
 * closes it with a {@link TargetDisconnectedException} describing the reason.
 *
 * @param membershipEvent the event carrying the removed member
 */
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
    Connection connection = getConnection(membershipEvent.getMember().getUuid());
    if (connection == null) {
        // no connection to the removed member, nothing to close
        return;
    }
    String reason = "The client has closed the connection to this member,"
            + " after receiving a member left event from the cluster. " + connection;
    connection.close(null, new TargetDisconnectedException(reason));
}
}
Expand Up @@ -17,11 +17,12 @@
package com.hazelcast.client.impl.proxy;

import com.hazelcast.client.impl.spi.impl.ClientClusterServiceImpl;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.hotrestart.HotRestartService;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.persistence.PersistenceService;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.version.Version;
Expand Down Expand Up @@ -70,7 +71,7 @@ public Member getLocalMember() {

@Override
public long getClusterTime() {
return clusterService.getClusterTime();
return Clock.currentTimeMillis();
}

@Nonnull
Expand Down

0 comments on commit 8b696b2

Please sign in to comment.