diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e98a5ce2ebdc1c..8c1a48fcfdaf30 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1999,6 +1999,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "If true, export managed cursor metrics" ) private boolean exposeManagedCursorMetricsInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "If true, export compaction metrics" + ) + private boolean exposeCompactionMetricsInPrometheus = false; @FieldContext( category = CATEGORY_METRICS, doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java new file mode 100644 index 00000000000000..b8714f93d270cc --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.metrics; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; + +@Slf4j +public class CompactionMetrics extends AbstractMetrics { + + private List metricsCollection; + private Map tempAggregatedMetricsMap; + private Map> topicsByDimensionMap; + private Optional stats; + + public CompactionMetrics(PulsarService pulsar) { + super(pulsar); + this.metricsCollection = Lists.newArrayList(); + this.topicsByDimensionMap = Maps.newHashMap(); + this.tempAggregatedMetricsMap = Maps.newHashMap(); + this.stats = getCompactorMXBean(); + } + + @Override + public synchronized List generate() { + return aggregate(groupTopicsByDimension()); + } + + + /** + * Aggregation by namespace. + * + * @return List + */ + private List aggregate(Map> topicsByDimension) { + metricsCollection.clear(); + if (stats.isPresent()) { + CompactorMXBean compactorMXBean = stats.get(); + for (Map.Entry> entry : topicsByDimension.entrySet()) { + Metrics metrics = entry.getKey(); + List topics = entry.getValue(); + tempAggregatedMetricsMap.clear(); + for (String topic : topics) { + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_cp_totalCompactionCount", + compactorMXBean.getTotalCompactionCount(topic)); + + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_cp_totalCompactionRemovedEventCount", + compactorMXBean.getTotalCompactionRemovedEventCount(topic)); + + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_cp_totalCompactionSucceedCount", + compactorMXBean.getTotalCompactionSucceedCount(topic)); + + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_cp_totalCompactionFailedCount", + compactorMXBean.getTotalCompactionFailedCount(topic)); + } + for (Map.Entry ma : tempAggregatedMetricsMap.entrySet()) { + metrics.put(ma.getKey(), ma.getValue()); + } + metricsCollection.add(metrics); + } + } + + return metricsCollection; + } + + private Map> groupTopicsByDimension() { + topicsByDimensionMap.clear(); + if (stats.isPresent()) { + tempAggregatedMetricsMap.clear(); + for (String topic : stats.get().getTopics()) { + String namespace = TopicName.get(topic).getNamespace(); + Metrics metrics = super.createMetricsByDimension(namespace); + populateDimensionMap(topicsByDimensionMap, metrics, topic); + } + } + return topicsByDimensionMap; + } + + private Optional getCompactorMXBean() { + Compactor compactor = null; + try { + compactor = pulsar.getCompactor(false); + } catch (PulsarServerException e) { + log.error("get compactor error", e); + } + return Optional.ofNullable(compactor).map(c -> c.getStats()); + } + + private void populateDimensionMap(Map> topicsByDimensionMap, Metrics metrics, String topic) { + if (!topicsByDimensionMap.containsKey(metrics)) { + topicsByDimensionMap.put(metrics, Lists.newArrayList(topic)); + } else { + topicsByDimensionMap.get(metrics).add(topic); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 8f303a34fb05e6..38d2ca9c718c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.stats.metrics.CompactionMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics; import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; @@ -143,6 +144,12 @@ private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextO clusterName, Collector.Type.GAUGE, stream); } + if (pulsar.getConfiguration().isExposeCompactionMetricsInPrometheus()) { + // generate compaction metrics + parseMetricsToPrometheusMetrics(new CompactionMetrics(pulsar).generate(), + clusterName, Collector.Type.GAUGE, stream); + } + parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService() .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()), clusterName, Collector.Type.GAUGE, stream); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java index 54ca2e875f0428..400ceaa9be749b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import java.util.Set; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -54,4 +55,29 @@ public interface CompactorMXBean { */ void removeTopic(String topic); + /** + * @return the count of total compaction + */ + long getTotalCompactionCount(String topic); + + /** + * @return the removed event count of total compaction + */ + long getTotalCompactionRemovedEventCount(String topic); + + /** + * @return the succeed count of total compaction + */ + long getTotalCompactionSucceedCount(String topic); + + /** + * @return the failed count of total compaction + */ + long getTotalCompactionFailedCount(String topic); + + /** + * @return the topic set + */ + Set getTopics(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java index 05db2aad4eae4b..1b27d16a88c53c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java @@ -18,59 +18,81 @@ */ package org.apache.pulsar.compaction; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; public class CompactorMXBeanImpl implements CompactorMXBean { - private final ConcurrentHashMap compactRecordOps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap compactionRecordOps = new ConcurrentHashMap<>(); public void addCompactionRemovedEvent(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionRemovedEvent(); } public void addCompactionStartOp(String topic) { - compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset(); + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionStartOp(); } public void addCompactionEndOp(String topic, boolean succeed) { - CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()); - compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis() - - compactRecord.lastCompactionStartTimeOp; - compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue(); - if (succeed) { - compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis(); - } else { - compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis(); - } + compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionEndOp(succeed); } @Override public long getLastCompactionRemovedEventCount(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount; + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()).lastCompactionRemovedEventCount; } @Override public long getLastCompactionSucceedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp; + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()).lastCompactionSucceedTimestamp; } @Override public long getLastCompactionFailedTimestamp(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp; + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()).lastCompactionFailedTimestamp; } @Override public long getLastCompactionDurationTimeInMills(String topic) { - return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills; + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()).lastCompactionDurationTimeInMills; } @Override public void removeTopic(String topic) { - compactRecordOps.remove(topic); + compactionRecordOps.remove(topic); + } + + @Override + public long getTotalCompactionCount(String topic) { + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()) + .totalCompactionCount.longValue(); + } + + @Override + public long getTotalCompactionRemovedEventCount(String topic) { + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()) + .totalCompactionRemovedEventCount.longValue(); } - static class CompactRecord { + @Override + public long getTotalCompactionSucceedCount(String topic) { + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()) + .totalCompactionSucceedCount.longValue(); + } + + @Override + public long getTotalCompactionFailedCount(String topic) { + return compactionRecordOps.getOrDefault(topic, new CompactionRecord()) + .totalCompactionFailedCount.longValue(); + } + + @Override + public Set getTopics() { + return compactionRecordOps.keySet(); + } + + static class CompactionRecord { private long lastCompactionRemovedEventCount = 0L; private long lastCompactionSucceedTimestamp = 0L; @@ -80,13 +102,33 @@ static class CompactRecord { private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); private long lastCompactionStartTimeOp; + private final LongAdder totalCompactionCount = new LongAdder(); + private final LongAdder totalCompactionRemovedEventCount = new LongAdder(); + private final LongAdder totalCompactionSucceedCount = new LongAdder(); + private final LongAdder totalCompactionFailedCount = new LongAdder(); + public void addCompactionRemovedEvent() { lastCompactionRemovedEventCountOp.increment(); + totalCompactionRemovedEventCount.increment(); } - public void reset() { + public void addCompactionStartOp() { lastCompactionRemovedEventCountOp.reset(); lastCompactionStartTimeOp = System.currentTimeMillis(); } + + public void addCompactionEndOp(boolean succeed) { + lastCompactionDurationTimeInMills = System.currentTimeMillis() + - lastCompactionStartTimeOp; + lastCompactionRemovedEventCount = lastCompactionRemovedEventCountOp.longValue(); + if (succeed) { + lastCompactionSucceedTimestamp = System.currentTimeMillis(); + totalCompactionSucceedCount.increment(); + } else { + lastCompactionFailedTimestamp = System.currentTimeMillis(); + totalCompactionFailedCount.increment(); + } + totalCompactionCount.increment(); + } } }