Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mock connection management [HZ-836] #20371

Merged
merged 5 commits into from Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -166,6 +166,9 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
UUID localMemberUuid = node.getThisUuid();
UUID remoteMemberUuid = targetNode.getThisUuid();

// These two connections below are actually only used for sending packets from
// the local member to the remote member.
// this connection only send packets
ramizdundar marked this conversation as resolved.
Show resolved Hide resolved
MockServerConnection connectionFromLocalToRemote = new MockServerConnection(
lifecycleListener,
localAddress,
Expand All @@ -177,6 +180,8 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
node.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

// this connection only receive packets (This connection is not registered in the
// remote member's connection manager)
MockServerConnection connectionFromRemoteToLocal = new MockServerConnection(
lifecycleListener,
remoteAddress,
Expand All @@ -188,8 +193,8 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
targetNode.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

connectionFromRemoteToLocal.localConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.localConnection = connectionFromRemoteToLocal;
connectionFromRemoteToLocal.otherConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.otherConnection = connectionFromRemoteToLocal;

if (!connectionFromRemoteToLocal.isAlive()) {
// targetNode is not alive anymore.
Expand All @@ -198,9 +203,6 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
}

addressRegistry.register(remoteMemberUuid, LinkedAddresses.getResolvedAddresses(remoteAddress));
LocalAddressRegistry remoteAddressRegistry = targetNode.getLocalAddressRegistry();
remoteAddressRegistry.register(localMemberUuid, LinkedAddresses.getResolvedAddresses(localAddress));

server.connectionMap.put(remoteMemberUuid, connectionFromLocalToRemote);
server.logger.info("Created connection to endpoint: " + remoteAddress + "-" + remoteMemberUuid + ", connection: "
+ connectionFromLocalToRemote);
Expand Down Expand Up @@ -234,10 +236,8 @@ public synchronized boolean register(
if (!connection.isAlive()) {
return false;
}

connection.setLifecycleListener(lifecycleListener);
connection.setRemoteAddress(remoteAddress);
connection.setRemoteUuid(remoteUuid);
connection.setLifecycleListener(lifecycleListener);
server.connectionMap.put(remoteUuid, connection);
LinkedAddresses addressesToRegister = LinkedAddresses.getResolvedAddresses(remoteAddress);
if (targetAddress != null) {
Expand Down Expand Up @@ -348,23 +348,26 @@ private class MockConnLifecycleListener

@Override
public void onConnectionClose(MockServerConnection connection, Throwable t, boolean silent) {
Address endpointAddress = connection.getRemoteAddress();
UUID endpointUuid = connection.getRemoteUuid();
assert endpointUuid != null;
if (!server.connectionMap.remove(endpointUuid, connection)) {
return;
}
addressRegistry.tryRemoveRegistration(endpointUuid, endpointAddress);

Server server = connection.remoteNodeEngine.getNode().getServer();
// all mock implementations of networking service ignore the provided endpoint qualifier
// so we pass in null. Once they are changed to use the parameter, we should be notified
// and this parameter can be changed
Connection remoteConnection = server.getConnectionManager(null)
.get(connection.getRemoteAddress(), 0);
.get(connection.localAddress, 0);
if (remoteConnection != null) {
remoteConnection.close("Connection closed by the other side", null);
}

MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: " + endpointUuid
+ ", connection: " + connection);
MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: [address="
+ endpointAddress + ", uuid=" + endpointUuid + "], connection: " + connection);
fireConnectionRemovedEvent(connection, endpointUuid);
}

Expand Down
Expand Up @@ -20,8 +20,6 @@
import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.internal.nio.ConnectionLifecycleListener;
import com.hazelcast.internal.nio.ConnectionType;
import com.hazelcast.internal.server.FirewallingServer;
import com.hazelcast.internal.server.Server;
import com.hazelcast.internal.server.ServerConnectionManager;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.nio.PacketIOHelper;
Expand All @@ -47,7 +45,7 @@ public class MockServerConnection implements ServerConnection {
protected final NodeEngineImpl localNodeEngine;
protected final NodeEngineImpl remoteNodeEngine;

volatile MockServerConnection localConnection;
volatile MockServerConnection otherConnection;

private volatile ConnectionLifecycleListener lifecycleListener;

Expand All @@ -56,7 +54,7 @@ public class MockServerConnection implements ServerConnection {
private final Address remoteAddress;

private final UUID localUuid;
private UUID remoteUuid;
private volatile UUID remoteUuid;

private final ServerConnectionManager connectionManager;

Expand Down Expand Up @@ -167,7 +165,7 @@ private Packet readFromPacket(Packet packet) {
} while (!writeDone);

assertNotNull(newPacket);
newPacket.setConn(localConnection);
newPacket.setConn(otherConnection);
return newPacket;
}

Expand All @@ -184,26 +182,13 @@ public void close(String msg, Throwable cause) {
return;
}

if (localConnection != null) {
localConnection.close(msg, cause);
if (otherConnection != null) {
otherConnection.close(msg, cause);
}

if (lifecycleListener != null) {
lifecycleListener.onConnectionClose(this, cause, false);
}

if (localNodeEngine != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleanup was already performed in onConnectionClose (MockServer#L354). But since the other side of the connection is not closed properly due to this error, this cleanup was not performed in the opposite side member.

localNodeEngine.getNode()
.getLocalAddressRegistry()
.tryRemoveRegistration(remoteUuid, remoteAddress);
Server server = localNodeEngine.getNode().getServer();
// this is a member-to-member connection
if (server instanceof FirewallingServer) {
(((MockServer) ((FirewallingServer) server).delegate)).connectionMap.remove(remoteUuid);
} else if (server instanceof MockServer) {
((MockServer) server).connectionMap.remove(remoteUuid);
}
}
}

@Override
Expand Down