Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose compaction metrics to Prometheus #11739

Merged
merged 10 commits into from Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -149,6 +149,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
Expand Down Expand Up @@ -1906,15 +1907,16 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
Optional<CompactorMXBean> mxBean = getCompactorMXBean();
stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);

stats.compaction.reset();
mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
stats.compaction.lastCompactionDurationTimeInMills =
compactionRecord.getLastCompactionDurationTimeInMills();
return compactionRecord;
});
return stats;
}

Expand Down Expand Up @@ -1991,19 +1993,14 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = -1;
info.size = -1;

try {
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
((CompactedTopicImpl) compactedTopic)
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}

stats.compactedLedger = info;

stats.cursors = Maps.newTreeMap();
Expand Down Expand Up @@ -2120,6 +2117,15 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
return statFuture;
}

public Optional<CompactedTopicContext> getCompactedTopicContext() {
try {
return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
return Optional.empty();
}

public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
Expand Down
Expand Up @@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN
}

protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
if (!map.containsKey(mkey)) {
map.put(mkey, Lists.newArrayList(value));
} else {
map.get(mkey).add(value);
}
map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value);
}

protected void populateAggregationMapWithSum(Map<String, Double> map, String mkey, double value) {
Expand All @@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
*/
protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
ManagedLedgerImpl ledger) {
if (!ledgersByDimensionMap.containsKey(metrics)) {
// create new list
ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
} else {
// add to collection
ledgersByDimensionMap.get(metrics).add(ledger);
}
ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger);
}

protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
Metrics metrics, TopicStats destStats) {
if (!topicsStatsByDimensionMap.containsKey(metrics)) {
// create new list
topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
} else {
// add to collection
topicsStatsByDimensionMap.get(metrics).add(destStats);
}

topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats);
}
}
Expand Up @@ -20,6 +20,8 @@

import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.compaction.CompactionRecord;

public class AggregatedNamespaceStats {
public int topicsCount;
Expand Down Expand Up @@ -47,6 +49,16 @@ public class AggregatedNamespaceStats {

public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();

long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);

void updateStats(TopicStats stats) {
topicsCount++;

Expand Down Expand Up @@ -112,6 +124,16 @@ void updateStats(TopicStats stats) {
consumerStats.unackedMessages += v.unackedMessages;
});
});

compactionRemovedEventCount += stats.compactionRemovedEventCount;
compactionSucceedCount += stats.compactionSucceedCount;
compactionFailedCount += stats.compactionFailedCount;
compactionDurationTimeInMills += stats.compactionDurationTimeInMills;
compactionReadThroughput += stats.compactionReadThroughput;
compactionWriteThroughput += stats.compactionWriteThroughput;
compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount;
compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize;
compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
}

public void reset() {
Expand Down
Expand Up @@ -19,9 +19,13 @@
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -30,7 +34,11 @@
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;

@Slf4j
public class NamespaceStatsAggregator {

private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
Expand All @@ -57,6 +65,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b

printDefaultBrokerStats(stream, cluster);

Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
Expand All @@ -66,11 +75,13 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
compactorMXBean
);

if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
} else {
namespaceStats.updateStats(topicStats);
}
Expand All @@ -87,8 +98,19 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
});
}

private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Compactor compactor = null;
try {
compactor = pulsar.getCompactor(false);
} catch (PulsarServerException e) {
log.error("get compactor error", e);
}
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
stats.reset();

if (topic instanceof PersistentTopic) {
Expand Down Expand Up @@ -220,6 +242,31 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});

compactorMXBean
.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName()))
.map(compactionRecord -> {
stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
stats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
stats.compactionLatencyBuckets.refresh();
PersistentTopic persistentTopic = (PersistentTopic) topic;
Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
LedgerHandle ledger = compactedTopicContext.get().getLedger();
long entries = ledger.getLastAddConfirmed() + 1;
long size = ledger.getLength();

stats.compactionCompactedEntriesCount = entries;
stats.compactionCompactedEntriesSize = size;
}
return compactionRecord;
});
}

private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
Expand Down
Expand Up @@ -20,7 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;

class TopicStats {

Expand Down Expand Up @@ -51,6 +55,16 @@ class TopicStats {
// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();

// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);

public void reset() {
subscriptionsCount = 0;
Expand All @@ -73,14 +87,24 @@ public void reset() {
replicationStats.clear();
subscriptionStats.clear();
producerStats.clear();

compactionRemovedEventCount = 0;
compactionSucceedCount = 0;
compactionFailedCount = 0;
compactionDurationTimeInMills = 0;
compactionReadThroughput = 0;
compactionWriteThroughput = 0;
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
}

static void resetTypes() {
metricWithTypeDefinition.clear();
}

static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
TopicStats stats) {
TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
Expand Down Expand Up @@ -250,6 +274,53 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin

metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);

// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
.map(__ -> true).orElse(false);
if (hasCompaction) {
metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
stats.compactionRemovedEventCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
stats.compactionSucceedCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
stats.compactionFailedCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
stats.compactionDurationTimeInMills);
metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
stats.compactionReadThroughput);
metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
stats.compactionWriteThroughput);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
stats.compactionCompactedEntriesCount);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
stats.compactionCompactedEntriesSize);
long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
compactionLatencyBuckets[0]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
compactionLatencyBuckets[1]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
compactionLatencyBuckets[2]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
compactionLatencyBuckets[3]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
compactionLatencyBuckets[4]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
compactionLatencyBuckets[5]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
compactionLatencyBuckets[6]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
compactionLatencyBuckets[7]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
compactionLatencyBuckets[8]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
compactionLatencyBuckets[9]);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
stats.compactionLatencyBuckets.getSum());
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
stats.compactionLatencyBuckets.getCount());
}
}

static void metricType(SimpleTextOutputStream stream, String name) {
Expand Down