From 12d1c96a070f07544a72077dfa2c2bb77d6759f9 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 23 Jul 2021 00:04:14 +0800 Subject: [PATCH] Add metrics storageLogicalSize for the TopicStats and NamespaceStats (#11430) --- .../org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java | 5 +++++ .../bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java | 5 +++++ .../broker/stats/prometheus/AggregatedNamespaceStats.java | 1 + .../pulsar/broker/stats/prometheus/ManagedLedgerStats.java | 2 ++ .../broker/stats/prometheus/NamespaceStatsAggregator.java | 2 ++ .../apache/pulsar/broker/stats/prometheus/TopicStats.java | 2 ++ .../broker/stats/prometheus/TransactionAggregator.java | 3 +++ .../apache/pulsar/broker/stats/TransactionMetricsTest.java | 6 ++++++ .../stats/prometheus/AggregatedNamespaceStatsTest.java | 4 ++++ site2/docs/reference-metrics.md | 1 + 10 files changed, 31 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index d6f2b8969a3e4..b4444c0147737 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -40,6 +40,11 @@ public interface ManagedLedgerMXBean { */ long getStoredMessagesSize(); + /** + * @return the total size of the messages in active ledgers (without accounting for replicas) + */ + long getStoredMessagesLogicalSize(); + /** * @return the number of backlog messages for all the consumers */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 87606d2953612..9e22b515e4abe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -281,6 +281,11 @@ public long getStoredMessagesSize() { return managedLedger.getTotalSize() * managedLedger.getConfig().getWriteQuorumSize(); } + @Override + public long getStoredMessagesLogicalSize() { + return managedLedger.getTotalSize(); + } + @Override public long getNumberOfMessagesInBacklog() { long count = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 05f1b15f530e0..c049366fe03b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -65,6 +65,7 @@ void updateStats(TopicStats stats) { msgOutCounter += stats.msgOutCounter; managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize; + managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize; managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize; managedLedgerStats.offloadedStorageUsed += stats.managedLedgerStats.offloadedStorageUsed; backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java index f91416627582a..9f124a47f9383 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java @@ -26,6 +26,7 @@ public class ManagedLedgerStats { long storageSize; long backlogSize; long offloadedStorageUsed; + long storageLogicalSize; StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC); StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC); @@ -40,6 +41,7 @@ public void reset() { storageReadRate = 0; backlogSize = 0; offloadedStorageUsed = 0; + storageLogicalSize = 0; storageWriteLatencyBuckets.reset(); storageLedgerWriteLatencyBuckets.reset(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 18c4f97f40c38..43119d7eb207d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -96,6 +96,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats(); stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize(); + stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize(); stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize(); stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize(); stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize(); @@ -258,6 +259,7 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter); metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize); + metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize); metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize); metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.managedLedgerStats.offloadedStorageUsed); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 0649235a36960..1ac514a348542 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -92,6 +92,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize); metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize); + metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size", + stats.managedLedgerStats.storageLogicalSize); metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog); metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index a709933767d30..5ea2bea6fa7ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -119,6 +119,7 @@ private static void generateManageLedgerStats(ManagedLedger managedLedger, Simpl ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats(); managedLedgerStats.storageSize = mlStats.getStoredMessagesSize(); + managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize(); managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize(); managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize(); @@ -167,6 +168,8 @@ private static void printManageLedgerStats(SimpleTextOutputStream stream, String metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_size", stats.storageSize); + metrics(stream, cluster, namespace, topic, subscription, + "pulsar_storage_logical_size", stats.storageLogicalSize); metrics(stream, cluster, namespace, topic, subscription, "pulsar_storage_backlog_size", stats.backlogSize); metrics(stream, cluster, namespace, topic, subscription, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index aedf40df7fd06..c18c97fb4f117 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -235,6 +235,10 @@ public void testManagedLedgerMetrics() throws Exception{ checkManagedLedgerMetrics(subName, 32, metric); checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 252, metric); + metric = metrics.get("pulsar_storage_logical_size"); + checkManagedLedgerMetrics(subName, 16, metric); + checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric); + metric = metrics.get("pulsar_storage_backlog_size"); checkManagedLedgerMetrics(subName, 16, metric); checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric); @@ -245,6 +249,8 @@ public void testManagedLedgerMetrics() throws Exception{ metrics = parseMetrics(metricsStr); metric = metrics.get("pulsar_storage_size"); assertEquals(metric.size(), 3); + metric = metrics.get("pulsar_storage_logical_size"); + assertEquals(metric.size(), 2); metric = metrics.get("pulsar_storage_backlog_size"); assertEquals(metric.size(), 2); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index 2d0a5f938df5d..978492887b9ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -39,6 +39,7 @@ public void testSimpleAggregation() { topicStats1.throughputIn = 10240.0; topicStats1.throughputOut = 20480.0; topicStats1.managedLedgerStats.storageSize = 5120; + topicStats1.managedLedgerStats.storageLogicalSize = 2048; topicStats1.msgBacklog = 30; topicStats1.managedLedgerStats.storageWriteRate = 12.0; topicStats1.managedLedgerStats.storageReadRate = 6.0; @@ -70,6 +71,7 @@ public void testSimpleAggregation() { topicStats2.throughputIn = 512.0; topicStats2.throughputOut = 1024.5; topicStats2.managedLedgerStats.storageSize = 1024; + topicStats2.managedLedgerStats.storageLogicalSize = 512; topicStats2.msgBacklog = 7; topicStats2.managedLedgerStats.storageWriteRate = 5.0; topicStats2.managedLedgerStats.storageReadRate = 2.5; @@ -108,6 +110,8 @@ public void testSimpleAggregation() { assertEquals(nsStats.msgBacklog, 37); assertEquals(nsStats.managedLedgerStats.storageWriteRate, 17.0); assertEquals(nsStats.managedLedgerStats.storageReadRate, 8.5); + assertEquals(nsStats.managedLedgerStats.storageSize, 6144); + assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560); AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace); assertNotNull(nsReplStats); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index f9be3bc3f1537..c89ca5c4270f3 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -144,6 +144,7 @@ All the namespace metrics are labelled with the following labels: | pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). | | pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). | | pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). | +| pulsar_storage_logical_size | Gauge | The storage size (without accounting for replicas) of the topics in this namespace owned by this broker (bytes). | | pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). | | pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). | | pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). |