Skip to content

Commit

Permalink
Fixes a race on bluegreen
Browse files Browse the repository at this point in the history
Investigation of jet.enterprise.BlueGreenClientTest.testOriginalClusterComebackBeforeSwitching
revealed that we have a bug because of a race.

The race happens as follows:
we have two clusters with one member each.
clusterA on port 5701
clusterB on port 5702

The client is on clusterA and expected to switch to clusterB
when clusterA is down.
After clusterA is restarted, client starts connection attempts to 5701
to connect back. Sends two authentication attempts to 5701
```
2021-03-04 06:37:32,333 [ INFO] [hz.flamboyant_buck.priority-generic-operation.thread-0] [c.h.c.i.p.t.AuthenticationMessageTask]: [127.0.0.1]:5701 [clusterA] [4.5-SNAPSHOT] Received auth from Connection[id=1, /127.0.0.1:5701->/127.0.0.1:38650, qualifier=null, endpoint=[127.0.0.1]:38650, alive=true, connectionType=JVM, planeIndex=-1], successfully authenticated, clientUuid: 040378d9-ce22-4efd-a25a-6c6dc11f9122, client version: 4.2-SNAPSHOT
2021-03-04 06:37:32,336 [ INFO] [hz.flamboyant_buck.priority-generic-operation.thread-0] [c.h.c.i.p.t.AuthenticationMessageTask]: [127.0.0.1]:5701 [clusterA] [4.5-SNAPSHOT] Received auth from Connection[id=2, /127.0.0.1:5701->/127.0.0.1:45775, qualifier=null, endpoint=[127.0.0.1]:45775, alive=true, connectionType=JVM, planeIndex=-1], successfully authenticated, clientUuid: 040378d9-ce22-4efd-a25a-6c6dc11f9122, client version: 4.2-SNAPSHOT
```
Gots a response back from one of them and finds out that cluster id has changed.
At this point, we fallback to failover logic "Force to hard cluster switch"
```///////
2021-03-04 06:37:32,338 [ INFO] [hz.client_2.internal-1] [c.h.c.i.c.t.TcpClientConnection]: hz.client_2 [clusterA] [4.5-SNAPSHOT] [4.2-SNAPSHOT] ClientConnection{alive=false, connectionId=11, channel=NioChannel{/127.0.0.1:38650->/127.0.0.1:5701}, remoteAddress=[127.0.0.1]:5701, lastReadTime=2021-03-04 06:37:32.337, lastWriteTime=2021-03-04 06:37:32.332, closedTime=2021-03-04 06:37:32.337, connected server version=4.2-SNAPSHOT} closed. Reason: Force to hard cluster switch
2021-03-04 06:37:32,339 [ WARN] [hz.client_2.internal-1] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterA] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Exception during initial connection to Member [127.0.0.1]:5701 - 047bf9a6-2604-4c69-895b-d5504e16ea65: com.hazelcast.client.ClientNotAllowedInClusterException: Force to hard cluster switch
2021-03-04 06:37:32,339 [ WARN] [hz.client_2.internal-1] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterA] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Stopped trying on the cluster: clusterA reason: Force to hard cluster switch
```
We see in the logs that client has changed context and will try to connect to clusterB
```
2021-03-04 06:37:32,339 [ INFO] [hz.client_2.internal-1] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Trying to connect to next cluster: clusterB
2021-03-04 06:37:32,340 [ INFO] [hz.client_2.internal-1] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Trying to connect to [127.0.0.1]:5701
```
Attempts 5701 because on the client config ports are not given but the cluster names are different.
So we expect this one to fail the authentication.
```
2021-03-04 06:37:32,346 [ WARN] [hz.flamboyant_buck.generic-operation.thread-32] [c.h.c.i.p.t.AuthenticationMessageTask]: [127.0.0.1]:5701 [clusterA] [4.5-SNAPSHOT] Received auth from Connection[id=3, /127.0.0.1:5701->/127.0.0.1:42692, qualifier=null, endpoint=null, alive=true, connectionType=NONE, planeIndex=-1] with clientUuid 040378d9-ce22-4efd-a25a-6c6dc11f9122, authentication failed
2021-03-04 06:37:32,528 [ WARN] [hz.client_2.internal-1] [c.h.c.i.c.t.TcpClientConnection]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] ClientConnection{alive=false, connectionId=13, channel=NioChannel{/127.0.0.1:42692->/127.0.0.1:5701}, remoteAddress=null, lastReadTime=2021-03-04 06:37:32.526, lastWriteTime=2021-03-04 06:37:32.343, closedTime=2021-03-04 06:37:32.528, connected server version=null} closed. Reason: Failed to authenticate connection
com.hazelcast.client.AuthenticationException: Authentication failed. The configured cluster name on the client (see ClientConfig.setClusterName()) does not match the one configured in the cluster or the credentials set in the Client security config could not be authenticated
```
Just before that we see that client is switching cluster(which should not happen)
```
2021-03-04 06:37:32,341 [ WARN] [hz.client_2.internal-4] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Switching from current cluster: f9c285d1-168d-460d-8b94-f1db7687a948 to new cluster: b3a9a9af-763e-4d36-b810-9d7e31de8640
2021-03-04 06:37:32,342 [ INFO] [hz.client_2.internal-4] [c.h.c.HazelcastInstance]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Clearing local state of the client, because of a cluster restart.
```
And also client says that it is connected to 5701 again
```
2021-03-04 06:37:32,476 [ INFO] [hz.client_2.internal-4] [c.h.c.i.c.ClientConnectionManager]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] Authenticated with server [127.0.0.1]:5701:35d11648-d679-4651-b1b7-333302e89e1b, server version: 4.2-SNAPSHOT, local address: /127.0.0.1:45775
2021-03-04 06:37:32,482 [ INFO] [hz.client_2.internal-4] [c.h.c.LifecycleService]: hz.client_2 [clusterB] [4.5-SNAPSHOT] [4.2-SNAPSHOT] HazelcastClient 4.2-SNAPSHOT (20210303 - 83fa58a, 9a3e1e7) is CLIENT_CONNECTED
```
This is happened because of the second connection attempt to 5701. First one is rejected and we made client to switch by force.
They were setting a global switch `switchingToNextCluster` to denote that further responses are OK to change the switch to cluster.
The false assumption was that we will not get any response other than the path that started with failover logic.
But the response to the second connection was done by periodic task `ConnectToAllClusterMembersTask`, and its intend is not to switch cluster.

