From 4cda98534d673f7f6ea0b64d6d69a1d0e50a843b Mon Sep 17 00:00:00 2001 From: GuoJiwei Date: Tue, 31 Aug 2021 22:58:44 +0800 Subject: [PATCH] Expose compaction metrics to Prometheus (#11739) ### Motivation As #11564 has involved compaction metrics in CLI, it's extremely useful to expose relative metrics to Prometheus. - pulsar_compaction_removed_event_count : the removed event count of compaction . - pulsar_compaction_succeed_count : the succeed count of compaction. - pulsar_compaction_failed_count : the failed count of compaction. - pulsar_compaction_duration_time_in_millis : the duration time of compaction. - pulsar_compaction_read_throughput : the read throughput of compaction. - pulsar_compaction_write_throughput : the write throughput of compaction. - pulsar_compaction_compacted_entries_count: the compacted entries count. - pulsar_compaction_compacted_entries_size: the compacted entries size; if users enable the topic level metrics and the topic has been compacted or doing the compaction, we should expose the compaction metrics for the topic level. If users disable the topic level metrics, we should expose the aggregated compaction metrics for the namespace level. (cherry picked from commit 656635ef7638c9df6be9513bd38d34f6872b2b29) (cherry picked from commit a80f1e8774d1c8f6505f19ed8ec1b97d75fa47d9) --- .../service/persistent/PersistentTopic.java | 46 ++++--- .../broker/stats/metrics/AbstractMetrics.java | 23 +--- .../prometheus/AggregatedNamespaceStats.java | 22 +++ .../prometheus/NamespaceStatsAggregator.java | 53 ++++++- .../broker/stats/prometheus/TopicStats.java | 73 +++++++++- .../compaction/CompactedTopicContext.java | 37 +++++ .../pulsar/compaction/CompactedTopicImpl.java | 12 -- .../pulsar/compaction/CompactionRecord.java | 130 ++++++++++++++++++ .../pulsar/compaction/CompactorMXBean.java | 26 +--- .../compaction/CompactorMXBeanImpl.java | 65 +++------ .../pulsar/compaction/TwoPhaseCompactor.java | 10 +- .../broker/stats/PrometheusMetricsTest.java | 93 +++++++++++++ .../AggregatedNamespaceStatsTest.java | 25 ++++ .../compaction/CompactorMXBeanImplTest.java | 40 ++++-- .../pulsar/compaction/CompactorTest.java | 9 +- site2/docs/reference-metrics.md | 10 ++ 16 files changed, 539 insertions(+), 135 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e70416faf7c96..6f35073b757de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -145,6 +145,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; @@ -1817,15 +1818,16 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp(); stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp(); Optional mxBean = getCompactorMXBean(); - stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat -> - stat.getLastCompactionRemovedEventCount(topic)).orElse(0L); - stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat -> - stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L); - stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat -> - stat.getLastCompactionFailedTimestamp(topic)).orElse(0L); - stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat -> - stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L); + stats.compaction.reset(); + mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> { + stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount(); + stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp(); + stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp(); + stats.compaction.lastCompactionDurationTimeInMills = + compactionRecord.getLastCompactionDurationTimeInMills(); + return compactionRecord; + }); return stats; } @@ -1902,19 +1904,14 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - try { - Optional compactedTopicContext = - ((CompactedTopicImpl) compactedTopic) - .getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } - } catch (ExecutionException | InterruptedException e) { - log.warn("[{}]Fail to get ledger information for compacted topic.", topic); + Optional compactedTopicContext = getCompactedTopicContext(); + if (compactedTopicContext.isPresent()) { + CompactedTopicContext ledgerContext = compactedTopicContext.get(); + info.ledgerId = ledgerContext.getLedger().getId(); + info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; + info.size = ledgerContext.getLedger().getLength(); } + stats.compactedLedger = info; stats.cursors = Maps.newTreeMap(); @@ -2018,6 +2015,15 @@ public CompletableFuture getInternalStats(boolean return statFuture; } + public Optional getCompactedTopicContext() { + try { + return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext(); + } catch (ExecutionException | InterruptedException e) { + log.warn("[{}]Fail to get ledger information for compacted topic.", topic); + } + return Optional.empty(); + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java index ed6e79fdbb7e0..610d22c54d8e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java @@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN } protected void populateAggregationMap(Map> map, String mkey, double value) { - if (!map.containsKey(mkey)) { - map.put(mkey, Lists.newArrayList(value)); - } else { - map.get(mkey).add(value); - } + map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value); } protected void populateAggregationMapWithSum(Map map, String mkey, double value) { @@ -242,24 +238,11 @@ protected void populateMaxMap(Map map, String mkey, long value) { */ protected void populateDimensionMap(Map> ledgersByDimensionMap, Metrics metrics, ManagedLedgerImpl ledger) { - if (!ledgersByDimensionMap.containsKey(metrics)) { - // create new list - ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger)); - } else { - // add to collection - ledgersByDimensionMap.get(metrics).add(ledger); - } + ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger); } protected void populateDimensionMap(Map> topicsStatsByDimensionMap, Metrics metrics, TopicStats destStats) { - if (!topicsStatsByDimensionMap.containsKey(metrics)) { - // create new list - topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats)); - } else { - // add to collection - topicsStatsByDimensionMap.get(metrics).add(destStats); - } - + topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index c049366fe03b4..8bacd0f582de2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.Map; +import org.apache.bookkeeper.mledger.util.StatsBuckets; +import org.apache.pulsar.compaction.CompactionRecord; public class AggregatedNamespaceStats { public int topicsCount; @@ -47,6 +49,16 @@ public class AggregatedNamespaceStats { public Map subscriptionStats = new HashMap<>(); + long compactionRemovedEventCount; + long compactionSucceedCount; + long compactionFailedCount; + long compactionDurationTimeInMills; + double compactionReadThroughput; + double compactionWriteThroughput; + long compactionCompactedEntriesCount; + long compactionCompactedEntriesSize; + StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); + void updateStats(TopicStats stats) { topicsCount++; @@ -112,6 +124,16 @@ void updateStats(TopicStats stats) { consumerStats.unackedMessages += v.unackedMessages; }); }); + + compactionRemovedEventCount += stats.compactionRemovedEventCount; + compactionSucceedCount += stats.compactionSucceedCount; + compactionFailedCount += stats.compactionFailedCount; + compactionDurationTimeInMills += stats.compactionDurationTimeInMills; + compactionReadThroughput += stats.compactionReadThroughput; + compactionWriteThroughput += stats.compactionWriteThroughput; + compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount; + compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize; + compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets); } public void reset() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index c08641d46a936..cbc966e3f9800 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -19,9 +19,13 @@ package org.apache.pulsar.broker.stats.prometheus; import io.netty.util.concurrent.FastThreadLocal; +import java.util.Optional; import java.util.concurrent.atomic.LongAdder; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -29,7 +33,11 @@ import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.compaction.CompactedTopicContext; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; +@Slf4j public class NamespaceStatsAggregator { private static FastThreadLocal localNamespaceStats = @@ -56,6 +64,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b printDefaultBrokerStats(stream, cluster); + Optional compactorMXBean = getCompactorMXBean(pulsar); LongAdder topicsCount = new LongAdder(); pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); @@ -65,11 +74,13 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b topicsMap.forEach((name, topic) -> { getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics, pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(), - pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus()); + pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(), + compactorMXBean + ); if (includeTopicMetrics) { topicsCount.add(1); - TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats); + TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean); } else { namespaceStats.updateStats(topicStats); } @@ -86,8 +97,19 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b }); } + private static Optional getCompactorMXBean(PulsarService pulsar) { + Compactor compactor = null; + try { + compactor = pulsar.getCompactor(false); + } catch (PulsarServerException e) { + log.error("get compactor error", e); + } + return Optional.ofNullable(compactor).map(c -> c.getStats()); + } + private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) { + boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize, + Optional compactorMXBean) { stats.reset(); if (topic instanceof PersistentTopic) { @@ -217,6 +239,31 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include aggReplStats.connectedCount += replStats.connected ? 1 : 0; aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds; }); + + compactorMXBean + .flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName())) + .map(compactionRecord -> { + stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount(); + stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount(); + stats.compactionFailedCount = compactionRecord.getCompactionFailedCount(); + stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills(); + stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput(); + stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput(); + stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats()); + stats.compactionLatencyBuckets.refresh(); + PersistentTopic persistentTopic = (PersistentTopic) topic; + Optional compactedTopicContext = persistentTopic + .getCompactedTopicContext(); + if (compactedTopicContext.isPresent()) { + LedgerHandle ledger = compactedTopicContext.get().getLedger(); + long entries = ledger.getLastAddConfirmed() + 1; + long size = ledger.getLength(); + + stats.compactionCompactedEntriesCount = entries; + stats.compactionCompactedEntriesSize = size; + } + return compactionRecord; + }); } private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 9474ecb1656da..61e0175282f86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -20,7 +20,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.compaction.CompactionRecord; +import org.apache.pulsar.compaction.CompactorMXBean; class TopicStats { @@ -51,6 +55,16 @@ class TopicStats { // Used for tracking duplicate TYPE definitions static Map metricWithTypeDefinition = new HashMap<>(); + // For compaction + long compactionRemovedEventCount; + long compactionSucceedCount; + long compactionFailedCount; + long compactionDurationTimeInMills; + double compactionReadThroughput; + double compactionWriteThroughput; + long compactionCompactedEntriesCount; + long compactionCompactedEntriesSize; + StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); public void reset() { subscriptionsCount = 0; @@ -73,6 +87,16 @@ public void reset() { replicationStats.clear(); subscriptionStats.clear(); producerStats.clear(); + + compactionRemovedEventCount = 0; + compactionSucceedCount = 0; + compactionFailedCount = 0; + compactionDurationTimeInMills = 0; + compactionReadThroughput = 0; + compactionWriteThroughput = 0; + compactionCompactedEntriesCount = 0; + compactionCompactedEntriesSize = 0; + compactionLatencyBuckets.reset(); } static void resetTypes() { @@ -80,7 +104,7 @@ static void resetTypes() { } static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - TopicStats stats) { + TopicStats stats, Optional compactorMXBean) { metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount); metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount); metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount); @@ -250,6 +274,53 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter); metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter); + + // Compaction + boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic)) + .map(__ -> true).orElse(false); + if (hasCompaction) { + metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count", + stats.compactionRemovedEventCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count", + stats.compactionSucceedCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count", + stats.compactionFailedCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills", + stats.compactionDurationTimeInMills); + metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput", + stats.compactionReadThroughput); + metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput", + stats.compactionWriteThroughput); + metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count", + stats.compactionCompactedEntriesCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size", + stats.compactionCompactedEntriesSize); + long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets(); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5", + compactionLatencyBuckets[0]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1", + compactionLatencyBuckets[1]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5", + compactionLatencyBuckets[2]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10", + compactionLatencyBuckets[3]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20", + compactionLatencyBuckets[4]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50", + compactionLatencyBuckets[5]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100", + compactionLatencyBuckets[6]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200", + compactionLatencyBuckets[7]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000", + compactionLatencyBuckets[8]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow", + compactionLatencyBuckets[9]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum", + stats.compactionLatencyBuckets.getSum()); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count", + stats.compactionLatencyBuckets.getCount()); + } } static void metricType(SimpleTextOutputStream stream, String name) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java new file mode 100644 index 0000000000000..76b0642dcb3dd --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.compaction; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import lombok.Getter; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.pulsar.common.api.proto.MessageIdData; + +@Getter +public class CompactedTopicContext { + + final LedgerHandle ledger; + final AsyncLoadingCache cache; + + public CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache cache) { + this.ledger = ledger; + this.cache = cache; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index e0755a71d28b6..502c3040b61c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -30,7 +30,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import lombok.Getter; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -264,17 +263,6 @@ private static CompletableFuture> readEntries(LedgerHandle lh, long }); } - @Getter - public static class CompactedTopicContext { - final LedgerHandle ledger; - final AsyncLoadingCache cache; - - CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache cache) { - this.ledger = ledger; - this.cache = cache; - } - } - /** * Getter for CompactedTopicContext. * @return CompactedTopicContext diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java new file mode 100644 index 0000000000000..4a8274389eb62 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import lombok.Getter; +import org.apache.bookkeeper.mledger.util.StatsBuckets; +import org.apache.pulsar.common.stats.Rate; + +public class CompactionRecord { + + public static final long[] WRITE_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, + 200_000, 1000_000 }; + + @Getter + private long lastCompactionRemovedEventCount = 0L; + @Getter + private long lastCompactionSucceedTimestamp = 0L; + @Getter + private long lastCompactionFailedTimestamp = 0L; + @Getter + private long lastCompactionDurationTimeInMills = 0L; + + private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); + private long lastCompactionStartTimeOp; + + private final LongAdder compactionRemovedEventCount = new LongAdder(); + private final LongAdder compactionSucceedCount = new LongAdder(); + private final LongAdder compactionFailedCount = new LongAdder(); + private final LongAdder compactionDurationTimeInMills = new LongAdder(); + public final StatsBuckets writeLatencyStats = new StatsBuckets(WRITE_LATENCY_BUCKETS_USEC); + public final Rate writeRate = new Rate(); + public final Rate readRate = new Rate(); + + public void reset() { + compactionRemovedEventCount.reset(); + compactionSucceedCount.reset(); + compactionFailedCount.reset(); + compactionDurationTimeInMills.reset(); + writeLatencyStats.reset(); + } + + public void addCompactionRemovedEvent() { + lastCompactionRemovedEventCountOp.increment(); + compactionRemovedEventCount.increment(); + } + + public void addCompactionStartOp() { + lastCompactionRemovedEventCountOp.reset(); + lastCompactionStartTimeOp = System.currentTimeMillis(); + } + + public void addCompactionEndOp(boolean succeed) { + lastCompactionDurationTimeInMills = System.currentTimeMillis() + - lastCompactionStartTimeOp; + compactionDurationTimeInMills.add(lastCompactionDurationTimeInMills); + lastCompactionRemovedEventCount = lastCompactionRemovedEventCountOp.longValue(); + if (succeed) { + lastCompactionSucceedTimestamp = System.currentTimeMillis(); + compactionSucceedCount.increment(); + } else { + lastCompactionFailedTimestamp = System.currentTimeMillis(); + compactionFailedCount.increment(); + } + } + + public void addCompactionReadOp(long readableBytes) { + readRate.recordEvent(readableBytes); + } + + public void addCompactionWriteOp(long writeableBytes) { + writeRate.recordEvent(writeableBytes); + } + + public void addCompactionLatencyOp(long latency, TimeUnit unit) { + writeLatencyStats.addValue(unit.toMicros(latency)); + } + + public long getCompactionRemovedEventCount() { + return compactionRemovedEventCount.longValue(); + } + + public long getCompactionSucceedCount() { + return compactionSucceedCount.longValue(); + } + + public long getCompactionFailedCount() { + return compactionFailedCount.longValue(); + } + + public long getCompactionDurationTimeInMills() { + return compactionDurationTimeInMills.longValue(); + } + + public long[] getCompactionLatencyBuckets() { + writeLatencyStats.refresh(); + return writeLatencyStats.getBuckets(); + } + + public StatsBuckets getCompactionLatencyStats() { + return writeLatencyStats; + } + + public double getCompactionReadThroughput() { + readRate.calculateRate(); + return readRate.getValueRate(); + } + + public double getCompactionWriteThroughput() { + writeRate.calculateRate(); + return writeRate.getValueRate(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java index 54ca2e875f042..7786e1939369d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import java.util.Optional; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -28,30 +29,15 @@ @InterfaceStability.Stable public interface CompactorMXBean { - /** - * @return the removed event count of last compaction - */ - long getLastCompactionRemovedEventCount(String topic); - - /** - * @return the timestamp of last succeed compaction - */ - long getLastCompactionSucceedTimestamp(String topic); - - /** - * @return the timestamp of last failed compaction - */ - long getLastCompactionFailedTimestamp(String topic); - - /** - * @return the duration time of last compaction - */ - long getLastCompactionDurationTimeInMills(String topic); - /** * Remove metrics about this topic. * @param topic */ void removeTopic(String topic); + /** + * Get the compaction record of the topic. + * @param topic + */ + Optional getCompactionRecordForTopic(String topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java index 05db2aad4eae4..7b63127133e91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java @@ -18,75 +18,54 @@ */ package org.apache.pulsar.compaction; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.TimeUnit; public class CompactorMXBeanImpl implements CompactorMXBean { - private final ConcurrentHashMap compactRecordOps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap compactionRecordOps = new ConcurrentHashMap<>(); public void addCompactionRemovedEvent(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionRemovedEvent(); } public void addCompactionStartOp(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionStartOp(); } public void addCompactionEndOp(String topic, boolean succeed) { - CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()); - compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis() - - compactRecord.lastCompactionStartTimeOp; - compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue(); - if (succeed) { - compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis(); - } else { - compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis(); - } + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionEndOp(succeed); } @Override - public long getLastCompactionRemovedEventCount(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount; + public void removeTopic(String topic) { + compactionRecordOps.remove(topic); } @Override - public long getLastCompactionSucceedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp; + public Optional getCompactionRecordForTopic(String topic) { + return Optional.ofNullable(compactionRecordOps.get(topic)); } - @Override - public long getLastCompactionFailedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp; + public Set getTopics() { + return compactionRecordOps.keySet(); } - @Override - public long getLastCompactionDurationTimeInMills(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills; + public void reset() { + compactionRecordOps.values().forEach(CompactionRecord::reset); } - @Override - public void removeTopic(String topic) { - compactRecordOps.remove(topic); + public void addCompactionReadOp(String topic, long readableBytes) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionReadOp(readableBytes); } - static class CompactRecord { - - private long lastCompactionRemovedEventCount = 0L; - private long lastCompactionSucceedTimestamp = 0L; - private long lastCompactionFailedTimestamp = 0L; - private long lastCompactionDurationTimeInMills = 0L; - - private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); - private long lastCompactionStartTimeOp; - - public void addCompactionRemovedEvent() { - lastCompactionRemovedEventCountOp.increment(); - } + public void addCompactionWriteOp(String topic, long writeableBytes) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionWriteOp(writeableBytes); + } - public void reset() { - lastCompactionRemovedEventCountOp.reset(); - lastCompactionStartTimeOp = System.currentTimeMillis(); - } + public void addCompactionLatencyOp(String topic, long latency, TimeUnit unit) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionLatencyOp(latency, unit); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 8e617621f7cbb..af430ca681a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -124,6 +125,7 @@ private void phaseOneLoop(RawReader reader, MessageId id = m.getMessageId(); boolean deletedMessage = false; boolean replaceMessage = false; + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); if (RawBatchConverter.isReadableBatch(m)) { try { for (ImmutableTriple e : RawBatchConverter @@ -233,6 +235,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map try { MessageId id = m.getMessageId(); Optional messageToAdd = Optional.empty(); + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); if (RawBatchConverter.isReadableBatch(m)) { try { messageToAdd = RawBatchConverter.rebatchMessage( @@ -261,7 +264,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map RawMessage message = messageToAdd.get(); try { outstanding.acquire(); - CompletableFuture addFuture = addToCompactedLedger(lh, message) + CompletableFuture addFuture = addToCompactedLedger(lh, message, reader.getTopic()) .whenComplete((res, exception2) -> { outstanding.release(); if (exception2 != null) { @@ -362,12 +365,15 @@ private CompletableFuture closeLedger(LedgerHandle lh) { return bkf; } - private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m) { + private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) { CompletableFuture bkf = new CompletableFuture<>(); ByteBuf serialized = m.serialize(); try { + mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes()); + long start = System.nanoTime(); lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> { + mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); if (rc != BKException.Code.OK) { bkf.completeExceptionally(BKException.create(rc)); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 73879156fdaaa..3de3a0411884c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -27,6 +27,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.jsonwebtoken.SignatureAlgorithm; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -43,14 +44,18 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.Random; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -63,10 +68,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -1136,6 +1144,91 @@ void testParseMetrics() throws IOException { parseMetrics(sampleMetrics); } + @Test + public void testCompaction() throws Exception { + final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1"; + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + List cm = (List) metrics.get("pulsar_compaction_removed_event_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_succeed_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_failed_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_duration_time_in_mills"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_read_throughput"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_write_throughput"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); + assertEquals(cm.size(), 0); + // + final int numMessages = 1000; + final int maxKeys = 10; + Random r = new Random(0); + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key"+keyIndex; + byte[] data = ("my-message-" + key + "-" + j).getBytes(); + producer.newMessage() + .key(key) + .value(data) + .send(); + } + ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); + Compactor compactor = pulsar.getCompactor(true); + compactor.compact(topicName).get(); + statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + metricsStr = statsOut.toString(); + metrics = parseMetrics(metricsStr); + cm = (List) metrics.get("pulsar_compaction_removed_event_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 990); + cm = (List) metrics.get("pulsar_compaction_succeed_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 1); + cm = (List) metrics.get("pulsar_compaction_failed_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 0); + cm = (List) metrics.get("pulsar_compaction_duration_time_in_mills"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_read_throughput"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_write_throughput"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 10); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 870); + + pulsarClient.close(); + } + + private void compareCompactionStateCount(List cm, double count) { + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + assertEquals(cm.get(0).tags.get("broker"), "localhost"); + assertEquals(cm.get(0).value, count); + } + /** * Hacky parsing of Prometheus text format. Should be good enough for unit tests */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index 978492887b9ac..39ecc4235888a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -43,6 +43,14 @@ public void testSimpleAggregation() { topicStats1.msgBacklog = 30; topicStats1.managedLedgerStats.storageWriteRate = 12.0; topicStats1.managedLedgerStats.storageReadRate = 6.0; + topicStats1.compactionRemovedEventCount = 10; + topicStats1.compactionSucceedCount = 1; + topicStats1.compactionFailedCount = 2; + topicStats1.compactionDurationTimeInMills = 1000; + topicStats1.compactionReadThroughput = 15.0; + topicStats1.compactionWriteThroughput = 20.0; + topicStats1.compactionCompactedEntriesCount = 30; + topicStats1.compactionCompactedEntriesSize = 1000; AggregatedReplicationStats replStats1 = new AggregatedReplicationStats(); replStats1.msgRateIn = 1.0; @@ -75,6 +83,14 @@ public void testSimpleAggregation() { topicStats2.msgBacklog = 7; topicStats2.managedLedgerStats.storageWriteRate = 5.0; topicStats2.managedLedgerStats.storageReadRate = 2.5; + topicStats2.compactionRemovedEventCount = 10; + topicStats2.compactionSucceedCount = 1; + topicStats2.compactionFailedCount = 2; + topicStats2.compactionDurationTimeInMills = 1000; + topicStats2.compactionReadThroughput = 15.0; + topicStats2.compactionWriteThroughput = 20.0; + topicStats2.compactionCompactedEntriesCount = 30; + topicStats2.compactionCompactedEntriesSize = 1000; AggregatedReplicationStats replStats2 = new AggregatedReplicationStats(); replStats2.msgRateIn = 3.5; @@ -113,6 +129,15 @@ public void testSimpleAggregation() { assertEquals(nsStats.managedLedgerStats.storageSize, 6144); assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560); + assertEquals(nsStats.compactionRemovedEventCount, 20); + assertEquals(nsStats.compactionSucceedCount, 2); + assertEquals(nsStats.compactionFailedCount, 4); + assertEquals(nsStats.compactionDurationTimeInMills, 2000); + assertEquals(nsStats.compactionReadThroughput, 30.0); + assertEquals(nsStats.compactionWriteThroughput, 40.0); + assertEquals(nsStats.compactionCompactedEntriesCount, 60); + assertEquals(nsStats.compactionCompactedEntriesSize, 2000); + AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace); assertNotNull(nsReplStats); assertEquals(nsReplStats.msgRateIn, 4.5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java index b865396e87b2a..a3f2caedfcb28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java @@ -18,12 +18,9 @@ */ package org.apache.pulsar.compaction; -import org.apache.pulsar.broker.service.BrokerService; -import org.mockito.Mockito; import org.testng.annotations.Test; -import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,15 +33,38 @@ public void testSimple() throws Exception { CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl(); String topic = "topic1"; mxBean.addCompactionStartOp(topic); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + CompactionRecord compaction = mxBean.getCompactionRecordForTopic(topic).get(); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0); mxBean.addCompactionRemovedEvent(topic); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0); mxBean.addCompactionEndOp(topic, true); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 1, 0); + assertTrue(compaction.getLastCompactionSucceedTimestamp() > 0L); + assertTrue(compaction.getLastCompactionDurationTimeInMills() >= 0L); + assertEquals(compaction.getLastCompactionFailedTimestamp(), 0, 0); + assertEquals(compaction.getCompactionRemovedEventCount(), 1, 0); + assertEquals(compaction.getCompactionSucceedCount(), 1, 0); + assertEquals(compaction.getCompactionFailedCount(), 0, 0); + assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L); + mxBean.addCompactionStartOp(topic); + mxBean.addCompactionRemovedEvent(topic); mxBean.addCompactionEndOp(topic, false); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0); - assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L); - assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L); - assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L); + assertEquals(compaction.getCompactionRemovedEventCount(), 2, 0); + assertEquals(compaction.getCompactionFailedCount(), 1, 0); + assertEquals(compaction.getCompactionSucceedCount(), 1, 0); + assertTrue(compaction.getLastCompactionFailedTimestamp() > 0L); + assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L); + mxBean.addCompactionReadOp(topic, 22); + assertTrue(compaction.getCompactionReadThroughput() > 0L); + mxBean.addCompactionWriteOp(topic, 33); + assertTrue(compaction.getCompactionWriteThroughput() > 0L); + mxBean.addCompactionLatencyOp(topic, 10, TimeUnit.NANOSECONDS); + assertTrue(compaction.getCompactionLatencyBuckets()[0] > 0l); + mxBean.reset(); + assertEquals(compaction.getCompactionRemovedEventCount(), 0, 0); + assertEquals(compaction.getCompactionSucceedCount(), 0, 0); + assertEquals(compaction.getCompactionFailedCount(), 0, 0); + assertEquals(compaction.getCompactionDurationTimeInMills(), 0, 0); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 14997891b2b6a..cb562fe9ab789 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -109,10 +109,11 @@ private List compactAndVerify(String topic, Map expected m.close(); } if (checkMetrics) { - long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic); - long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic); - long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic); - long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic); + CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get(); + long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount(); + long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp(); + long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp(); + long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills(); Assert.assertTrue(compactedTopicRemovedEventCount >= 1); Assert.assertTrue(lastCompactSucceedTimestamp >= 1L); Assert.assertTrue(lastCompactDurationTimeInMills >= 0L); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index aa69b79053716..6bbedd31fbe8a 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -204,6 +204,16 @@ All the topic metrics are labelled with the following labels: | pulsar_in_messages_total | Counter | The total number of messages received for this topic | | pulsar_out_bytes_total | Counter | The total number of bytes read from this topic | | pulsar_out_messages_total | Counter | The total number of messages read from this topic | +| pulsar_compaction_removed_event_count | Gauge | The removed event count of compaction | +| pulsar_compaction_succeed_count | Gauge | The succeed count of compaction | +| pulsar_compaction_failed_count | Gauge | The failed count of compaction | +| pulsar_compaction_duration_time_in_mills | Gauge | The duration time of compaction | +| pulsar_compaction_read_throughput | Gauge | The read throughput of compaction | +| pulsar_compaction_write_throughput | Gauge | The write throughput of compaction | +| pulsar_compaction_latency_le_* | Histogram | The compaction latency with given quantile.
Available thresholds:
  • pulsar_compaction_latency_le_0_5: <= 0.5ms
  • pulsar_compaction_latency_le_1: <= 1ms
  • pulsar_compaction_latency_le_5: <= 5ms
  • pulsar_compaction_latency_le_10: <= 10ms
  • pulsar_compaction_latency_le_20: <= 20ms
  • pulsar_compaction_latency_le_50: <= 50ms
  • pulsar_compaction_latency_le_100: <= 100ms
  • pulsar_compaction_latency_le_200: <= 200ms
  • pulsar_compaction_latency_le_1000: <= 1s
  • pulsar_compaction_latency_le_overflow: > 1s
| +| pulsar_compaction_compacted_entries_count | Gauge | The compacted entries count | +| pulsar_compaction_compacted_entries_size |Gauge | The compacted entries size | + #### Replication metrics