Skip to content

Commit

Permalink
Fix missing replicator metrics (#11264)
Browse files Browse the repository at this point in the history
Fix missing replicator metrics [msgRateExpired, connected, replicationDelayInSeconds] in prometheus.

1.  Fix the missing metrics.
2. Update the document.

- [X] Make sure that the change passes the CI checks.

(cherry picked from commit d811606)
  • Loading branch information
Technoboy- authored and codelipenghui committed Jul 14, 2021
1 parent e277464 commit 94b5ff5
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 3 deletions.
Expand Up @@ -88,6 +88,9 @@ void updateStats(TopicStats stats) {
replStats.msgThroughputIn += as.msgThroughputIn;
replStats.msgThroughputOut += as.msgThroughputOut;
replStats.replicationBacklog += as.replicationBacklog;
replStats.msgRateExpired += as.msgRateExpired;
replStats.connectedCount += as.connectedCount;
replStats.replicationDelayInSeconds += as.replicationDelayInSeconds;
});

stats.subscriptionStats.forEach((n, as) -> {
Expand Down
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.stats.prometheus;

public class AggregatedReplicationStats {

/** Total rate of messages received from the remote cluster (msg/s). */
public double msgRateIn;

/** Total throughput received from the remote cluster. bytes/s */
Expand All @@ -30,9 +32,16 @@ public class AggregatedReplicationStats {
/** Total throughput delivered to the replication-subscriber. bytes/s */
public double msgThroughputOut;

/**
* Number of messages pending to be replicated to remote cluster.
*/
/** Total rate of messages expired (msg/s). */
public double msgRateExpired;

/** Number of messages pending to be replicated to remote cluster. */
public long replicationBacklog;

/** The count of replication-subscriber up and running to replicate to remote cluster. */
public long connectedCount;

/** Time in seconds from the time a message was produced to the time when it is about to be replicated. */
public long replicationDelayInSeconds;

}
Expand Up @@ -210,6 +210,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
aggReplStats.msgRateOut += replStats.msgRateOut;
aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
aggReplStats.replicationBacklog += replStats.replicationBacklog;
aggReplStats.msgRateIn += replStats.msgRateIn;
aggReplStats.msgThroughputIn += replStats.msgThroughputIn;
aggReplStats.msgRateExpired += replStats.msgRateExpired;
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});
}

Expand Down Expand Up @@ -327,6 +332,12 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
replStats.msgThroughputOut);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
replStats.replicationBacklog);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
replStats.connectedCount);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
replStats.msgRateExpired);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
remoteCluster, replStats.replicationDelayInSeconds);
});
}
}
Expand Down
Expand Up @@ -237,6 +237,12 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
replStats.msgThroughputOut);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
replStats.replicationBacklog);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
remoteCluster, replStats.connectedCount);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
remoteCluster, replStats.msgRateExpired);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
remoteCluster, replStats.replicationDelayInSeconds);
});
}

Expand Down
Expand Up @@ -49,6 +49,9 @@ public void testSimpleAggregation() {
replStats1.msgRateOut = 2.0;
replStats1.msgThroughputOut = 256.0;
replStats1.replicationBacklog = 1;
replStats1.connectedCount = 0;
replStats1.msgRateExpired = 3.0;
replStats1.replicationDelayInSeconds = 20;
topicStats1.replicationStats.put(namespace, replStats1);

AggregatedSubscriptionStats subStats1 = new AggregatedSubscriptionStats();
Expand Down Expand Up @@ -77,6 +80,9 @@ public void testSimpleAggregation() {
replStats2.msgRateOut = 10.5;
replStats2.msgThroughputOut = 1536.0;
replStats2.replicationBacklog = 99;
replStats2.connectedCount = 1;
replStats2.msgRateExpired = 3.0;
replStats2.replicationDelayInSeconds = 20;
topicStats2.replicationStats.put(namespace, replStats2);

AggregatedSubscriptionStats subStats2 = new AggregatedSubscriptionStats();
Expand Down Expand Up @@ -110,6 +116,9 @@ public void testSimpleAggregation() {
assertEquals(nsReplStats.msgRateOut, 12.5);
assertEquals(nsReplStats.msgThroughputOut, 1792.0);
assertEquals(nsReplStats.replicationBacklog, 100);
assertEquals(nsReplStats.connectedCount, 1);
assertEquals(nsReplStats.msgRateExpired, 6.0);
assertEquals(nsReplStats.replicationDelayInSeconds, 40);

AggregatedSubscriptionStats nsSubStats = nsStats.subscriptionStats.get(namespace);
assertNotNull(nsSubStats);
Expand Down
4 changes: 4 additions & 0 deletions site2/docs/reference-metrics.md
Expand Up @@ -165,6 +165,10 @@ All the replication metrics are also labelled with `remoteCluster=${pulsar_remot
| pulsar_replication_throughput_in | Gauge | The total throughput of the namespace replicating from remote cluster (bytes/second). |
| pulsar_replication_throughput_out | Gauge | The total throughput of the namespace replicating to remote cluster (bytes/second). |
| pulsar_replication_backlog | Gauge | The total backlog of the namespace replicating to remote cluster (messages). |
| pulsar_replication_rate_expired | Gauge | Total rate of messages expired (messages/second). |
| pulsar_replication_connected_count | Gauge | The count of replication-subscriber up and running to replicate to remote cluster. |
| pulsar_replication_delay_in_seconds | Gauge | Time in seconds from the time a message was produced to the time when it is about to be replicated. |
~~~~
### Topic metrics
Expand Down

0 comments on commit 94b5ff5

Please sign in to comment.