Skip to content

Commit

Permalink
Remove clientStats from MemberState (#1253)
Browse files Browse the repository at this point in the history
`clientStats` string in `MemberState` does not provide any additional
information other than `enterprise` and `clusterConnectionTimestamp`
compared to `ClientEndpointDTO`.

In MC side, we parse this long string in every `MemberState` retrieval,
and this adds additional overhead to our `TimedMemberState` consumption.
For metrics processing, `ClientStats` does not provide additional
information as well, because we consume these metrics directly from the
member.

For this purpose, this PR introduces `enterprise` and
`clusterConnectionTimestamp` fields in `ClientStatsDTO`, removes
`clientStats` from `TimedMemberState`, and adds connection start
timestamp to `Connection` (we had it only in `ClientConnection`, and now
it's extended to `ServerConnection` as well).

Fixes https://hazelcast.atlassian.net/browse/MC-2620

Breaking changes (list specific methods/types/messages):
* API
* TimedMemberState
* Connection

Checklist:
- [X] Labels (`Team:`, `Type:`, `Source:`, `Module:`) and Milestone set
- [X] Add `Add to Release Notes` label if changes should be mentioned in
release notes or `Not Release Notes content` if changes are not relevant
for release notes
- [X] Request reviewers if possible
- [ ] New public APIs have `@Nonnull/@Nullable` annotations
- [ ] New public APIs have `@since` tags in Javadoc
- [ ] Send backports/forwardports if fix needs to be applied to
past/future releases
GitOrigin-RevId: 9e30d555e2d7c1250a3b42cd22090aa46daf6448
  • Loading branch information
emrberk authored and actions-user committed Apr 19, 2024
1 parent c5f2e18 commit cc4dad2
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 44 deletions.
Expand Up @@ -82,6 +82,8 @@ public interface ClientEndpoint extends Client, DynamicMetricsProvider {

ServerConnection getConnection();

long getConnectionStartTime();

void setLoginContext(LoginContext lc);

void authenticated(UUID clientUuid, Credentials credentials, String clientVersion,
Expand All @@ -102,6 +104,16 @@ void authenticated(UUID clientUuid, Credentials credentials, String clientVersio
*/
void setClientVersion(String version);

/**
* @return true if the client uses the enterprise build of Hazelcast
*/
boolean isEnterprise();

/**
* @param enterprise indicates whether the client uses the enterprise build or not
*/
void setEnterprise(Boolean enterprise);

/**
* Updates to the latest client statistics.
*
Expand Down
Expand Up @@ -65,12 +65,12 @@ public final class ClientEndpointImpl implements ClientEndpoint {
private final ConcurrentMap<UUID, Callable> removeListenerActions = new ConcurrentHashMap<>();
private final SocketAddress socketAddress;
private final long creationTime;

private LoginContext loginContext;
private UUID clientUuid;
private Credentials credentials;
private volatile boolean authenticated;
private String clientVersion;
private Boolean enterprise;
private final AtomicReference<ClientStatistics> statsRef = new AtomicReference<>();
private String clientName;
private Set<String> labels;
Expand All @@ -84,6 +84,7 @@ public ClientEndpointImpl(ClientEngine clientEngine, NodeEngine nodeEngine, Serv
this.connection = connection;
this.socketAddress = connection.getRemoteSocketAddress();
this.clientVersion = "Unknown";
this.enterprise = false;
this.creationTime = System.currentTimeMillis();
}

Expand All @@ -92,6 +93,11 @@ public ServerConnection getConnection() {
return connection;
}

@Override
public long getConnectionStartTime() {
return connection.getStartTime();
}

@Override
public UUID getUuid() {
return clientUuid;
Expand Down Expand Up @@ -139,8 +145,25 @@ public void setClientVersion(String version) {
clientVersion = version;
}

@Override
public boolean isEnterprise() {
return enterprise;
}

@Override
public void setEnterprise(Boolean enterprise) {
this.enterprise = enterprise;
}

@Override
public void setClientStatistics(ClientStatistics stats) {
boolean setBefore = getClientStatistics() != null;
if (!setBefore) {
String clientAttributes = stats.clientAttributes();
if (clientAttributes.contains("enterprise=true")) {
setEnterprise(true);
}
}
statsRef.set(stats);
}

Expand Down
Expand Up @@ -32,6 +32,7 @@

public class TpcChannelClientConnectionAdapter implements ClientConnection {

private final long startTime = System.currentTimeMillis();
private final Channel channel;

public TpcChannelClientConnectionAdapter(Channel channel) {
Expand Down Expand Up @@ -119,6 +120,11 @@ public long lastWriteTimeMillis() {
throw new UnsupportedOperationException("Not supported for TPC channels");
}

@Override
public long getStartTime() {
return startTime;
}

@Nullable
@Override
public InetSocketAddress getRemoteSocketAddress() {
Expand Down
Expand Up @@ -18,7 +18,6 @@

import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.client.impl.ClientEndpoint;
import com.hazelcast.client.impl.statistics.ClientStatistics;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.impl.MemberImpl;
Expand Down Expand Up @@ -73,11 +72,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
import static com.hazelcast.internal.util.MapUtil.createHashMap;
import static com.hazelcast.internal.util.SetUtil.createHashSet;

/**
Expand Down Expand Up @@ -183,26 +180,12 @@ private void createMemberState(MemberStateImpl memberState,
createNodeState(memberState);
createHotRestartState(memberState);
createClusterHotRestartStatus(memberState);

memberState.setClientStats(getClientAttributes(node.getClientEngine().getClientStatistics()));
}

protected void setCPMemberUuid(MemberStateImpl memberState) {
memberState.setCpMemberUuid(null);
}

private Map<UUID, String> getClientAttributes(Map<UUID, ClientStatistics> allClientStatistics) {
Map<UUID, String> statsMap = createHashMap(allClientStatistics.size());
for (Map.Entry<UUID, ClientStatistics> entry : allClientStatistics.entrySet()) {
UUID uuid = entry.getKey();
ClientStatistics statistics = entry.getValue();
if (statistics != null) {
statsMap.put(uuid, statistics.clientAttributes());
}
}
return statsMap;
}

private void createHotRestartState(MemberStateImpl memberState) {
final HotRestartService hotRestartService = instance.node.getNodeExtension().getHotRestartService();
boolean hotBackupEnabled = hotRestartService.isHotBackupEnabled();
Expand Down
Expand Up @@ -30,6 +30,8 @@
import java.util.UUID;

import static com.hazelcast.internal.util.JsonUtil.getArray;
import static com.hazelcast.internal.util.JsonUtil.getBoolean;
import static com.hazelcast.internal.util.JsonUtil.getLong;
import static com.hazelcast.internal.util.JsonUtil.getString;

/**
Expand All @@ -46,7 +48,10 @@ public class ClientEndPointDTO implements JsonSerializable {
public String address;
public String clientType;
public String clientVersion;
public boolean enterprise;
public boolean statsEnabled;
public String name;
public long clusterConnectionTimestamp;
public Set<String> labels;

/**
Expand All @@ -66,7 +71,10 @@ public ClientEndPointDTO(ClientEndpoint clientEndpoint) {
this.uuid = clientEndpoint.getUuid();
this.clientType = clientEndpoint.getClientType();
this.clientVersion = clientEndpoint.getClientVersion();
this.enterprise = clientEndpoint.isEnterprise();
this.statsEnabled = clientEndpoint.getClientStatistics() != null;
this.name = clientEndpoint.getName();
this.clusterConnectionTimestamp = clientEndpoint.getConnectionStartTime();
this.labels = clientEndpoint.getLabels();

InetSocketAddress socketAddress = clientEndpoint.getSocketAddress();
Expand All @@ -84,11 +92,14 @@ public JsonObject toJson() {
root.add("address", address);
root.add("clientType", clientType);
root.add("clientVersion", clientVersion);
root.add("enterprise", enterprise);
root.add("statsEnabled", statsEnabled);
root.add("name", name);
JsonArray labelsObject = Json.array();
for (String label : labels) {
labelsObject.add(label);
}
root.add("clusterConnectionTimestamp", clusterConnectionTimestamp);
root.add("labels", labelsObject);
root.add("ipAddress", ipAddress);
root.add("canonicalHostName", canonicalHostName);
Expand All @@ -101,7 +112,10 @@ public void fromJson(JsonObject json) {
address = getString(json, "address");
clientType = getString(json, "clientType");
clientVersion = getString(json, "clientVersion");
enterprise = getBoolean(json, "enterprise");
statsEnabled = getBoolean(json, "statsEnabled");
name = getString(json, "name");
clusterConnectionTimestamp = getLong(json, "clusterConnectionTimestamp");
JsonArray labelsArray = getArray(json, "labels");
labels = new HashSet<>();
for (JsonValue labelValue : labelsArray) {
Expand Down
Expand Up @@ -73,7 +73,6 @@ public class MemberStateImpl implements MemberState {
private Set<String> flakeIdGeneratorsWithStats = emptySet();
private Set<String> userCodeNamespacesWithStats = emptySet();
private Collection<ClientEndPointDTO> clients = emptySet();
private Map<UUID, String> clientStats = emptyMap();
private MemberPartitionState memberPartitionState = new MemberPartitionStateImpl();
private LocalOperationStats operationStats = new LocalOperationStatsImpl();
private NodeState nodeState = new NodeStateImpl();
Expand Down Expand Up @@ -277,14 +276,6 @@ public void setClusterHotRestartStatus(ClusterHotRestartStatusDTO clusterHotRest
this.clusterHotRestartStatus = clusterHotRestartStatus;
}

public Map<UUID, String> getClientStats() {
return clientStats;
}

public void setClientStats(Map<UUID, String> clientStats) {
this.clientStats = clientStats;
}

@Override
public JsonObject toJson() {
final JsonObject root = new JsonObject();
Expand Down Expand Up @@ -339,11 +330,6 @@ public JsonObject toJson() {
root.add("hotRestartState", hotRestartState.toJson());
root.add("clusterHotRestartStatus", clusterHotRestartStatus.toJson());

JsonObject clientStatsObject = new JsonObject();
for (Map.Entry<UUID, String> entry : clientStats.entrySet()) {
clientStatsObject.add(entry.getKey().toString(), entry.getValue());
}
root.add("clientStats", clientStatsObject);
return root;
}

Expand Down Expand Up @@ -509,10 +495,6 @@ public void fromJson(JsonObject json) {
clusterHotRestartStatus = new ClusterHotRestartStatusDTO();
clusterHotRestartStatus.fromJson(jsonClusterHotRestartStatus);
}
clientStats = new HashMap<>();
for (JsonObject.Member next : getObject(json, "clientStats")) {
clientStats.put(UUID.fromString(next.getName()), next.getValue().asString());
}
}

@Override
Expand All @@ -522,6 +504,7 @@ public String toString() {
+ ", uuid=" + uuid
+ ", cpMemberUuid=" + cpMemberUuid
+ ", name=" + name
+ ", clients=" + clients
+ ", mapsWithStats=" + mapsWithStats
+ ", multiMapsWithStats=" + multiMapsWithStats
+ ", replicatedMapsWithStats=" + replicatedMapsWithStats
Expand All @@ -541,7 +524,6 @@ public String toString() {
+ ", nodeState=" + nodeState
+ ", hotRestartState=" + hotRestartState
+ ", clusterHotRestartStatus=" + clusterHotRestartStatus
+ ", clientStats=" + clientStats
+ '}';
}
}
Expand Up @@ -70,6 +70,13 @@ public interface Connection {
*/
long lastWriteTimeMillis();

/**
* Returns the clock time in milliseconds of the initialization of this connection.
*
* @return the clock time of the initialization of this connection.
*/
long getStartTime();

/**
* Returns the address of the endpoint this Connection is connected to, or
* <code>null</code> if it is unconnected.
Expand Down
Expand Up @@ -71,6 +71,8 @@ public class TcpServerConnection implements ServerConnection {
// indicate whether connection handshake is in progress/done (true) or not yet initiated (when false)
private final AtomicBoolean handshake = new AtomicBoolean();

private final long startTime = System.currentTimeMillis();

private final ILogger logger;

// Flag that indicates if the connection is accepted on this member (server-side)
Expand Down Expand Up @@ -162,6 +164,10 @@ public void setConnectionType(String connectionType) {
}
}

public long getStartTime() {
return startTime;
}

public TcpServerConnectionManager getConnectionManager() {
return connectionManager;
}
Expand Down
Expand Up @@ -78,14 +78,18 @@ public void testSerialization()
CacheStatisticsImpl cacheStatistics = new CacheStatisticsImpl(Clock.currentTimeMillis());
cacheStatistics.increaseCacheHits(5);
UUID clientUuid = UUID.randomUUID();
long connectionTimestamp = System.currentTimeMillis();

Collection<ClientEndPointDTO> clients = new ArrayList<>();
ClientEndPointDTO client = new ClientEndPointDTO();
client.uuid = clientUuid;
client.address = "localhost";
client.clientType = "undefined";
client.clientVersion = "5.2";
client.enterprise = true;
client.statsEnabled = true;
client.name = "aClient";
client.clusterConnectionTimestamp = connectionTimestamp;
client.labels = new HashSet<>(Collections.singletonList("label"));
client.ipAddress = "10.176.167.34";
client.canonicalHostName = "ip-10-176-167-34.ec2.internal";
Expand All @@ -100,9 +104,6 @@ public void testSerialization()
final String backupDirectory = "/hot/backup/dir";
final HotRestartStateImpl hotRestartState = new HotRestartStateImpl(backupTaskStatus, true, backupDirectory);

Map<UUID, String> clientStats = new HashMap<>();
clientStats.put(clientUuid, "someStats");

Map<EndpointQualifier, Address> endpoints = new HashMap<>();
endpoints.put(EndpointQualifier.MEMBER, new Address("127.0.0.1", 5701));
endpoints.put(EndpointQualifier.resolve(ProtocolType.WAN, "MyWAN"), new Address("127.0.0.1", 5901));
Expand Down Expand Up @@ -132,7 +133,6 @@ public void testSerialization()
memberState.setClients(clients);
memberState.setNodeState(state);
memberState.setHotRestartState(hotRestartState);
memberState.setClientStats(clientStats);

MemberStateImpl deserialized = new MemberStateImpl();
deserialized.fromJson(memberState.toJson());
Expand Down Expand Up @@ -162,7 +162,10 @@ public void testSerialization()
assertEquals(clientUuid, client.uuid);
assertEquals("localhost", client.address);
assertEquals("undefined", client.clientType);
assertTrue(client.enterprise);
assertTrue(client.statsEnabled);
assertEquals("aClient", client.name);
assertEquals(connectionTimestamp, client.clusterConnectionTimestamp);
assertContains(client.labels, "label");
assertEquals("10.176.167.34", client.ipAddress);
assertEquals("ip-10-176-167-34.ec2.internal", client.canonicalHostName);
Expand All @@ -184,8 +187,5 @@ public void testSerialization()
assertEquals(-1, clusterHotRestartStatus.getRemainingValidationTimeMillis());
assertEquals(-1, clusterHotRestartStatus.getRemainingDataLoadTimeMillis());
assertTrue(clusterHotRestartStatus.getMemberHotRestartStatusMap().isEmpty());

Map<UUID, String> deserializedClientStats = deserialized.getClientStats();
assertEquals("someStats", deserializedClientStats.get(clientUuid));
}
}
Expand Up @@ -93,6 +93,11 @@ public long lastWriteTimeMillis() {
return timestamp;
}

@Override
public long getStartTime() {
return timestamp;
}

@Override
public void close(String msg, Throwable cause) {
if (!isAlive.compareAndSet(true, false)) {
Expand Down
Expand Up @@ -373,6 +373,11 @@ public long lastWriteTimeMillis() {
return delegate.lastWriteTimeMillis();
}

@Override
public long getStartTime() {
return delegate.getStartTime();
}

@Override
public InetSocketAddress getRemoteSocketAddress() {
return delegate.getRemoteSocketAddress();
Expand Down

0 comments on commit cc4dad2

Please sign in to comment.