Skip to content

Commit

Permalink
Fix mock connection management (#20371)
Browse files Browse the repository at this point in the history
  • Loading branch information
ufukyilmaz committed Jan 14, 2022
1 parent 9571cbc commit f19e279
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 37 deletions.
Expand Up @@ -104,10 +104,10 @@ public Server getServer() {
@Override
public ServerConnection get(@Nonnull Address address, int streamId) {
UUID memberUuid = server.nodeRegistry.uuidOf(address);
return memberUuid != null ? get(memberUuid, 0) : null;
return memberUuid != null ? get(memberUuid) : null;
}

public MockServerConnection get(UUID memberUuid, int streamId) {
public MockServerConnection get(UUID memberUuid) {
return server.connectionMap.get(memberUuid);
}

Expand All @@ -118,7 +118,7 @@ public List<ServerConnection> getAllConnections(@Nonnull Address address) {
if (memberUuid == null) {
return Collections.emptyList();
}
ServerConnection conn = get(memberUuid, 0);
ServerConnection conn = get(memberUuid);
return conn != null ? Collections.singletonList(conn) : Collections.emptyList();
}

Expand Down Expand Up @@ -166,6 +166,14 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
UUID localMemberUuid = node.getThisUuid();
UUID remoteMemberUuid = targetNode.getThisUuid();


// Create a unidirectional connection that is split into
// two distinct connection objects (one for the local member
// side and the other for the remote member side)
// These two connections below are only used for sending
// packets from the local member to the remote member.

// this connection is only used to send packets to remote member
MockServerConnection connectionFromLocalToRemote = new MockServerConnection(
lifecycleListener,
localAddress,
Expand All @@ -177,6 +185,13 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
node.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

// This connection is only used to receive packets on the remote member
// which are sent from the local member's connection created above.
// Since this connection is not registered in the connection map of remote
// member's connection server, when the remote member intends to send a
// packet to this local member, it won't have access to this connection,
// and then it will create a new pair of connections. This means that a
// bidirectional connection consists of 4 MockServerConnections.
MockServerConnection connectionFromRemoteToLocal = new MockServerConnection(
lifecycleListener,
remoteAddress,
Expand All @@ -188,8 +203,8 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
targetNode.getServer().getConnectionManager(EndpointQualifier.MEMBER)
);

connectionFromRemoteToLocal.localConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.localConnection = connectionFromRemoteToLocal;
connectionFromRemoteToLocal.otherConnection = connectionFromLocalToRemote;
connectionFromLocalToRemote.otherConnection = connectionFromRemoteToLocal;

if (!connectionFromRemoteToLocal.isAlive()) {
// targetNode is not alive anymore.
Expand All @@ -198,9 +213,6 @@ private synchronized MockServerConnection createConnection(Node targetNode) {
}

addressRegistry.register(remoteMemberUuid, LinkedAddresses.getResolvedAddresses(remoteAddress));
LocalAddressRegistry remoteAddressRegistry = targetNode.getLocalAddressRegistry();
remoteAddressRegistry.register(localMemberUuid, LinkedAddresses.getResolvedAddresses(localAddress));

server.connectionMap.put(remoteMemberUuid, connectionFromLocalToRemote);
server.logger.info("Created connection to endpoint: " + remoteAddress + "-" + remoteMemberUuid + ", connection: "
+ connectionFromLocalToRemote);
Expand Down Expand Up @@ -234,10 +246,8 @@ public synchronized boolean register(
if (!connection.isAlive()) {
return false;
}

connection.setLifecycleListener(lifecycleListener);
connection.setRemoteAddress(remoteAddress);
connection.setRemoteUuid(remoteUuid);
connection.setLifecycleListener(lifecycleListener);
server.connectionMap.put(remoteUuid, connection);
LinkedAddresses addressesToRegister = LinkedAddresses.getResolvedAddresses(remoteAddress);
if (targetAddress != null) {
Expand Down Expand Up @@ -309,7 +319,7 @@ private boolean send(Packet packet, Address targetAddress, SendTask sendTask) {
UUID targetUuid = server.nodeRegistry.uuidOf(targetAddress);
MockServerConnection connection = null;
if (targetUuid != null) {
connection = get(targetUuid, 0);
connection = get(targetUuid);
}
if (connection != null) {
return connection.write(packet);
Expand Down Expand Up @@ -348,23 +358,26 @@ private class MockConnLifecycleListener

@Override
public void onConnectionClose(MockServerConnection connection, Throwable t, boolean silent) {
Address endpointAddress = connection.getRemoteAddress();
UUID endpointUuid = connection.getRemoteUuid();
assert endpointUuid != null;
if (!server.connectionMap.remove(endpointUuid, connection)) {
return;
}
addressRegistry.tryRemoveRegistration(endpointUuid, endpointAddress);

Server server = connection.remoteNodeEngine.getNode().getServer();
Server remoteServer = connection.remoteNodeEngine.getNode().getServer();
// all mock implementations of networking service ignore the provided endpoint qualifier
// so we pass in null. Once they are changed to use the parameter, we should be notified
// and this parameter can be changed
Connection remoteConnection = server.getConnectionManager(null)
.get(connection.getRemoteAddress(), 0);
Connection remoteConnection = remoteServer.getConnectionManager(null)
.get(connection.localAddress, 0);
if (remoteConnection != null) {
remoteConnection.close("Connection closed by the other side", null);
}

MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: " + endpointUuid
+ ", connection: " + connection);
MockServerConnectionManager.this.server.logger.info("Removed connection to endpoint: [address="
+ endpointAddress + ", uuid=" + endpointUuid + "], connection: " + connection);
fireConnectionRemovedEvent(connection, endpointUuid);
}

Expand Down
Expand Up @@ -20,8 +20,6 @@
import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.internal.nio.ConnectionLifecycleListener;
import com.hazelcast.internal.nio.ConnectionType;
import com.hazelcast.internal.server.FirewallingServer;
import com.hazelcast.internal.server.Server;
import com.hazelcast.internal.server.ServerConnectionManager;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.nio.PacketIOHelper;
Expand All @@ -47,7 +45,7 @@ public class MockServerConnection implements ServerConnection {
protected final NodeEngineImpl localNodeEngine;
protected final NodeEngineImpl remoteNodeEngine;

volatile MockServerConnection localConnection;
volatile MockServerConnection otherConnection;

private volatile ConnectionLifecycleListener lifecycleListener;

Expand All @@ -56,7 +54,7 @@ public class MockServerConnection implements ServerConnection {
private final Address remoteAddress;

private final UUID localUuid;
private UUID remoteUuid;
private volatile UUID remoteUuid;

private final ServerConnectionManager connectionManager;

Expand Down Expand Up @@ -167,7 +165,7 @@ private Packet readFromPacket(Packet packet) {
} while (!writeDone);

assertNotNull(newPacket);
newPacket.setConn(localConnection);
newPacket.setConn(otherConnection);
return newPacket;
}

Expand All @@ -184,26 +182,13 @@ public void close(String msg, Throwable cause) {
return;
}

if (localConnection != null) {
localConnection.close(msg, cause);
if (otherConnection != null) {
otherConnection.close(msg, cause);
}

if (lifecycleListener != null) {
lifecycleListener.onConnectionClose(this, cause, false);
}

if (localNodeEngine != null) {
localNodeEngine.getNode()
.getLocalAddressRegistry()
.tryRemoveRegistration(remoteUuid, remoteAddress);
Server server = localNodeEngine.getNode().getServer();
// this is a member-to-member connection
if (server instanceof FirewallingServer) {
(((MockServer) ((FirewallingServer) server).delegate)).connectionMap.remove(remoteUuid);
} else if (server instanceof MockServer) {
((MockServer) server).connectionMap.remove(remoteUuid);
}
}
}

@Override
Expand Down

0 comments on commit f19e279

Please sign in to comment.