Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mock connection management [HZ-836] #20371

Merged
merged 5 commits into from Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -104,10 +104,10 @@ public Server getServer() {
@Override
public ServerConnection get(@Nonnull Address address, int streamId) {
UUID memberUuid = server.nodeRegistry.uuidOf(address);
return memberUuid != null ? get(memberUuid, 0) : null;
return memberUuid != null ? get(memberUuid) : null;
}

public MockServerConnection get(UUID memberUuid, int streamId) {
public MockServerConnection get(UUID memberUuid) {
return server.connectionMap.get(memberUuid);
}

Expand All @@ -118,7 +118,7 @@ public List<ServerConnection> getAllConnections(@Nonnull Address address) {
if (memberUuid == null) {
return Collections.emptyList();
}
ServerConnection conn = get(memberUuid, 0);
ServerConnection conn = get(memberUuid);
return conn != null ? Collections.singletonList(conn) : Collections.emptyList();
}

Expand Down Expand Up @@ -166,6 +166,14 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
UUID localMemberUuid = node.getThisUuid();
UUID remoteMemberUuid = targetNode.getThisUuid();


// Create a unidirectional connection that is split into
// two distinct connection objects (one for the local member
// side and the other for the remote member side)
// These two connections below are only used for sending
// packets from the local member to the remote member.

// this connection is only used to send packets to remote member
MockServerConnection connectionFromLocalToRemote = new MockServerConnection(
lifecycleListener,
localAddress,
Expand All @@ -177,6 +185,13 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
node.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

// This connection is only used to receive packets on the remote member
// which are sent from the local member's connection created above.
// Since this connection is not registered in the connection map of remote
// member's connection server, when the remote member intends to send a
// packet to this local member, it won't have access to this connection,
// and then it will create a new pair of connections. This means that a
// bidirectional connection consists of 4 MockServerConnections.
MockServerConnection connectionFromRemoteToLocal = new MockServerConnection(
lifecycleListener,
remoteAddress,
Expand All @@ -188,8 +203,8 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
targetNode.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

connectionFromRemoteToLocal.localConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.localConnection = connectionFromRemoteToLocal;
connectionFromRemoteToLocal.otherConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.otherConnection = connectionFromRemoteToLocal;

if (!connectionFromRemoteToLocal.isAlive()) {
// targetNode is not alive anymore.
Expand All @@ -198,9 +213,6 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
}

addressRegistry.register(remoteMemberUuid, LinkedAddresses.getResolvedAddresses(remoteAddress));
LocalAddressRegistry remoteAddressRegistry = targetNode.getLocalAddressRegistry();
remoteAddressRegistry.register(localMemberUuid, LinkedAddresses.getResolvedAddresses(localAddress));

server.connectionMap.put(remoteMemberUuid, connectionFromLocalToRemote);
server.logger.info("Created connection to endpoint: " + remoteAddress + "-" + remoteMemberUuid + ", connection: "
+ connectionFromLocalToRemote);
Expand Down Expand Up @@ -234,10 +246,8 @@ public synchronized boolean register(
if (!connection.isAlive()) {
return false;
}

connection.setLifecycleListener(lifecycleListener);
connection.setRemoteAddress(remoteAddress);
connection.setRemoteUuid(remoteUuid);
connection.setLifecycleListener(lifecycleListener);
server.connectionMap.put(remoteUuid, connection);
LinkedAddresses addressesToRegister = LinkedAddresses.getResolvedAddresses(remoteAddress);
if (targetAddress != null) {
Expand Down Expand Up @@ -309,7 +319,7 @@ private boolean send(Packet packet, Address targetAddress, SendTask sendTask) {
UUID targetUuid = server.nodeRegistry.uuidOf(targetAddress);
MockServerConnection connection = null;
if (targetUuid != null) {
connection = get(targetUuid, 0);
connection = get(targetUuid);
}
if (connection != null) {
return connection.write(packet);
Expand Down Expand Up @@ -348,23 +358,26 @@ private class MockConnLifecycleListener

@Override
public void onConnectionClose(MockServerConnection connection, Throwable t, boolean silent) {
Address endpointAddress = connection.getRemoteAddress();
UUID endpointUuid = connection.getRemoteUuid();
assert endpointUuid != null;
if (!server.connectionMap.remove(endpointUuid, connection)) {
return;
}
addressRegistry.tryRemoveRegistration(endpointUuid, endpointAddress);

Server server = connection.remoteNodeEngine.getNode().getServer();
Server remoteServer = connection.remoteNodeEngine.getNode().getServer();
// all mock implementations of networking service ignore the provided endpoint qualifier
// so we pass in null. Once they are changed to use the parameter, we should be notified
// and this parameter can be changed
Connection remoteConnection = server.getConnectionManager(null)
.get(connection.getRemoteAddress(), 0);
Connection remoteConnection = remoteServer.getConnectionManager(null)
.get(connection.localAddress, 0);
if (remoteConnection != null) {
remoteConnection.close("Connection closed by the other side", null);
}

MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: " + endpointUuid
+ ", connection: " + connection);
MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: [address="
+ endpointAddress + ", uuid=" + endpointUuid + "], connection: " + connection);
fireConnectionRemovedEvent(connection, endpointUuid);
}

Expand Down
Expand Up @@ -20,8 +20,6 @@
import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.internal.nio.ConnectionLifecycleListener;
import com.hazelcast.internal.nio.ConnectionType;
import com.hazelcast.internal.server.FirewallingServer;
import com.hazelcast.internal.server.Server;
import com.hazelcast.internal.server.ServerConnectionManager;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.nio.PacketIOHelper;
Expand All @@ -47,7 +45,7 @@ public class MockServerConnection implements ServerConnection {
protected final NodeEngineImpl localNodeEngine;
protected final NodeEngineImpl remoteNodeEngine;

volatile MockServerConnection localConnection;
volatile MockServerConnection otherConnection;

private volatile ConnectionLifecycleListener lifecycleListener;

Expand All @@ -56,7 +54,7 @@ public class MockServerConnection implements ServerConnection {
private final Address remoteAddress;

private final UUID localUuid;
private UUID remoteUuid;
private volatile UUID remoteUuid;

private final ServerConnectionManager connectionManager;

Expand Down Expand Up @@ -167,7 +165,7 @@ private Packet readFromPacket(Packet packet) {
} while (!writeDone);

assertNotNull(newPacket);
newPacket.setConn(localConnection);
newPacket.setConn(otherConnection);
return newPacket;
}

Expand All @@ -184,26 +182,13 @@ public void close(String msg, Throwable cause) {
return;
}

if (localConnection != null) {
localConnection.close(msg, cause);
if (otherConnection != null) {
otherConnection.close(msg, cause);
}

if (lifecycleListener != null) {
lifecycleListener.onConnectionClose(this, cause, false);
}

if (localNodeEngine != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleanup was already performed in onConnectionClose (MockServer#L354). But since the other side of the connection is not closed properly due to this error, this cleanup was not performed in the opposite side member.

localNodeEngine.getNode()
.getLocalAddressRegistry()
.tryRemoveRegistration(remoteUuid, remoteAddress);
Server server = localNodeEngine.getNode().getServer();
// this is a member-to-member connection
if (server instanceof FirewallingServer) {
(((MockServer) ((FirewallingServer) server).delegate)).connectionMap.remove(remoteUuid);
} else if (server instanceof MockServer) {
((MockServer) server).connectionMap.remove(remoteUuid);
}
}
}

@Override
Expand Down