Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics storageLogicalSize for the TopicStats and NamespaceStats #11430

Merged
merged 1 commit into from Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -40,6 +40,11 @@ public interface ManagedLedgerMXBean {
*/
long getStoredMessagesSize();

/**
* @return the total size of the messages in active ledgers (without accounting for replicas)
*/
long getStoredMessagesLogicalSize();

/**
* @return the number of backlog messages for all the consumers
*/
Expand Down
Expand Up @@ -281,6 +281,11 @@ public long getStoredMessagesSize() {
return managedLedger.getTotalSize() * managedLedger.getConfig().getWriteQuorumSize();
}

@Override
public long getStoredMessagesLogicalSize() {
return managedLedger.getTotalSize();
}

@Override
public long getNumberOfMessagesInBacklog() {
long count = 0;
Expand Down
Expand Up @@ -65,6 +65,7 @@ void updateStats(TopicStats stats) {
msgOutCounter += stats.msgOutCounter;

managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
managedLedgerStats.offloadedStorageUsed += stats.managedLedgerStats.offloadedStorageUsed;
backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);
Expand Down
Expand Up @@ -26,6 +26,7 @@ public class ManagedLedgerStats {
long storageSize;
long backlogSize;
long offloadedStorageUsed;
long storageLogicalSize;

StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
Expand All @@ -40,6 +41,7 @@ public void reset() {
storageReadRate = 0;
backlogSize = 0;
offloadedStorageUsed = 0;
storageLogicalSize = 0;

storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
Expand Down
Expand Up @@ -96,6 +96,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();

stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
Expand Down Expand Up @@ -258,6 +259,7 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);

metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
stats.managedLedgerStats.offloadedStorageUsed);
Expand Down
Expand Up @@ -92,6 +92,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize);

metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize);
metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize);
Expand Down
Expand Up @@ -119,6 +119,7 @@ private static void generateManageLedgerStats(ManagedLedger managedLedger, Simpl
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();

managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize();
managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize();

Expand Down Expand Up @@ -167,6 +168,8 @@ private static void printManageLedgerStats(SimpleTextOutputStream stream, String

metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_size", stats.storageSize);
metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_logical_size", stats.storageLogicalSize);
metrics(stream, cluster, namespace, topic, subscription,
"pulsar_storage_backlog_size", stats.backlogSize);
metrics(stream, cluster, namespace, topic, subscription,
Expand Down
Expand Up @@ -235,6 +235,10 @@ public void testManagedLedgerMetrics() throws Exception{
checkManagedLedgerMetrics(subName, 32, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 252, metric);

metric = metrics.get("pulsar_storage_logical_size");
checkManagedLedgerMetrics(subName, 16, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);

metric = metrics.get("pulsar_storage_backlog_size");
checkManagedLedgerMetrics(subName, 16, metric);
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);
Expand All @@ -245,6 +249,8 @@ public void testManagedLedgerMetrics() throws Exception{
metrics = parseMetrics(metricsStr);
metric = metrics.get("pulsar_storage_size");
assertEquals(metric.size(), 3);
metric = metrics.get("pulsar_storage_logical_size");
assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_backlog_size");
assertEquals(metric.size(), 2);
}
Expand Down
Expand Up @@ -39,6 +39,7 @@ public void testSimpleAggregation() {
topicStats1.throughputIn = 10240.0;
topicStats1.throughputOut = 20480.0;
topicStats1.managedLedgerStats.storageSize = 5120;
topicStats1.managedLedgerStats.storageLogicalSize = 2048;
topicStats1.msgBacklog = 30;
topicStats1.managedLedgerStats.storageWriteRate = 12.0;
topicStats1.managedLedgerStats.storageReadRate = 6.0;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void testSimpleAggregation() {
topicStats2.throughputIn = 512.0;
topicStats2.throughputOut = 1024.5;
topicStats2.managedLedgerStats.storageSize = 1024;
topicStats2.managedLedgerStats.storageLogicalSize = 512;
topicStats2.msgBacklog = 7;
topicStats2.managedLedgerStats.storageWriteRate = 5.0;
topicStats2.managedLedgerStats.storageReadRate = 2.5;
Expand Down Expand Up @@ -108,6 +110,8 @@ public void testSimpleAggregation() {
assertEquals(nsStats.msgBacklog, 37);
assertEquals(nsStats.managedLedgerStats.storageWriteRate, 17.0);
assertEquals(nsStats.managedLedgerStats.storageReadRate, 8.5);
assertEquals(nsStats.managedLedgerStats.storageSize, 6144);
assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560);

AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace);
assertNotNull(nsReplStats);
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-metrics.md
Expand Up @@ -144,6 +144,7 @@ All the namespace metrics are labelled with the following labels:
| pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). |
| pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). |
| pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_logical_size | Gauge | The storage size (without accounting for replicas) of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). |
| pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). |
| pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). |
Expand Down