Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't allow empty memberlist on cluster id changes [API-1215] (#20818) #21178

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,9 @@
import com.hazelcast.client.config.impl.ClientAliasedDiscoveryConfigUtils;
import com.hazelcast.client.impl.ClientExtension;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.spi.ClientClusterService;
import com.hazelcast.client.impl.spi.impl.DefaultAddressProvider;
import com.hazelcast.client.impl.spi.impl.TranslateToPublicAddressProvider;
import com.hazelcast.client.impl.spi.impl.discovery.HazelcastCloudDiscovery;
import com.hazelcast.client.impl.spi.impl.discovery.RemoteAddressProvider;
import com.hazelcast.client.properties.ClientProperty;
Expand Down Expand Up @@ -71,17 +73,20 @@ class ClusterDiscoveryServiceBuilder {
private final Collection<ClientConfig> configs;
private final LifecycleService lifecycleService;
private final AddressProvider externalAddressProvider;
private final ClientClusterService clusterService;

ClusterDiscoveryServiceBuilder(int configsTryCount, List<ClientConfig> configs, LoggingService loggingService,
AddressProvider externalAddressProvider, HazelcastProperties properties,
ClientExtension clientExtension, LifecycleService lifecycleService) {
ClientExtension clientExtension, LifecycleService lifecycleService,
ClientClusterService clusterService) {
this.configsTryCount = configsTryCount;
this.configs = configs;
this.loggingService = loggingService;
this.externalAddressProvider = externalAddressProvider;
this.properties = properties;
this.clientExtension = clientExtension;
this.lifecycleService = lifecycleService;
this.clusterService = clusterService;
}

public ClusterDiscoveryService build() {
Expand All @@ -104,7 +109,8 @@ public ClusterDiscoveryService build() {

final SSLConfig sslConfig = networkConfig.getSSLConfig();
final SocketOptions socketOptions = networkConfig.getSocketOptions();
contexts.add(new CandidateClusterContext(config.getClusterName(), provider, discoveryService, credentialsFactory,
contexts.add(new CandidateClusterContext(config.getClusterName(), provider,
discoveryService, credentialsFactory,
interceptor, clientExtension.createChannelInitializer(sslConfig, socketOptions)));
}
return new ClusterDiscoveryService(unmodifiableList(contexts), configsTryCount, lifecycleService);
Expand Down Expand Up @@ -145,7 +151,11 @@ private AddressProvider createAddressProvider(ClientConfig clientConfig, Discove
} else if (networkConfig.getAddresses().isEmpty() && discoveryService != null) {
return new RemoteAddressProvider(() -> discoverAddresses(discoveryService), usePublicAddress(clientConfig));
}
return new DefaultAddressProvider(networkConfig);
TranslateToPublicAddressProvider toPublicAddressProvider = new TranslateToPublicAddressProvider(networkConfig,
properties,
loggingService.getLogger(TranslateToPublicAddressProvider.class));
clusterService.addMembershipListener(toPublicAddressProvider);
return new DefaultAddressProvider(networkConfig, toPublicAddressProvider);
}

private Map<Address, Address> discoverAddresses(DiscoveryService discoveryService) {
Expand Down
Expand Up @@ -28,16 +28,17 @@
import com.hazelcast.client.cp.internal.CPSubsystemImpl;
import com.hazelcast.client.cp.internal.session.ClientProxySessionManager;
import com.hazelcast.client.impl.ClientExtension;
import com.hazelcast.client.impl.ClientImpl;
import com.hazelcast.client.impl.client.DistributedObjectInfo;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.connection.tcp.ClientICMPManager;
import com.hazelcast.client.impl.connection.tcp.HeartbeatManager;
import com.hazelcast.client.impl.connection.tcp.TcpClientConnection;
import com.hazelcast.client.impl.connection.tcp.TcpClientConnectionManager;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientGetDistributedObjectsCodec;
import com.hazelcast.client.impl.proxy.ClientClusterProxy;
import com.hazelcast.client.impl.proxy.PartitionServiceProxy;
import com.hazelcast.client.impl.spi.ClientClusterService;
import com.hazelcast.client.impl.spi.ClientContext;
Expand Down Expand Up @@ -138,6 +139,7 @@
import com.hazelcast.transaction.impl.xa.XAService;

import javax.annotation.Nonnull;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
Expand Down Expand Up @@ -167,6 +169,7 @@
import static com.hazelcast.internal.util.ExceptionUtil.rethrow;
import static com.hazelcast.internal.util.Preconditions.checkNotNull;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.unmodifiableSet;

public class HazelcastClientInstanceImpl implements HazelcastInstance, SerializationServiceSupport {

Expand Down Expand Up @@ -253,11 +256,11 @@ public HazelcastClientInstanceImpl(String instanceName, ClientConfig clientConfi
loadBalancer = initLoadBalancer(config);
transactionManager = new ClientTransactionManagerServiceImpl(this);
partitionService = new ClientPartitionServiceImpl(this);
clusterService = new ClientClusterServiceImpl(loggingService.getLogger(ClientClusterService.class));
clusterDiscoveryService = initClusterDiscoveryService(externalAddressProvider);
connectionManager = (TcpClientConnectionManager) clientConnectionManagerFactory.createConnectionManager(this);
invocationService = new ClientInvocationServiceImpl(this);
listenerService = new ClientListenerServiceImpl(this);
clusterService = new ClientClusterServiceImpl(this);
clientClusterViewListenerService = new ClientClusterViewListenerService(this);
userContext.putAll(config.getUserContext());
diagnostics = initDiagnostics();
Expand Down Expand Up @@ -295,7 +298,7 @@ private ClusterDiscoveryService initClusterDiscoveryService(AddressProvider exte
configs = clientFailoverConfig.getClientConfigs();
}
ClusterDiscoveryServiceBuilder builder = new ClusterDiscoveryServiceBuilder(tryCount, configs, loggingService,
externalAddressProvider, properties, clientExtension, getLifecycleService());
externalAddressProvider, properties, clientExtension, getLifecycleService(), clusterService);
return builder.build();
}

Expand Down Expand Up @@ -551,13 +554,16 @@ public ClientICacheManager getCacheManager() {
@Nonnull
@Override
public Cluster getCluster() {
return new ClientClusterProxy(clusterService);
return clusterService.getCluster();
}

@Nonnull
@Override
public Client getLocalEndpoint() {
return clusterService.getLocalClient();
TcpClientConnection connection = (TcpClientConnection) connectionManager.getRandomConnection();
InetSocketAddress inetSocketAddress = connection != null ? connection.getLocalSocketAddress() : null;
UUID clientUuid = connectionManager.getClientUuid();
return new ClientImpl(clientUuid, inetSocketAddress, instanceName, unmodifiableSet(config.getLabels()));
}

@Nonnull
Expand Down Expand Up @@ -863,8 +869,8 @@ public void onClusterChange() {
logger.info("Resetting local state of the client, because of a cluster change.");

dispose(onClusterChangeDisposables);
//clear the member lists
clusterService.reset();
//reset the member list version
clusterService.onClusterChange();
//clear partition service
partitionService.reset();
//close all the connections, consequently waiting invocations get TargetDisconnectedException
Expand All @@ -873,12 +879,12 @@ public void onClusterChange() {
connectionManager.reset();
}

public void onClusterRestart() {
public void onClusterConnect() {
ILogger logger = loggingService.getLogger(HazelcastInstance.class);
logger.info("Clearing local state of the client, because of a cluster restart.");

dispose(onClusterChangeDisposables);
clusterService.clearMemberList();
clusterService.onClusterConnect();
}

public void waitForInitialMembershipEvents() {
Expand Down
Expand Up @@ -18,6 +18,7 @@


import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;

/**
* Provides initial addresses for client to find and connect to a node &
Expand All @@ -40,4 +41,10 @@ public interface AddressProvider {
* @throws Exception when a remote service can not provide addressee
*/
Address translate(Address address) throws Exception;

/*
* Implementations of this will handle returning the public address of the member if necessary.
* See {@link com.hazelcast.client.impl.spi.impl.DefaultAddressProvider#addressOf(Member)}
*/
Address translate(Member member) throws Exception;
}
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.internal.nio.Connection;

import java.util.Map;
import java.util.UUID;

/**
* The ClientConnection is connection that lives on the client side on behalf of a Java client.
Expand All @@ -44,6 +45,10 @@ public interface ClientConnection extends Connection {

void addEventHandler(long correlationId, EventHandler handler);

void setClusterUuid(UUID uuid);

UUID getClusterUuid();

// used in tests
Map<Long, EventHandler> getEventHandlers();

Expand Down
Expand Up @@ -77,6 +77,7 @@ public class TcpClientConnection implements ClientConnection {
private volatile String closeReason;
private String connectedServerVersion;
private volatile UUID remoteUuid;
private volatile UUID clusterUuid;

public TcpClientConnection(HazelcastClientInstanceImpl client, int connectionId, Channel channel) {
this.client = client;
Expand Down Expand Up @@ -286,10 +287,6 @@ public void setConnectedServerVersion(String connectedServerVersion) {
this.connectedServerVersion = connectedServerVersion;
}

public String getConnectedServerVersion() {
return connectedServerVersion;
}

@Override
public EventHandler getEventHandler(long correlationId) {
return eventHandlerMap.get(correlationId);
Expand All @@ -305,6 +302,16 @@ public void addEventHandler(long correlationId, EventHandler handler) {
eventHandlerMap.put(correlationId, handler);
}

@Override
public void setClusterUuid(UUID uuid) {
clusterUuid = uuid;
}

@Override
public UUID getClusterUuid() {
return clusterUuid;
}

// used in tests
@Override
public Map<Long, EventHandler> getEventHandlers() {
Expand Down
Expand Up @@ -44,12 +44,13 @@
import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.LifecycleEvent.LifecycleState;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.ProtocolType;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.nio.NioNetworking;
Expand Down Expand Up @@ -119,13 +120,11 @@
/**
* Implementation of {@link ClientConnectionManager}.
*/
public class TcpClientConnectionManager implements ClientConnectionManager {
public class TcpClientConnectionManager implements ClientConnectionManager, MembershipListener {

private static final int DEFAULT_SMART_CLIENT_THREAD_COUNT = 3;
private static final int EXECUTOR_CORE_POOL_SIZE = 10;
private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8;
private static final EndpointQualifier CLIENT_PUBLIC_ENDPOINT_QUALIFIER =
EndpointQualifier.resolve(ProtocolType.CLIENT, "public");
private static final int SQL_CONNECTION_RANDOM_ATTEMPTS = 10;

protected final AtomicInteger connectionIdGen = new AtomicInteger();
Expand Down Expand Up @@ -727,29 +726,25 @@ protected TcpClientConnection createSocketConnection(Address target) {
}

private Address translate(Member member) {
if (client.getClientClusterService().translateToPublicAddress()) {
Address publicAddress = member.getAddressMap().get(CLIENT_PUBLIC_ENDPOINT_QUALIFIER);
if (publicAddress != null) {
return publicAddress;
}
return member.getAddress();
}
return translate(member.getAddress());
return translate(member, AddressProvider::translate);
}

private Address translate(Address address) {
return translate(address, AddressProvider::translate);
}

private Address translate(Address target) {
private <T> Address translate(T target, BiFunctionEx<AddressProvider, T, Address> translateFunction) {
CandidateClusterContext currentContext = clusterDiscoveryService.current();
AddressProvider addressProvider = currentContext.getAddressProvider();
try {
Address translatedAddress = addressProvider.translate(target);
Address translatedAddress = translateFunction.apply(addressProvider, target);
if (translatedAddress == null) {
throw new HazelcastException("Address Provider " + addressProvider.getClass()
+ " could not translate address " + target);
+ " could not translate " + target);
}

return translatedAddress;
} catch (Exception e) {
logger.warning("Failed to translate address " + target + " via address provider " + e.getMessage());
logger.warning("Failed to translate " + target + " via address provider " + e.getMessage());
throw rethrow(e);
}
}
Expand Down Expand Up @@ -896,9 +891,9 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
boolean switchingToNextCluster) {
synchronized (clientStateMutex) {
checkAuthenticationResponse(connection, response);
connection.setConnectedServerVersion(response.serverHazelcastVersion);
connection.setRemoteAddress(response.address);
connection.setRemoteUuid(response.memberUuid);
connection.setClusterUuid(response.clusterId);

TcpClientConnection existingConnection = activeConnections.get(response.memberUuid);
if (existingConnection != null) {
Expand All @@ -919,7 +914,7 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
if (clusterIdChanged) {
checkClientStateOnClusterIdChange(connection, switchingToNextCluster);
logger.warning("Switching from current cluster: " + this.clusterId + " to new cluster: " + newClusterId);
client.onClusterRestart();
client.onClusterConnect();
}
checkClientState(connection, switchingToNextCluster);

Expand Down Expand Up @@ -1193,4 +1188,20 @@ public void run() {
}
}
}

@Override
public void memberAdded(MembershipEvent membershipEvent) {

}

@Override
public void memberRemoved(MembershipEvent membershipEvent) {
Member member = membershipEvent.getMember();
Connection connection = getConnection(member.getUuid());
if (connection != null) {
connection.close(null,
new TargetDisconnectedException("The client has closed the connection to this member,"
+ " after receiving a member left event from the cluster. " + connection));
}
}
}
Expand Up @@ -17,11 +17,12 @@
package com.hazelcast.client.impl.proxy;

import com.hazelcast.client.impl.spi.impl.ClientClusterServiceImpl;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.hotrestart.HotRestartService;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.persistence.PersistenceService;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.version.Version;
Expand Down Expand Up @@ -70,7 +71,7 @@ public Member getLocalMember() {

@Override
public long getClusterTime() {
return clusterService.getClusterTime();
return Clock.currentTimeMillis();
}

@Nonnull
Expand Down