Expose compaction metrics to Prometheus (#11739)
Technoboy- committed Aug 23, 2021
1 parent c0ef593 commit 7eab18b
Showing 11 changed files with 510 additions and 39 deletions.
@@ -1999,6 +1999,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "If true, export managed cursor metrics"
)
private boolean exposeManagedCursorMetricsInPrometheus = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, export compaction metrics"
)
private boolean exposeCompactionMetricsInPrometheus = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
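The new flag defaults to false, so compaction metrics stay out of the Prometheus endpoint until an operator opts in. Below is a minimal sketch of opting in programmatically (for example in an embedded or test broker); the setter name is an assumption based on Lombok's convention for the field above, and in a standalone deployment the field name would conventionally double as the broker.conf key.

import org.apache.pulsar.broker.ServiceConfiguration;

public class EnableCompactionMetricsExample {
    public static void main(String[] args) {
        ServiceConfiguration config = new ServiceConfiguration();
        // Assumed Lombok-generated setter for the exposeCompactionMetricsInPrometheus field above.
        config.setExposeCompactionMetricsInPrometheus(true);
        // PrometheusMetricsGenerator (further down in this commit) checks
        // isExposeCompactionMetricsInPrometheus() before emitting the compaction gauges.
    }
}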
@@ -1594,6 +1594,8 @@ public void forEachTopic(Consumer<Topic> consumer) {
});
}



public BacklogQuotaManager getBacklogQuotaManager() {
return this.backlogQuotaManager;
}
@@ -1991,19 +1991,14 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = -1;
info.size = -1;

try {
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
((CompactedTopicImpl) compactedTopic)
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}

stats.compactedLedger = info;

stats.cursors = Maps.newTreeMap();
@@ -2120,6 +2115,15 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
return statFuture;
}

public Optional<CompactedTopicImpl.CompactedTopicContext> getCompactedTopicContext() {
try {
return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
return Optional.empty();
}

public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
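The extracted helper replaces the inline try/catch in getInternalStats and is also what the new CompactionMetrics collector (added below) calls to read the compacted ledger. A brief hedged sketch of the resulting call pattern; the example class and method names are illustrative only.

import java.util.Optional;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;

public class CompactedLedgerInfoExample {
    // persistentTopic stands in for any PersistentTopic reference obtained from the broker.
    static long compactedEntryCount(PersistentTopic persistentTopic) {
        Optional<CompactedTopicImpl.CompactedTopicContext> ctx = persistentTopic.getCompactedTopicContext();
        // -1 mirrors the "no compacted ledger" default used in getInternalStats above.
        return ctx.map(c -> c.getLedger().getLastAddConfirmed() + 1).orElse(-1L);
    }
}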
@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.metrics;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.compaction.CompactorMXBeanImpl;

@Slf4j
public class CompactionMetrics extends AbstractMetrics {

private static final Buckets
BRK_COMPACTION_LATENCY_BUCKETS = new Buckets("brk_compaction_latencyBuckets",
ENTRY_LATENCY_BUCKETS_MS);

private List<Metrics> metricsCollection;
private Map<String, Double> tempAggregatedMetricsMap;
private Map<Metrics, List<String>> topicsByDimensionMap;
private Optional<CompactorMXBean> stats;
private int statsPeriodSeconds;

public CompactionMetrics(PulsarService pulsar) {
super(pulsar);
this.metricsCollection = Lists.newArrayList();
this.topicsByDimensionMap = Maps.newHashMap();
this.tempAggregatedMetricsMap = Maps.newHashMap();
this.stats = getCompactorMXBean();
this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
.getConfig().getStatsPeriodSeconds();
}

@Override
public synchronized List<Metrics> generate() {
return aggregate(groupTopicsByDimension());
}


/**
* Aggregate the per-topic compaction stats by namespace.
*
* @return a list of Metrics, one entry per namespace
*/
private List<Metrics> aggregate(Map<Metrics, List<String>> topicsByDimension) {
metricsCollection.clear();
if (stats.isPresent()) {
CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
for (Map.Entry<Metrics, List<String>> entry : topicsByDimension.entrySet()) {
Metrics metrics = entry.getKey();
List<String> topics = entry.getValue();
tempAggregatedMetricsMap.clear();
for (String topic : topics) {
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_removedEventCount",
compactorMXBean.getCompactionRemovedEventCount(topic));

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_succeedCount",
compactorMXBean.getCompactionSucceedCount(topic));

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_failedCount",
compactorMXBean.getCompactionFailedCount(topic));

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_durationTimeInMills",
compactorMXBean.getCompactionDurationTimeInMills(topic));

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_readThroughput",
compactorMXBean.getCompactionReadThroughput(topic));

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_writeThroughput",
compactorMXBean.getCompactionWriteThroughput(topic));

BRK_COMPACTION_LATENCY_BUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
compactorMXBean.getCompactionLatencyBuckets(topic),
statsPeriodSeconds);

CompletableFuture<Optional<Topic>> topicHandle = pulsar.getBrokerService().getTopicIfExists(topic);
Optional<Topic> topicOp = BrokerService.extractTopic(topicHandle);
if (topicOp.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) topicOp.get();
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext = persistentTopic
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
LedgerHandle ledger = compactedTopicContext.get().getLedger();
long entries = ledger.getLastAddConfirmed() + 1;
long size = ledger.getLength();

populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_compaction_compactedEntriesCount", entries);

populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_compaction_compactedEntriesSize", size);
}
}
}
for (Map.Entry<String, Double> ma : tempAggregatedMetricsMap.entrySet()) {
metrics.put(ma.getKey(), ma.getValue());
}
metricsCollection.add(metrics);
}
compactorMXBean.reset();
}

return metricsCollection;
}

private Map<Metrics, List<String>> groupTopicsByDimension() {
topicsByDimensionMap.clear();
if (stats.isPresent()) {
CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
tempAggregatedMetricsMap.clear();
for (String topic : compactorMXBean.getTopics()) {
String namespace = TopicName.get(topic).getNamespace();
Metrics metrics = super.createMetricsByDimension(namespace);
populateDimensionMap(topicsByDimensionMap, metrics, topic);
}
}
return topicsByDimensionMap;
}

private Optional<CompactorMXBean> getCompactorMXBean() {
Compactor compactor = null;
try {
compactor = pulsar.getCompactor(false);
} catch (PulsarServerException e) {
log.error("get compactor error", e);
}
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

private void populateDimensionMap(Map<Metrics, List<String>> topicsByDimensionMap, Metrics metrics, String topic) {
if (!topicsByDimensionMap.containsKey(metrics)) {
topicsByDimensionMap.put(metrics, Lists.newArrayList(topic));
} else {
topicsByDimensionMap.get(metrics).add(topic);
}
}
}
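generate() produces one Metrics object per namespace, summing the per-topic counters, and finishes by calling compactorMXBean.reset(); assuming reset() clears those counters, each collection reports activity since the previous scrape rather than lifetime totals. A hedged usage sketch follows; the getMetrics() accessor on Metrics and the class name are assumptions.

import java.util.List;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.CompactionMetrics;
import org.apache.pulsar.common.stats.Metrics;

public class CompactionMetricsDump {
    public static void dump(PulsarService pulsar) {
        // One Metrics entry per namespace that currently has compacted topics.
        List<Metrics> perNamespace = new CompactionMetrics(pulsar).generate();
        for (Metrics metrics : perNamespace) {
            // Keys are the brk_compaction_* names populated in aggregate() above.
            metrics.getMetrics().forEach((name, value) -> System.out.println(name + " = " + value));
        }
    }
}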
@@ -43,6 +43,7 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.CompactionMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
@@ -143,6 +144,12 @@ private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextO
clusterName, Collector.Type.GAUGE, stream);
}

if (pulsar.getConfiguration().isExposeCompactionMetricsInPrometheus()) {
// generate compaction metrics
parseMetricsToPrometheusMetrics(new CompactionMetrics(pulsar).generate(),
clusterName, Collector.Type.GAUGE, stream);
}

parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
.getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
clusterName, Collector.Type.GAUGE, stream);
@@ -54,4 +54,38 @@ public interface CompactorMXBean {
*/
void removeTopic(String topic);

/**
* @return the number of events removed by compaction for the given topic
*/
long getCompactionRemovedEventCount(String topic);

/**
* @return the count of successful compactions for the given topic
*/
long getCompactionSucceedCount(String topic);

/**
* @return the count of failed compactions for the given topic
*/
long getCompactionFailedCount(String topic);

/**
* @return the compaction duration in milliseconds for the given topic
*/
long getCompactionDurationTimeInMills(String topic);

/**
* @return the compaction latency buckets for the given topic
*/
long[] getCompactionLatencyBuckets(String topic);

/**
* @return the compaction read throughput for the given topic
*/
double getCompactionReadThroughput(String topic);

/**
* @return the compaction write throughput for the given topic
*/
double getCompactionWriteThroughput(String topic);
}
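A hedged sketch of reading one of these per-topic counters from a running broker, mirroring how CompactionMetrics obtains the bean via getCompactor(false) above; the probe class is illustrative, and the assumption that passing false avoids creating a compactor on demand is inferred from that usage.

import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.compaction.Compactor;

public class CompactionStatsProbe {
    public static long removedEventCount(PulsarService pulsar, String topic) {
        try {
            // Same lookup pattern as CompactionMetrics#getCompactorMXBean.
            Compactor compactor = pulsar.getCompactor(false);
            return Optional.ofNullable(compactor)
                    .map(Compactor::getStats)
                    .map(stats -> stats.getCompactionRemovedEventCount(topic))
                    .orElse(0L);
        } catch (PulsarServerException e) {
            return 0L;
        }
    }
}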
