diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b017750bb972f..a4db0bc63feaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -149,6 +149,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; @@ -1902,15 +1903,16 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp(); stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp(); Optional mxBean = getCompactorMXBean(); - stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat -> - stat.getLastCompactionRemovedEventCount(topic)).orElse(0L); - stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat -> - stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L); - stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat -> - stat.getLastCompactionFailedTimestamp(topic)).orElse(0L); - stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat -> - stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L); + stats.compaction.reset(); + mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> { + stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount(); + stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp(); + stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp(); + stats.compaction.lastCompactionDurationTimeInMills = + compactionRecord.getLastCompactionDurationTimeInMills(); + return compactionRecord; + }); return stats; } @@ -1987,19 +1989,14 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - try { - Optional compactedTopicContext = - ((CompactedTopicImpl) compactedTopic) - .getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } - } catch (ExecutionException | InterruptedException e) { - log.warn("[{}]Fail to get ledger information for compacted topic.", topic); + Optional compactedTopicContext = getCompactedTopicContext(); + if (compactedTopicContext.isPresent()) { + CompactedTopicContext ledgerContext = compactedTopicContext.get(); + info.ledgerId = ledgerContext.getLedger().getId(); + info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; + info.size = ledgerContext.getLedger().getLength(); } + stats.compactedLedger = info; stats.cursors = Maps.newTreeMap(); @@ -2116,6 +2113,15 @@ public CompletableFuture getInternalStats(boolean return statFuture; } + public Optional getCompactedTopicContext() { + try { + return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext(); + } catch (ExecutionException | InterruptedException e) { + log.warn("[{}]Fail to get ledger information for compacted topic.", topic); + } + return Optional.empty(); + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java index ed6e79fdbb7e0..610d22c54d8e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java @@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN } protected void populateAggregationMap(Map> map, String mkey, double value) { - if (!map.containsKey(mkey)) { - map.put(mkey, Lists.newArrayList(value)); - } else { - map.get(mkey).add(value); - } + map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value); } protected void populateAggregationMapWithSum(Map map, String mkey, double value) { @@ -242,24 +238,11 @@ protected void populateMaxMap(Map map, String mkey, long value) { */ protected void populateDimensionMap(Map> ledgersByDimensionMap, Metrics metrics, ManagedLedgerImpl ledger) { - if (!ledgersByDimensionMap.containsKey(metrics)) { - // create new list - ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger)); - } else { - // add to collection - ledgersByDimensionMap.get(metrics).add(ledger); - } + ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger); } protected void populateDimensionMap(Map> topicsStatsByDimensionMap, Metrics metrics, TopicStats destStats) { - if (!topicsStatsByDimensionMap.containsKey(metrics)) { - // create new list - topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats)); - } else { - // add to collection - topicsStatsByDimensionMap.get(metrics).add(destStats); - } - + topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index c049366fe03b4..8bacd0f582de2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.Map; +import org.apache.bookkeeper.mledger.util.StatsBuckets; +import org.apache.pulsar.compaction.CompactionRecord; public class AggregatedNamespaceStats { public int topicsCount; @@ -47,6 +49,16 @@ public class AggregatedNamespaceStats { public Map subscriptionStats = new HashMap<>(); + long compactionRemovedEventCount; + long compactionSucceedCount; + long compactionFailedCount; + long compactionDurationTimeInMills; + double compactionReadThroughput; + double compactionWriteThroughput; + long compactionCompactedEntriesCount; + long compactionCompactedEntriesSize; + StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); + void updateStats(TopicStats stats) { topicsCount++; @@ -112,6 +124,16 @@ void updateStats(TopicStats stats) { consumerStats.unackedMessages += v.unackedMessages; }); }); + + compactionRemovedEventCount += stats.compactionRemovedEventCount; + compactionSucceedCount += stats.compactionSucceedCount; + compactionFailedCount += stats.compactionFailedCount; + compactionDurationTimeInMills += stats.compactionDurationTimeInMills; + compactionReadThroughput += stats.compactionReadThroughput; + compactionWriteThroughput += stats.compactionWriteThroughput; + compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount; + compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize; + compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets); } public void reset() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index e041a011fc795..a2661ed246f93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -19,9 +19,13 @@ package org.apache.pulsar.broker.stats.prometheus; import io.netty.util.concurrent.FastThreadLocal; +import java.util.Optional; import java.util.concurrent.atomic.LongAdder; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -30,7 +34,11 @@ import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.compaction.CompactedTopicContext; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; +@Slf4j public class NamespaceStatsAggregator { private static FastThreadLocal localNamespaceStats = @@ -57,6 +65,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b printDefaultBrokerStats(stream, cluster); + Optional compactorMXBean = getCompactorMXBean(pulsar); LongAdder topicsCount = new LongAdder(); pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); @@ -66,11 +75,13 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b topicsMap.forEach((name, topic) -> { getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics, pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(), - pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus()); + pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(), + compactorMXBean + ); if (includeTopicMetrics) { topicsCount.add(1); - TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats); + TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean); } else { namespaceStats.updateStats(topicStats); } @@ -87,8 +98,19 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b }); } + private static Optional getCompactorMXBean(PulsarService pulsar) { + Compactor compactor = null; + try { + compactor = pulsar.getCompactor(false); + } catch (PulsarServerException e) { + log.error("get compactor error", e); + } + return Optional.ofNullable(compactor).map(c -> c.getStats()); + } + private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) { + boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize, + Optional compactorMXBean) { stats.reset(); if (topic instanceof PersistentTopic) { @@ -220,6 +242,31 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include aggReplStats.connectedCount += replStats.connected ? 1 : 0; aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds; }); + + compactorMXBean + .flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName())) + .map(compactionRecord -> { + stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount(); + stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount(); + stats.compactionFailedCount = compactionRecord.getCompactionFailedCount(); + stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills(); + stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput(); + stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput(); + stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats()); + stats.compactionLatencyBuckets.refresh(); + PersistentTopic persistentTopic = (PersistentTopic) topic; + Optional compactedTopicContext = persistentTopic + .getCompactedTopicContext(); + if (compactedTopicContext.isPresent()) { + LedgerHandle ledger = compactedTopicContext.get().getLedger(); + long entries = ledger.getLastAddConfirmed() + 1; + long size = ledger.getLength(); + + stats.compactionCompactedEntriesCount = entries; + stats.compactionCompactedEntriesSize = size; + } + return compactionRecord; + }); } private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 9474ecb1656da..61e0175282f86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -20,7 +20,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.compaction.CompactionRecord; +import org.apache.pulsar.compaction.CompactorMXBean; class TopicStats { @@ -51,6 +55,16 @@ class TopicStats { // Used for tracking duplicate TYPE definitions static Map metricWithTypeDefinition = new HashMap<>(); + // For compaction + long compactionRemovedEventCount; + long compactionSucceedCount; + long compactionFailedCount; + long compactionDurationTimeInMills; + double compactionReadThroughput; + double compactionWriteThroughput; + long compactionCompactedEntriesCount; + long compactionCompactedEntriesSize; + StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC); public void reset() { subscriptionsCount = 0; @@ -73,6 +87,16 @@ public void reset() { replicationStats.clear(); subscriptionStats.clear(); producerStats.clear(); + + compactionRemovedEventCount = 0; + compactionSucceedCount = 0; + compactionFailedCount = 0; + compactionDurationTimeInMills = 0; + compactionReadThroughput = 0; + compactionWriteThroughput = 0; + compactionCompactedEntriesCount = 0; + compactionCompactedEntriesSize = 0; + compactionLatencyBuckets.reset(); } static void resetTypes() { @@ -80,7 +104,7 @@ static void resetTypes() { } static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - TopicStats stats) { + TopicStats stats, Optional compactorMXBean) { metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount); metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount); metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount); @@ -250,6 +274,53 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter); metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter); + + // Compaction + boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic)) + .map(__ -> true).orElse(false); + if (hasCompaction) { + metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count", + stats.compactionRemovedEventCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count", + stats.compactionSucceedCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count", + stats.compactionFailedCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills", + stats.compactionDurationTimeInMills); + metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput", + stats.compactionReadThroughput); + metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput", + stats.compactionWriteThroughput); + metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count", + stats.compactionCompactedEntriesCount); + metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size", + stats.compactionCompactedEntriesSize); + long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets(); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5", + compactionLatencyBuckets[0]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1", + compactionLatencyBuckets[1]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5", + compactionLatencyBuckets[2]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10", + compactionLatencyBuckets[3]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20", + compactionLatencyBuckets[4]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50", + compactionLatencyBuckets[5]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100", + compactionLatencyBuckets[6]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200", + compactionLatencyBuckets[7]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000", + compactionLatencyBuckets[8]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow", + compactionLatencyBuckets[9]); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum", + stats.compactionLatencyBuckets.getSum()); + metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count", + stats.compactionLatencyBuckets.getCount()); + } } static void metricType(SimpleTextOutputStream stream, String name) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java new file mode 100644 index 0000000000000..76b0642dcb3dd --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.compaction; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import lombok.Getter; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.pulsar.common.api.proto.MessageIdData; + +@Getter +public class CompactedTopicContext { + + final LedgerHandle ledger; + final AsyncLoadingCache cache; + + public CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache cache) { + this.ledger = ledger; + this.cache = cache; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 21e9a1da563ce..b2f2d87680f4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -30,7 +30,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import lombok.Getter; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -274,17 +273,6 @@ private static CompletableFuture> readEntries(LedgerHandle lh, long }); } - @Getter - public static class CompactedTopicContext { - final LedgerHandle ledger; - final AsyncLoadingCache cache; - - CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache cache) { - this.ledger = ledger; - this.cache = cache; - } - } - /** * Getter for CompactedTopicContext. * @return CompactedTopicContext diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java new file mode 100644 index 0000000000000..4a8274389eb62 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import lombok.Getter; +import org.apache.bookkeeper.mledger.util.StatsBuckets; +import org.apache.pulsar.common.stats.Rate; + +public class CompactionRecord { + + public static final long[] WRITE_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, + 200_000, 1000_000 }; + + @Getter + private long lastCompactionRemovedEventCount = 0L; + @Getter + private long lastCompactionSucceedTimestamp = 0L; + @Getter + private long lastCompactionFailedTimestamp = 0L; + @Getter + private long lastCompactionDurationTimeInMills = 0L; + + private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); + private long lastCompactionStartTimeOp; + + private final LongAdder compactionRemovedEventCount = new LongAdder(); + private final LongAdder compactionSucceedCount = new LongAdder(); + private final LongAdder compactionFailedCount = new LongAdder(); + private final LongAdder compactionDurationTimeInMills = new LongAdder(); + public final StatsBuckets writeLatencyStats = new StatsBuckets(WRITE_LATENCY_BUCKETS_USEC); + public final Rate writeRate = new Rate(); + public final Rate readRate = new Rate(); + + public void reset() { + compactionRemovedEventCount.reset(); + compactionSucceedCount.reset(); + compactionFailedCount.reset(); + compactionDurationTimeInMills.reset(); + writeLatencyStats.reset(); + } + + public void addCompactionRemovedEvent() { + lastCompactionRemovedEventCountOp.increment(); + compactionRemovedEventCount.increment(); + } + + public void addCompactionStartOp() { + lastCompactionRemovedEventCountOp.reset(); + lastCompactionStartTimeOp = System.currentTimeMillis(); + } + + public void addCompactionEndOp(boolean succeed) { + lastCompactionDurationTimeInMills = System.currentTimeMillis() + - lastCompactionStartTimeOp; + compactionDurationTimeInMills.add(lastCompactionDurationTimeInMills); + lastCompactionRemovedEventCount = lastCompactionRemovedEventCountOp.longValue(); + if (succeed) { + lastCompactionSucceedTimestamp = System.currentTimeMillis(); + compactionSucceedCount.increment(); + } else { + lastCompactionFailedTimestamp = System.currentTimeMillis(); + compactionFailedCount.increment(); + } + } + + public void addCompactionReadOp(long readableBytes) { + readRate.recordEvent(readableBytes); + } + + public void addCompactionWriteOp(long writeableBytes) { + writeRate.recordEvent(writeableBytes); + } + + public void addCompactionLatencyOp(long latency, TimeUnit unit) { + writeLatencyStats.addValue(unit.toMicros(latency)); + } + + public long getCompactionRemovedEventCount() { + return compactionRemovedEventCount.longValue(); + } + + public long getCompactionSucceedCount() { + return compactionSucceedCount.longValue(); + } + + public long getCompactionFailedCount() { + return compactionFailedCount.longValue(); + } + + public long getCompactionDurationTimeInMills() { + return compactionDurationTimeInMills.longValue(); + } + + public long[] getCompactionLatencyBuckets() { + writeLatencyStats.refresh(); + return writeLatencyStats.getBuckets(); + } + + public StatsBuckets getCompactionLatencyStats() { + return writeLatencyStats; + } + + public double getCompactionReadThroughput() { + readRate.calculateRate(); + return readRate.getValueRate(); + } + + public double getCompactionWriteThroughput() { + writeRate.calculateRate(); + return writeRate.getValueRate(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java index 54ca2e875f042..7786e1939369d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import java.util.Optional; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -28,30 +29,15 @@ @InterfaceStability.Stable public interface CompactorMXBean { - /** - * @return the removed event count of last compaction - */ - long getLastCompactionRemovedEventCount(String topic); - - /** - * @return the timestamp of last succeed compaction - */ - long getLastCompactionSucceedTimestamp(String topic); - - /** - * @return the timestamp of last failed compaction - */ - long getLastCompactionFailedTimestamp(String topic); - - /** - * @return the duration time of last compaction - */ - long getLastCompactionDurationTimeInMills(String topic); - /** * Remove metrics about this topic. * @param topic */ void removeTopic(String topic); + /** + * Get the compaction record of the topic. + * @param topic + */ + Optional getCompactionRecordForTopic(String topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java index 05db2aad4eae4..7b63127133e91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java @@ -18,75 +18,54 @@ */ package org.apache.pulsar.compaction; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.TimeUnit; public class CompactorMXBeanImpl implements CompactorMXBean { - private final ConcurrentHashMap compactRecordOps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap compactionRecordOps = new ConcurrentHashMap<>(); public void addCompactionRemovedEvent(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionRemovedEvent(); } public void addCompactionStartOp(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionStartOp(); } public void addCompactionEndOp(String topic, boolean succeed) { - CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()); - compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis() - - compactRecord.lastCompactionStartTimeOp; - compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue(); - if (succeed) { - compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis(); - } else { - compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis(); - } + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionEndOp(succeed); } @Override - public long getLastCompactionRemovedEventCount(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount; + public void removeTopic(String topic) { + compactionRecordOps.remove(topic); } @Override - public long getLastCompactionSucceedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp; + public Optional getCompactionRecordForTopic(String topic) { + return Optional.ofNullable(compactionRecordOps.get(topic)); } - @Override - public long getLastCompactionFailedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp; + public Set getTopics() { + return compactionRecordOps.keySet(); } - @Override - public long getLastCompactionDurationTimeInMills(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills; + public void reset() { + compactionRecordOps.values().forEach(CompactionRecord::reset); } - @Override - public void removeTopic(String topic) { - compactRecordOps.remove(topic); + public void addCompactionReadOp(String topic, long readableBytes) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionReadOp(readableBytes); } - static class CompactRecord { - - private long lastCompactionRemovedEventCount = 0L; - private long lastCompactionSucceedTimestamp = 0L; - private long lastCompactionFailedTimestamp = 0L; - private long lastCompactionDurationTimeInMills = 0L; - - private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); - private long lastCompactionStartTimeOp; - - public void addCompactionRemovedEvent() { - lastCompactionRemovedEventCountOp.increment(); - } + public void addCompactionWriteOp(String topic, long writeableBytes) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionWriteOp(writeableBytes); + } - public void reset() { - lastCompactionRemovedEventCountOp.reset(); - lastCompactionStartTimeOp = System.currentTimeMillis(); - } + public void addCompactionLatencyOp(String topic, long latency, TimeUnit unit) { + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionLatencyOp(latency, unit); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 49d121f9cddc3..ded798c49d5dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -125,6 +126,7 @@ private void phaseOneLoop(RawReader reader, MessageId id = m.getMessageId(); boolean deletedMessage = false; boolean replaceMessage = false; + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); if (RawBatchConverter.isReadableBatch(m)) { try { for (ImmutableTriple e : RawBatchConverter @@ -234,6 +236,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map try { MessageId id = m.getMessageId(); Optional messageToAdd = Optional.empty(); + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); if (RawBatchConverter.isReadableBatch(m)) { try { messageToAdd = RawBatchConverter.rebatchMessage( @@ -262,7 +265,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map RawMessage message = messageToAdd.get(); try { outstanding.acquire(); - CompletableFuture addFuture = addToCompactedLedger(lh, message) + CompletableFuture addFuture = addToCompactedLedger(lh, message, reader.getTopic()) .whenComplete((res, exception2) -> { outstanding.release(); if (exception2 != null) { @@ -363,12 +366,15 @@ private CompletableFuture closeLedger(LedgerHandle lh) { return bkf; } - private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m) { + private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) { CompletableFuture bkf = new CompletableFuture<>(); ByteBuf serialized = m.serialize(); try { + mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes()); + long start = System.nanoTime(); lh.asyncAddEntry(serialized, (rc, ledger, eid, ctx) -> { + mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); if (rc != BKException.Code.OK) { bkf.completeExceptionally(BKException.create(rc)); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 73879156fdaaa..3de3a0411884c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -27,6 +27,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.jsonwebtoken.SignatureAlgorithm; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -43,14 +44,18 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.Random; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -63,10 +68,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -1136,6 +1144,91 @@ void testParseMetrics() throws IOException { parseMetrics(sampleMetrics); } + @Test + public void testCompaction() throws Exception { + final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1"; + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + List cm = (List) metrics.get("pulsar_compaction_removed_event_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_succeed_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_failed_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_duration_time_in_mills"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_read_throughput"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_write_throughput"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_count"); + assertEquals(cm.size(), 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); + assertEquals(cm.size(), 0); + // + final int numMessages = 1000; + final int maxKeys = 10; + Random r = new Random(0); + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key"+keyIndex; + byte[] data = ("my-message-" + key + "-" + j).getBytes(); + producer.newMessage() + .key(key) + .value(data) + .send(); + } + ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); + Compactor compactor = pulsar.getCompactor(true); + compactor.compact(topicName).get(); + statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + metricsStr = statsOut.toString(); + metrics = parseMetrics(metricsStr); + cm = (List) metrics.get("pulsar_compaction_removed_event_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 990); + cm = (List) metrics.get("pulsar_compaction_succeed_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 1); + cm = (List) metrics.get("pulsar_compaction_failed_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 0); + cm = (List) metrics.get("pulsar_compaction_duration_time_in_mills"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_read_throughput"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_write_throughput"); + assertEquals(cm.size(), 1); + assertTrue(cm.get(0).value > 0); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_count"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 10); + cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, 870); + + pulsarClient.close(); + } + + private void compareCompactionStateCount(List cm, double count) { + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + assertEquals(cm.get(0).tags.get("broker"), "localhost"); + assertEquals(cm.get(0).value, count); + } + /** * Hacky parsing of Prometheus text format. Should be good enough for unit tests */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index 978492887b9ac..39ecc4235888a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -43,6 +43,14 @@ public void testSimpleAggregation() { topicStats1.msgBacklog = 30; topicStats1.managedLedgerStats.storageWriteRate = 12.0; topicStats1.managedLedgerStats.storageReadRate = 6.0; + topicStats1.compactionRemovedEventCount = 10; + topicStats1.compactionSucceedCount = 1; + topicStats1.compactionFailedCount = 2; + topicStats1.compactionDurationTimeInMills = 1000; + topicStats1.compactionReadThroughput = 15.0; + topicStats1.compactionWriteThroughput = 20.0; + topicStats1.compactionCompactedEntriesCount = 30; + topicStats1.compactionCompactedEntriesSize = 1000; AggregatedReplicationStats replStats1 = new AggregatedReplicationStats(); replStats1.msgRateIn = 1.0; @@ -75,6 +83,14 @@ public void testSimpleAggregation() { topicStats2.msgBacklog = 7; topicStats2.managedLedgerStats.storageWriteRate = 5.0; topicStats2.managedLedgerStats.storageReadRate = 2.5; + topicStats2.compactionRemovedEventCount = 10; + topicStats2.compactionSucceedCount = 1; + topicStats2.compactionFailedCount = 2; + topicStats2.compactionDurationTimeInMills = 1000; + topicStats2.compactionReadThroughput = 15.0; + topicStats2.compactionWriteThroughput = 20.0; + topicStats2.compactionCompactedEntriesCount = 30; + topicStats2.compactionCompactedEntriesSize = 1000; AggregatedReplicationStats replStats2 = new AggregatedReplicationStats(); replStats2.msgRateIn = 3.5; @@ -113,6 +129,15 @@ public void testSimpleAggregation() { assertEquals(nsStats.managedLedgerStats.storageSize, 6144); assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560); + assertEquals(nsStats.compactionRemovedEventCount, 20); + assertEquals(nsStats.compactionSucceedCount, 2); + assertEquals(nsStats.compactionFailedCount, 4); + assertEquals(nsStats.compactionDurationTimeInMills, 2000); + assertEquals(nsStats.compactionReadThroughput, 30.0); + assertEquals(nsStats.compactionWriteThroughput, 40.0); + assertEquals(nsStats.compactionCompactedEntriesCount, 60); + assertEquals(nsStats.compactionCompactedEntriesSize, 2000); + AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace); assertNotNull(nsReplStats); assertEquals(nsReplStats.msgRateIn, 4.5); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java index b865396e87b2a..a3f2caedfcb28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java @@ -18,12 +18,9 @@ */ package org.apache.pulsar.compaction; -import org.apache.pulsar.broker.service.BrokerService; -import org.mockito.Mockito; import org.testng.annotations.Test; -import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,15 +33,38 @@ public void testSimple() throws Exception { CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl(); String topic = "topic1"; mxBean.addCompactionStartOp(topic); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + CompactionRecord compaction = mxBean.getCompactionRecordForTopic(topic).get(); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0); mxBean.addCompactionRemovedEvent(topic); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0); mxBean.addCompactionEndOp(topic, true); + assertEquals(compaction.getLastCompactionRemovedEventCount(), 1, 0); + assertTrue(compaction.getLastCompactionSucceedTimestamp() > 0L); + assertTrue(compaction.getLastCompactionDurationTimeInMills() >= 0L); + assertEquals(compaction.getLastCompactionFailedTimestamp(), 0, 0); + assertEquals(compaction.getCompactionRemovedEventCount(), 1, 0); + assertEquals(compaction.getCompactionSucceedCount(), 1, 0); + assertEquals(compaction.getCompactionFailedCount(), 0, 0); + assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L); + mxBean.addCompactionStartOp(topic); + mxBean.addCompactionRemovedEvent(topic); mxBean.addCompactionEndOp(topic, false); - assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0); - assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L); - assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L); - assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L); + assertEquals(compaction.getCompactionRemovedEventCount(), 2, 0); + assertEquals(compaction.getCompactionFailedCount(), 1, 0); + assertEquals(compaction.getCompactionSucceedCount(), 1, 0); + assertTrue(compaction.getLastCompactionFailedTimestamp() > 0L); + assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L); + mxBean.addCompactionReadOp(topic, 22); + assertTrue(compaction.getCompactionReadThroughput() > 0L); + mxBean.addCompactionWriteOp(topic, 33); + assertTrue(compaction.getCompactionWriteThroughput() > 0L); + mxBean.addCompactionLatencyOp(topic, 10, TimeUnit.NANOSECONDS); + assertTrue(compaction.getCompactionLatencyBuckets()[0] > 0l); + mxBean.reset(); + assertEquals(compaction.getCompactionRemovedEventCount(), 0, 0); + assertEquals(compaction.getCompactionSucceedCount(), 0, 0); + assertEquals(compaction.getCompactionFailedCount(), 0, 0); + assertEquals(compaction.getCompactionDurationTimeInMills(), 0, 0); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 8197c4ab9314f..3381aee677ac0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -113,10 +113,11 @@ private List compactAndVerify(String topic, Map expected m.close(); } if (checkMetrics) { - long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic); - long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic); - long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic); - long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic); + CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get(); + long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount(); + long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp(); + long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp(); + long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills(); Assert.assertTrue(compactedTopicRemovedEventCount >= 1); Assert.assertTrue(lastCompactSucceedTimestamp >= 1L); Assert.assertTrue(lastCompactDurationTimeInMills >= 0L); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index 839029b91a842..3124e187f916c 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -204,6 +204,16 @@ All the topic metrics are labelled with the following labels: | pulsar_in_messages_total | Counter | The total number of messages received for this topic | | pulsar_out_bytes_total | Counter | The total number of bytes read from this topic | | pulsar_out_messages_total | Counter | The total number of messages read from this topic | +| pulsar_compaction_removed_event_count | Gauge | The removed event count of compaction | +| pulsar_compaction_succeed_count | Gauge | The succeed count of compaction | +| pulsar_compaction_failed_count | Gauge | The failed count of compaction | +| pulsar_compaction_duration_time_in_mills | Gauge | The duration time of compaction | +| pulsar_compaction_read_throughput | Gauge | The read throughput of compaction | +| pulsar_compaction_write_throughput | Gauge | The write throughput of compaction | +| pulsar_compaction_latency_le_* | Histogram | The compaction latency with given quantile.
Available thresholds:
  • pulsar_compaction_latency_le_0_5: <= 0.5ms
  • pulsar_compaction_latency_le_1: <= 1ms
  • pulsar_compaction_latency_le_5: <= 5ms
  • pulsar_compaction_latency_le_10: <= 10ms
  • pulsar_compaction_latency_le_20: <= 20ms
  • pulsar_compaction_latency_le_50: <= 50ms
  • pulsar_compaction_latency_le_100: <= 100ms
  • pulsar_compaction_latency_le_200: <= 200ms
  • pulsar_compaction_latency_le_1000: <= 1s
  • pulsar_compaction_latency_le_overflow: > 1s
| +| pulsar_compaction_compacted_entries_count | Gauge | The compacted entries count | +| pulsar_compaction_compacted_entries_size |Gauge | The compacted entries size | + #### Replication metrics