Expose compaction metrics to Prometheus (#11739)
### Motivation
Since #11564 added compaction metrics to the CLI, it is also useful to expose the related metrics to Prometheus. This change adds the following metrics:

- pulsar_compaction_removed_event_count: the number of events removed by compaction.
- pulsar_compaction_succeed_count: the number of successful compaction runs.
- pulsar_compaction_failed_count: the number of failed compaction runs.
- pulsar_compaction_duration_time_in_mills: the duration of compaction, in milliseconds.
- pulsar_compaction_read_throughput: the read throughput of compaction.
- pulsar_compaction_write_throughput: the write throughput of compaction.
- pulsar_compaction_compacted_entries_count: the number of entries in the compacted ledger.
- pulsar_compaction_compacted_entries_size: the total size, in bytes, of the compacted ledger entries.
- pulsar_compaction_latency_*: the compaction write latency distribution (bucketed, with sum and count).
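
For illustration, a topic-level scrape might then include lines such as the following (a sketch only: the values and the exact `cluster`/`namespace`/`topic` label values are made up, with the label set matching the broker's other per-topic metrics):

```
pulsar_compaction_removed_event_count{cluster="standalone", namespace="public/default", topic="persistent://public/default/my-topic"} 42.0
pulsar_compaction_succeed_count{cluster="standalone", namespace="public/default", topic="persistent://public/default/my-topic"} 3.0
pulsar_compaction_duration_time_in_mills{cluster="standalone", namespace="public/default", topic="persistent://public/default/my-topic"} 1250.0
```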

If topic-level metrics are enabled and the topic has been compacted or is currently being compacted, the compaction metrics are exposed at the topic level. If topic-level metrics are disabled, the aggregated compaction metrics are exposed at the namespace level.
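
Because the per-topic series carry `cluster`, `namespace`, and `topic` labels, a namespace-level view can also be computed on the Prometheus side. A sketch in PromQL, assuming the label names above:

```
# Roll per-topic compaction removals up to the namespace level.
sum by (cluster, namespace) (pulsar_compaction_removed_event_count)

# Flag any compaction failures observed over the last 10 minutes.
increase(pulsar_compaction_failed_count[10m]) > 0
```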


(cherry picked from commit 656635e)
Technoboy- authored and codelipenghui committed Sep 2, 2021
1 parent 71cc13b commit a80f1e8
Showing 16 changed files with 539 additions and 135 deletions.
@@ -149,6 +149,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
@@ -1902,15 +1903,16 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
Optional<CompactorMXBean> mxBean = getCompactorMXBean();
stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);

stats.compaction.reset();
mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).ifPresent(compactionRecord -> {
stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
stats.compaction.lastCompactionDurationTimeInMills =
compactionRecord.getLastCompactionDurationTimeInMills();
});
return stats;
}

@@ -1987,19 +1989,14 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = -1;
info.size = -1;

try {
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
((CompactedTopicImpl) compactedTopic)
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}

stats.compactedLedger = info;

stats.cursors = Maps.newTreeMap();
@@ -2116,6 +2113,15 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
return statFuture;
}

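/**
 * Returns the ledger context of the compacted topic, or Optional.empty() when the
 * compacted ledger is not available or the lookup fails.
 */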
public Optional<CompactedTopicContext> getCompactedTopicContext() {
try {
return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
return Optional.empty();
}

public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
@@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN
}

protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
if (!map.containsKey(mkey)) {
map.put(mkey, Lists.newArrayList(value));
} else {
map.get(mkey).add(value);
}
map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value);
}

protected void populateAggregationMapWithSum(Map<String, Double> map, String mkey, double value) {
@@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
*/
protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
ManagedLedgerImpl ledger) {
if (!ledgersByDimensionMap.containsKey(metrics)) {
// create new list
ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
} else {
// add to collection
ledgersByDimensionMap.get(metrics).add(ledger);
}
ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger);
}

protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
Metrics metrics, TopicStats destStats) {
if (!topicsStatsByDimensionMap.containsKey(metrics)) {
// create new list
topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
} else {
// add to collection
topicsStatsByDimensionMap.get(metrics).add(destStats);
}

topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats);
}
}
@@ -20,6 +20,8 @@

import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.compaction.CompactionRecord;

public class AggregatedNamespaceStats {
public int topicsCount;
@@ -47,6 +49,16 @@ public class AggregatedNamespaceStats {

public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();

long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);

void updateStats(TopicStats stats) {
topicsCount++;

@@ -112,6 +124,16 @@ void updateStats(TopicStats stats) {
consumerStats.unackedMessages += v.unackedMessages;
});
});

compactionRemovedEventCount += stats.compactionRemovedEventCount;
compactionSucceedCount += stats.compactionSucceedCount;
compactionFailedCount += stats.compactionFailedCount;
compactionDurationTimeInMills += stats.compactionDurationTimeInMills;
compactionReadThroughput += stats.compactionReadThroughput;
compactionWriteThroughput += stats.compactionWriteThroughput;
compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount;
compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize;
compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
}

public void reset() {
@@ -19,9 +19,13 @@
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -30,7 +34,11 @@
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;

@Slf4j
public class NamespaceStatsAggregator {

private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
@@ -57,6 +65,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b

printDefaultBrokerStats(stream, cluster);

Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
@@ -66,11 +75,13 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
compactorMXBean
);

if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
} else {
namespaceStats.updateStats(topicStats);
}
@@ -87,8 +98,19 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
});
}

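// Ask PulsarService for the existing compactor without initializing one (the "false"
// argument), so a metrics scrape never forces compactor creation.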
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Compactor compactor = null;
try {
compactor = pulsar.getCompactor(false);
} catch (PulsarServerException e) {
log.error("get compactor error", e);
}
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
stats.reset();

if (topic instanceof PersistentTopic) {
@@ -220,6 +242,31 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});

compactorMXBean
.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName()))
.ifPresent(compactionRecord -> {
stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
stats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
stats.compactionLatencyBuckets.refresh();
PersistentTopic persistentTopic = (PersistentTopic) topic;
Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
LedgerHandle ledger = compactedTopicContext.get().getLedger();
long entries = ledger.getLastAddConfirmed() + 1;
long size = ledger.getLength();

stats.compactionCompactedEntriesCount = entries;
stats.compactionCompactedEntriesSize = size;
}
});
}

private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
@@ -20,7 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;

class TopicStats {

@@ -51,6 +55,16 @@ class TopicStats {
// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();

// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);

public void reset() {
subscriptionsCount = 0;
@@ -73,14 +87,24 @@ public void reset() {
replicationStats.clear();
subscriptionStats.clear();
producerStats.clear();

compactionRemovedEventCount = 0;
compactionSucceedCount = 0;
compactionFailedCount = 0;
compactionDurationTimeInMills = 0;
compactionReadThroughput = 0;
compactionWriteThroughput = 0;
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
}

static void resetTypes() {
metricWithTypeDefinition.clear();
}

static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
TopicStats stats) {
TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
@@ -250,6 +274,53 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin

metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);

// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
.isPresent();
if (hasCompaction) {
metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
stats.compactionRemovedEventCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
stats.compactionSucceedCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
stats.compactionFailedCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
stats.compactionDurationTimeInMills);
metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
stats.compactionReadThroughput);
metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
stats.compactionWriteThroughput);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
stats.compactionCompactedEntriesCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
stats.compactionCompactedEntriesSize);
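// Per-bucket counts; the upper bounds (in ms) follow
// CompactionRecord.WRITE_LATENCY_BUCKETS_USEC: 0.5, 1, 5, 10, 20, 50, 100, 200,
// 1000, plus a final overflow bucket.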
long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
compactionLatencyBuckets[0]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
compactionLatencyBuckets[1]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
compactionLatencyBuckets[2]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
compactionLatencyBuckets[3]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
compactionLatencyBuckets[4]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
compactionLatencyBuckets[5]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
compactionLatencyBuckets[6]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
compactionLatencyBuckets[7]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
compactionLatencyBuckets[8]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
compactionLatencyBuckets[9]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
stats.compactionLatencyBuckets.getSum());
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
stats.compactionLatencyBuckets.getCount());
}
}

static void metricType(SimpleTextOutputStream stream, String name) {
