From 629a470587a3ff63e06fd1c295260df1e8a20d0a Mon Sep 17 00:00:00 2001 From: GuoJiwei Date: Wed, 4 Aug 2021 23:42:54 +0800 Subject: [PATCH] Add metrics [AddEntryWithReplicasBytesRate] for namespace (#11472) (cherry picked from commit 92e7825fce9b41b801a7762dea49b592d364b6b3) --- .../mledger/ManagedLedgerMXBean.java | 5 +++++ .../mledger/impl/EntryCacheManager.java | 6 ++--- .../mledger/impl/ManagedLedgerMBeanImpl.java | 10 ++++++++- .../mledger/impl/ManagedLedgerMBeanTest.java | 2 ++ .../stats/metrics/ManagedLedgerMetrics.java | 2 ++ .../prometheus/NamespaceStatsAggregator.java | 1 + .../broker/stats/prometheus/TopicStats.java | 22 +++++++++---------- .../broker/stats/TransactionMetricsTest.java | 2 +- site2/docs/reference-metrics.md | 4 +++- 9 files changed, 37 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index b4444c0147737..6eccd029cbaf5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -60,6 +60,11 @@ public interface ManagedLedgerMXBean { */ double getAddEntryBytesRate(); + /** + * @return the bytes/s rate of messages added with replicas + */ + double getAddEntryWithReplicasBytesRate(); + /** * @return the msg/s rate of messages read */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 81008cc299b6c..c87bcb8aa4031 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -46,7 +46,7 @@ public class EntryCacheManager { private final long maxSize; private final long evictionTriggerThreshold; - private final double cacheEvictionWatermak; + private final double cacheEvictionWatermark; private final AtomicLong currentSize = new AtomicLong(0); private final ConcurrentMap caches = Maps.newConcurrentMap(); private final EntryCacheEvictionPolicy evictionPolicy; @@ -64,7 +64,7 @@ public class EntryCacheManager { public EntryCacheManager(ManagedLedgerFactoryImpl factory) { this.maxSize = factory.getConfig().getMaxCacheSize(); this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); - this.cacheEvictionWatermak = factory.getConfig().getCacheEvictionWatermark(); + this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark(); this.evictionPolicy = new EntryCacheDefaultEvictionPolicy(); this.mlFactory = factory; this.mlFactoryMBean = factory.mbean; @@ -109,7 +109,7 @@ boolean hasSpaceInCache() { mlFactory.scheduledExecutor.execute(safeRun(() -> { // Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark // percentage limit - long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermak); + long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermark); long startTime = System.nanoTime(); log.info("Triggering cache eviction. total size: {} Mb -- Need to discard: {} Mb", currentSize / MB, sizeToEvict / MB); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 9e22b515e4abe..01a88c157bb0c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -35,6 +35,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { private final ManagedLedgerImpl managedLedger; private final Rate addEntryOps = new Rate(); + private final Rate addEntryWithReplicasOps = new Rate(); private final Rate addEntryOpsFailed = new Rate(); private final Rate readEntriesOps = new Rate(); private final Rate readEntriesOpsFailed = new Rate(); @@ -49,7 +50,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { private final LongAdder cursorLedgerCreateOp = new LongAdder(); private final LongAdder cursorLedgerDeleteOp = new LongAdder(); - // addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue + // addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); // ledgerAddEntryLatencyStatsUsec measure latency to persist entry into ledger private final StatsBuckets ledgerAddEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); @@ -63,6 +64,7 @@ public ManagedLedgerMBeanImpl(ManagedLedgerImpl managedLedger) { public void refreshStats(long period, TimeUnit unit) { double seconds = unit.toMillis(period) / 1000.0; addEntryOps.calculateRate(seconds); + addEntryWithReplicasOps.calculateRate(seconds); addEntryOpsFailed.calculateRate(seconds); readEntriesOps.calculateRate(seconds); readEntriesOpsFailed.calculateRate(seconds); @@ -77,6 +79,7 @@ public void refreshStats(long period, TimeUnit unit) { public void addAddEntrySample(long size) { addEntryOps.recordEvent(size); entryStats.addValue(size); + addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize()); } public void addMarkDeleteOp() { @@ -186,6 +189,11 @@ public double getAddEntryBytesRate() { return addEntryOps.getValueRate(); } + @Override + public double getAddEntryWithReplicasBytesRate() { + return addEntryWithReplicasOps.getValueRate(); + } + @Override public double getReadEntriesRate() { return readEntriesOps.getRate(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java index 66accf3c2a401..89c69a08db40c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java @@ -74,6 +74,7 @@ public void simple() throws Exception { }).get(); assertEquals(mbean.getAddEntryBytesRate(), 0.0); + assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0); assertEquals(mbean.getAddEntryMessagesRate(), 0.0); assertEquals(mbean.getAddEntrySucceed(), 0); assertEquals(mbean.getAddEntryErrors(), 0); @@ -101,6 +102,7 @@ public void simple() throws Exception { }).get(); assertEquals(mbean.getAddEntryBytesRate(), 800.0); + assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0); assertEquals(mbean.getAddEntryMessagesRate(), 2.0); assertEquals(mbean.getAddEntrySucceed(), 2); assertEquals(mbean.getAddEntryErrors(), 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java index 8e15affd126ba..889eb8ea5c1d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java @@ -85,6 +85,8 @@ private List aggregate(Map> ledgersByD populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryBytesRate", lStats.getAddEntryBytesRate()); + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryWithReplicasBytesRate", + lStats.getAddEntryWithReplicasBytesRate()); populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryErrors", (double) lStats.getAddEntryErrors()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 43119d7eb207d..c08641d46a936 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -231,6 +231,7 @@ private static void printDefaultBrokerStats(SimpleTextOutputStream stream, Strin metric(stream, cluster, "pulsar_throughput_in", 0); metric(stream, cluster, "pulsar_throughput_out", 0); metric(stream, cluster, "pulsar_storage_size", 0); + metric(stream, cluster, "pulsar_storage_logical_size", 0); metric(stream, cluster, "pulsar_storage_write_rate", 0); metric(stream, cluster, "pulsar_storage_read_rate", 0); metric(stream, cluster, "pulsar_msg_backlog", 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 1ac514a348542..9474ecb1656da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -119,27 +119,27 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum", stats.managedLedgerStats.storageWriteLatencyBuckets.getSum()); - long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); + long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5", - ledgerWritelatencyBuckets[0]); + ledgerWriteLatencyBuckets[0]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1", - ledgerWritelatencyBuckets[1]); + ledgerWriteLatencyBuckets[1]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5", - ledgerWritelatencyBuckets[2]); + ledgerWriteLatencyBuckets[2]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10", - ledgerWritelatencyBuckets[3]); + ledgerWriteLatencyBuckets[3]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20", - ledgerWritelatencyBuckets[4]); + ledgerWriteLatencyBuckets[4]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50", - ledgerWritelatencyBuckets[5]); + ledgerWriteLatencyBuckets[5]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100", - ledgerWritelatencyBuckets[6]); + ledgerWriteLatencyBuckets[6]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200", - ledgerWritelatencyBuckets[7]); + ledgerWriteLatencyBuckets[7]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000", - ledgerWritelatencyBuckets[8]); + ledgerWriteLatencyBuckets[8]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow", - ledgerWritelatencyBuckets[9]); + ledgerWriteLatencyBuckets[9]); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count", stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount()); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index f0266dea4f1b1..b97f2238b5f65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -250,7 +250,7 @@ public void testManagedLedgerMetrics() throws Exception{ metric = metrics.get("pulsar_storage_size"); assertEquals(metric.size(), 3); metric = metrics.get("pulsar_storage_logical_size"); - assertEquals(metric.size(), 2); + assertEquals(metric.size(), 3); metric = metrics.get("pulsar_storage_backlog_size"); assertEquals(metric.size(), 2); } diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index c89ca5c4270f3..aa69b79053716 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -144,7 +144,7 @@ All the namespace metrics are labelled with the following labels: | pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). | | pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). | | pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). | -| pulsar_storage_logical_size | Gauge | The storage size (without accounting for replicas) of the topics in this namespace owned by this broker (bytes). | +| pulsar_storage_logical_size | Gauge | The storage size of topics in the namespace owned by the broker without replicas (in bytes). | | pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). | | pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). | | pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). | @@ -191,6 +191,7 @@ All the topic metrics are labelled with the following labels: | pulsar_throughput_in | Gauge | The total throughput of the topic coming into this broker (bytes/second). | | pulsar_throughput_out | Gauge | The total throughput of the topic going out from this broker (bytes/second). | | pulsar_storage_size | Gauge | The total storage size of the topics in this topic owned by this broker (bytes). | +| pulsar_storage_logical_size | Gauge | The storage size of topics in the namespace owned by the broker without replicas (in bytes). | | pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this topic owned by this broker (messages). | | pulsar_storage_offloaded_size | Gauge | The total amount of the data in this topic offloaded to the tiered storage (bytes). | | pulsar_storage_backlog_quota_limit | Gauge | The total amount of the data in this topic that limit the backlog quota (bytes). | @@ -248,6 +249,7 @@ All the managedLedger metrics are labelled with the following labels: | Name | Type | Description | | --- | --- | --- | | pulsar_ml_AddEntryBytesRate | Gauge | The bytes/s rate of messages added | +| pulsar_ml_AddEntryWithReplicasBytesRate | Gauge | The bytes/s rate of messages added with replicas | | pulsar_ml_AddEntryErrors | Gauge | The number of addEntry requests that failed | | pulsar_ml_AddEntryLatencyBuckets | Histogram | The add entry latency of a ledger with a given quantile (threshold).
Available quantile:
  • quantile="0.0_0.5" is AddEntryLatency between (0.0ms, 0.5ms]
  • quantile="0.5_1.0" is AddEntryLatency between (0.5ms, 1.0ms]
  • quantile="1.0_5.0" is AddEntryLatency between (1ms, 5ms]
  • quantile="5.0_10.0" is AddEntryLatency between (5ms, 10ms]
  • quantile="10.0_20.0" is AddEntryLatency between (10ms, 20ms]
  • quantile="20.0_50.0" is AddEntryLatency between (20ms, 50ms]
  • quantile="50.0_100.0" is AddEntryLatency between (50ms, 100ms]
  • quantile="100.0_200.0" is AddEntryLatency between (100ms, 200ms]
  • quantile="200.0_1000.0" is AddEntryLatency between (200ms, 1s]
| | pulsar_ml_AddEntryLatencyBuckets_OVERFLOW | Gauge | The add entry latency > 1s |