The solution:
Together with global state `switchingToNextCluster`, I have introduced
a boolean on the connect methods to deliver the intend of the callee so that
we can decide on the response by comparing these two. If the connection is
opened without switching cluster intend, it could be connected to false cluster
and we should close it back.
Also note that the boolean `switchingToNextCluster` is moved to `ClientState.SWITCHING_CLUSTER`.

fixes https://github.com/hazelcast/hazelcast-jet-enterprise/issues/155
backport of #18511

(cherry picked from commit fbe3386)
  • Loading branch information
sancar committed May 6, 2021
1 parent 0a3bb4d commit 8bcf23d
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public class TcpClientConnectionManager implements ClientConnectionManager {
private volatile UUID clusterId;
private volatile ClientState clientState = ClientState.INITIAL;
private volatile boolean connectToClusterTaskSubmitted;
private volatile boolean switchingToNextCluster;

private enum ClientState {
/**
Expand All @@ -177,13 +176,18 @@ private enum ClientState {

/**
* When a client sends its local state to the cluster it has connected,
* it switches to this state. When a client loses all connections to
* the current cluster and connects to a new cluster, its state goes
* back to {@link #CONNECTED_TO_CLUSTER}.
* it switches to this state.
* <p>
* Invocations are allowed in this state.
*/
INITIALIZED_ON_CLUSTER
INITIALIZED_ON_CLUSTER,
/**
* We get into this state before we try to connect to next cluster. As soon as the state is `SWITCHING_CLUSTER`
* any connection happened without cluster switch intent are no longer allowed and will be closed.
* Also we will not allow ConnectToAllClusterMembersTask to make any further connection attempts as long as
* the state is `SWITCHING_CLUSTER`
*/
SWITCHING_CLUSTER
}

public TcpClientConnectionManager(HazelcastClientInstanceImpl client) {
Expand Down Expand Up @@ -324,7 +328,7 @@ public void tryConnectToAllClusterMembers(boolean sync) {
if (sync) {
for (Member member : client.getClientClusterService().getMemberList()) {
try {
getOrConnectToMember(member);
getOrConnectToMember(member, false);
} catch (Exception e) {
EmptyStatement.ignore(e);
}
Expand Down Expand Up @@ -403,10 +407,20 @@ private void doConnectToCluster() {
logger.info("Trying to connect to cluster: " + currentContext.getClusterName());

// try the current cluster
if (doConnectToCandidateCluster(currentContext)) {
if (doConnectToCandidateCluster(currentContext, false)) {
return;
}

synchronized (clientStateMutex) {
if (activeConnections.isEmpty()) {
clientState = ClientState.SWITCHING_CLUSTER;
} else {
//ConnectToAllClusterMembersTask connected back to the same cluster
//we don't need to switch cluster anymore.
return;
}
}

// try the next cluster
if (clusterDiscoveryService.tryNextCluster(this::destroyCurrentClusterConnectionAndTryNextCluster)) {
return;
Expand All @@ -429,8 +443,8 @@ private Boolean destroyCurrentClusterConnectionAndTryNextCluster(CandidateCluste
((ClientLoggingService) client.getLoggingService()).updateClusterName(nextContext.getClusterName());

logger.info("Trying to connect to next cluster: " + nextContext.getClusterName());
switchingToNextCluster = true;
if (doConnectToCandidateCluster(nextContext)) {

if (doConnectToCandidateCluster(nextContext, true)) {
client.waitForInitialMembershipEvents();
fireLifecycleEvent(CLIENT_CHANGED_CLUSTER);
return true;
Expand Down Expand Up @@ -459,7 +473,7 @@ private void fireLifecycleEvent(LifecycleState state) {
lifecycleService.fireLifecycleEvent(state);
}

private boolean doConnectToCandidateCluster(CandidateClusterContext context) {
private boolean doConnectToCandidateCluster(CandidateClusterContext context, boolean switchingToNextCluster) {
Set<Address> triedAddresses = new HashSet<>();
try {
waitStrategy.reset();
Expand All @@ -474,7 +488,7 @@ private boolean doConnectToCandidateCluster(CandidateClusterContext context) {
for (Member member : memberList) {
checkClientActive();
triedAddressesPerAttempt.add(member.getAddress());
Connection connection = connect(member, o -> getOrConnectToMember((Member) o));
Connection connection = connect(member, o -> getOrConnectToMember((Member) o, switchingToNextCluster));
if (connection != null) {
return true;
}
Expand All @@ -487,7 +501,7 @@ private boolean doConnectToCandidateCluster(CandidateClusterContext context) {
continue;
}

Connection connection = connect(address, o -> getOrConnectToAddress((Address) o));
Connection connection = connect(address, o -> getOrConnectToAddress((Address) o, switchingToNextCluster));
if (connection != null) {
return true;
}
Expand Down Expand Up @@ -582,7 +596,7 @@ public ClientConnection getConnection(@Nonnull UUID uuid) {
return activeConnections.get(uuid);
}

TcpClientConnection getOrConnectToAddress(@Nonnull Address address) {
TcpClientConnection getOrConnectToAddress(@Nonnull Address address, boolean switchingToNextCluster) {
for (TcpClientConnection connection : activeConnections.values()) {
if (connection.getRemoteAddress().equals(address)) {
return connection;
Expand All @@ -592,10 +606,10 @@ TcpClientConnection getOrConnectToAddress(@Nonnull Address address) {
address = translate(address);
TcpClientConnection connection = createSocketConnection(address);
ClientAuthenticationCodec.ResponseParameters response = authenticateOnCluster(connection);
return onAuthenticated(connection, response);
return onAuthenticated(connection, response, switchingToNextCluster);
}

TcpClientConnection getOrConnectToMember(@Nonnull Member member) {
TcpClientConnection getOrConnectToMember(@Nonnull Member member, boolean switchingToNextCluster) {
UUID uuid = member.getUuid();
TcpClientConnection connection = activeConnections.get(uuid);
if (connection != null) {
Expand All @@ -605,7 +619,7 @@ TcpClientConnection getOrConnectToMember(@Nonnull Member member) {
Address address = translate(member);
connection = createSocketConnection(address);
ClientAuthenticationCodec.ResponseParameters response = authenticateOnCluster(connection);
return onAuthenticated(connection, response);
return onAuthenticated(connection, response, switchingToNextCluster);
}

private void fireConnectionEvent(TcpClientConnection connection, boolean isAdded) {
Expand Down Expand Up @@ -852,49 +866,23 @@ private ClientAuthenticationCodec.ResponseParameters authenticateOnCluster(TcpCl
Address memberAddress = connection.getInitAddress();
ClientMessage request = encodeAuthenticationRequest(memberAddress);
ClientInvocationFuture future = new ClientInvocation(client, request, null, connection).invokeUrgent();
ClientAuthenticationCodec.ResponseParameters response;
try {
response = ClientAuthenticationCodec.decodeResponse(future.get(authenticationTimeout, MILLISECONDS));
return ClientAuthenticationCodec.decodeResponse(future.get(authenticationTimeout, MILLISECONDS));
} catch (Exception e) {
connection.close("Failed to authenticate connection", e);
throw rethrow(e);
}

AuthenticationStatus authenticationStatus = AuthenticationStatus.getById(response.status);
if (failoverConfigProvided && !response.failoverSupported) {
logger.warning("Cluster does not support failover. This feature is available in Hazelcast Enterprise");
authenticationStatus = NOT_ALLOWED_IN_CLUSTER;
}
switch (authenticationStatus) {
case AUTHENTICATED:
return response;
case CREDENTIALS_FAILED:
AuthenticationException authException = new AuthenticationException("Authentication failed. The configured "
+ "cluster name on the client (see ClientConfig.setClusterName()) does not match the one configured in "
+ "the cluster or the credentials set in the Client security config could not be authenticated");
connection.close("Failed to authenticate connection", authException);
throw authException;
case NOT_ALLOWED_IN_CLUSTER:
ClientNotAllowedInClusterException notAllowedException =
new ClientNotAllowedInClusterException("Client is not allowed in the cluster");
connection.close("Failed to authenticate connection", notAllowedException);
throw notAllowedException;
default:
AuthenticationException exception =
new AuthenticationException("Authentication status code not supported. status: " + authenticationStatus);
connection.close("Failed to authenticate connection", exception);
throw exception;
}
}

/**
* The returned connection could be different than the one passed to this method if there is already an existing
* connection to the given member.
*/
private TcpClientConnection onAuthenticated(TcpClientConnection connection,
ClientAuthenticationCodec.ResponseParameters response) {
ClientAuthenticationCodec.ResponseParameters response,
boolean switchingToNextCluster) {
synchronized (clientStateMutex) {
checkPartitionCount(response.partitionCount);
checkAuthenticationResponse(connection, response);
connection.setConnectedServerVersion(response.serverHazelcastVersion);
connection.setRemoteAddress(response.address);
connection.setRemoteUuid(response.memberUuid);
Expand All @@ -906,21 +894,27 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
}

UUID newClusterId = response.clusterId;

if (logger.isFineEnabled()) {
logger.fine("Checking the cluster: " + newClusterId + ", current cluster: " + this.clusterId);
}

// `clusterId` is `null` only at the start of the client.
// It is only set in this method below under `clientStateMutex`.
// `clusterId` is set by master when a cluster is started.
// `clusterId` is not preserved during HotRestart.
// In split brain, both sides have the same `clusterId`
boolean clusterIdChanged = this.clusterId != null && !newClusterId.equals(this.clusterId);
if (clusterIdChanged) {
checkClientStateOnClusterIdChange(connection);
checkClientStateOnClusterIdChange(connection, switchingToNextCluster);
logger.warning("Switching from current cluster: " + this.clusterId + " to new cluster: " + newClusterId);
client.onClusterRestart();
}
checkClientState(connection, switchingToNextCluster);

boolean connectionsEmpty = activeConnections.isEmpty();
activeConnections.put(response.memberUuid, connection);
if (connectionsEmpty) {
// The first connection that opens a connection to the new cluster should set `clusterId`.
// This one will initiate `initializeClientOnCluster` if necessary.
clusterId = newClusterId;
if (clusterIdChanged) {
clientState = ClientState.CONNECTED_TO_CLUSTER;
Expand Down Expand Up @@ -949,16 +943,78 @@ private TcpClientConnection onAuthenticated(TcpClientConnection connection,
return connection;
}

private void checkClientStateOnClusterIdChange(TcpClientConnection connection) {
/**
* Checks the client state against the intend of the callee(switchingToNextCluster)
* closes the connection and throws exception if the authentication needs to be cancelled.
*/
private void checkClientState(TcpClientConnection connection, boolean switchingToNextCluster) {
if (clientState == ClientState.SWITCHING_CLUSTER && !switchingToNextCluster) {
String reason = "There is a cluster switch in progress. "
+ "This connection attempt initiated before the progress and not allowed to be authenticated.";
connection.close(reason, null);
throw new AuthenticationException(reason);
}
//Following state can not happen. There is only one path with `switchingToNextCluster` as true
//and that path starts only when the old switch fails. There are no concurrent run of that path.
if (clientState != ClientState.SWITCHING_CLUSTER && switchingToNextCluster) {
String reason = "The cluster switch is already completed. "
+ "This connection attempt is not allowed to be authenticated.";
connection.close(reason, null);
throw new AuthenticationException(reason);
}
}

/**
* Checks the response from the server to see if authentication needs to be continued,
* closes the connection and throws exception if the authentication needs to be cancelled.
*/
private void checkAuthenticationResponse(TcpClientConnection connection,
ClientAuthenticationCodec.ResponseParameters response) {
AuthenticationStatus authenticationStatus = AuthenticationStatus.getById(response.status);
if (failoverConfigProvided && !response.failoverSupported) {
logger.warning("Cluster does not support failover. This feature is available in Hazelcast Enterprise");
authenticationStatus = NOT_ALLOWED_IN_CLUSTER;
}
switch (authenticationStatus) {
case AUTHENTICATED:
break;
case CREDENTIALS_FAILED:
AuthenticationException authException = new AuthenticationException("Authentication failed. The configured "
+ "cluster name on the client (see ClientConfig.setClusterName()) does not match the one configured "
+ "in the cluster or the credentials set in the Client security config could not be authenticated");
connection.close("Failed to authenticate connection", authException);
throw authException;
case NOT_ALLOWED_IN_CLUSTER:
ClientNotAllowedInClusterException notAllowedException =
new ClientNotAllowedInClusterException("Client is not allowed in the cluster");
connection.close("Failed to authenticate connection", notAllowedException);
throw notAllowedException;
default:
AuthenticationException exception =
new AuthenticationException("Authentication status code not supported. status: " + authenticationStatus);
connection.close("Failed to authenticate connection", exception);
throw exception;
}
ClientPartitionServiceImpl partitionService = (ClientPartitionServiceImpl) client.getClientPartitionService();
if (!partitionService.checkAndSetPartitionCount(response.partitionCount)) {
ClientNotAllowedInClusterException exception =
new ClientNotAllowedInClusterException("Client can not work with this cluster"
+ " because it has a different partition count. "
+ "Expected partition count: " + partitionService.getPartitionCount()
+ ", Member partition count: " + response.partitionCount);
connection.close("Failed to authenticate connection", exception);
throw exception;
}
}

private void checkClientStateOnClusterIdChange(TcpClientConnection connection, boolean switchingToNextCluster) {
if (activeConnections.isEmpty()) {
// We only have single connection established
if (failoverConfigProvided) {
// If failover is provided, and this single connection is established after failover logic kicks in
// (checked via `switchingToNextCluster`), then it is OK to continue. Otherwise, we force the failover logic
// to be used by throwing `ClientNotAllowedInClusterException`
if (switchingToNextCluster) {
switchingToNextCluster = false;
} else {
// (checked via `switchingToNextCluster`), then it is OK to continue.
// Otherwise, we force the failover logic to be used by throwing `ClientNotAllowedInClusterException`
if (!switchingToNextCluster) {
String reason = "Force to hard cluster switch";
connection.close(reason, null);
throw new ClientNotAllowedInClusterException(reason);
Expand All @@ -975,7 +1031,6 @@ private void checkClientStateOnClusterIdChange(TcpClientConnection connection) {
// 4. In this case we will close the connection to the second member, thinking that it is not part of the
// cluster we think we are in. We will reconnect to this member, and the connection is closed unnecessarily.
// 5. The connection to the first cluster will be gone after that and we will initiate a reconnect to the cluster.

String reason = "Connection does not belong to this cluster";
connection.close(reason, null);
throw new IllegalStateException(reason);
Expand Down Expand Up @@ -1014,16 +1069,6 @@ protected void checkClientActive() {
}
}

private void checkPartitionCount(int newPartitionCount) {
ClientPartitionServiceImpl partitionService = (ClientPartitionServiceImpl) client.getClientPartitionService();
if (!partitionService.checkAndSetPartitionCount(newPartitionCount)) {
throw new ClientNotAllowedInClusterException("Client can not work with this cluster"
+ " because it has a different partition count. "
+ "Expected partition count: " + partitionService.getPartitionCount()
+ ", Member partition count: " + newPartitionCount);
}
}

private void initializeClientOnCluster(UUID targetClusterId) {
// submitted inside synchronized(clientStateMutex)

Expand Down Expand Up @@ -1101,6 +1146,10 @@ public void run() {
}

for (Member member : client.getClientClusterService().getMemberList()) {
if (clientState == ClientState.SWITCHING_CLUSTER) {
// when switching cluster we only want to open a new connection via `doConnectToCandidateCluster`
return;
}
UUID uuid = member.getUuid();
if (activeConnections.get(uuid) != null) {
continue;
Expand All @@ -1117,7 +1166,7 @@ public void run() {
if (!client.getLifecycleService().isRunning()) {
return;
}
getOrConnectToMember(member);
getOrConnectToMember(member, false);
} catch (Exception e) {
logger.warning("Could not connect to member " + uuid + ", reason " + e);
} finally {
Expand Down

0 comments on commit 8bcf23d

Please sign in to comment.