Skip to content

Commit

Permalink
Add compacted topic metrics for TopicStats in CLI (apache#11564)
Browse files Browse the repository at this point in the history
### Motivation
Add below metrics to help track potential flows or examine the overall condition of compacted topics .
- lastCompactionRemovedEventCount : the removed event count of last compaction
- lastCompactionSucceedTimestamp : the timestamp of last succeed compaction
- lastCompactionFailedTimestamp : the timestamp of last failed compaction
- lastCompactionDurationTimeInMills: the duration time of last compaction

These 4 metrics will be displayed in topic stats CLI :
```
./pulsar-admin topics stats persistent://tenant/ns/topic
```

### Documentation
This patch will add metrics in CLI , which would generate doc automatically.
  • Loading branch information
Technoboy- committed Aug 21, 2021
1 parent a8a95a4 commit c0ef593
Show file tree
Hide file tree
Showing 14 changed files with 380 additions and 25 deletions.
Expand Up @@ -1324,12 +1324,16 @@ protected synchronized ScheduledExecutorService getCompactorExecutor() {
// only public so mockito can mock it
public Compactor newCompactor() throws PulsarServerException {
return new TwoPhaseCompactor(this.getConfiguration(),
getClient(), getBookKeeperClient(),
getCompactorExecutor());
getClient(), getBookKeeperClient(),
getCompactorExecutor());
}

public synchronized Compactor getCompactor() throws PulsarServerException {
if (this.compactor == null) {
return getCompactor(true);
}

public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
if (this.compactor == null && shouldInitialize) {
this.compactor = newCompactor();
}
return this.compactor;
Expand All @@ -1338,8 +1342,8 @@ public synchronized Compactor getCompactor() throws PulsarServerException {
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
.name("offloader").build();
.numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
.name("offloader").build();
}
return this.offloaderScheduler;
}
Expand Down
Expand Up @@ -160,6 +160,7 @@
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -1732,8 +1733,15 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle)
}
}
}

topics.remove(topic);

try {
Compactor compactor = pulsar.getCompactor(false);
if (compactor != null) {
compactor.getStats().removeTopic(topic);
}
} catch (PulsarServerException ignore) {
}
}

public int getNumberOfNamespaceBundles() {
Expand Down
Expand Up @@ -150,6 +150,8 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
Expand Down Expand Up @@ -1903,9 +1905,29 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
Optional<CompactorMXBean> mxBean = getCompactorMXBean();
stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);

return stats;
}

private Optional<CompactorMXBean> getCompactorMXBean() {
Compactor compactor = null;
try {
compactor = brokerService.pulsar().getCompactor(false);
} catch (PulsarServerException ex) {
log.warn("get compactor error", ex);
}
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {

Expand Down
Expand Up @@ -42,6 +42,7 @@ public abstract class Compactor {
protected final ScheduledExecutorService scheduler;
private final PulsarClient pulsar;
private final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;

public Compactor(ServiceConfiguration conf,
PulsarClient pulsar,
Expand All @@ -51,6 +52,7 @@ public Compactor(ServiceConfiguration conf,
this.scheduler = scheduler;
this.pulsar = pulsar;
this.bk = bk;
this.mxBean = new CompactorMXBeanImpl();
}

public CompletableFuture<Long> compact(String topic) {
Expand All @@ -60,23 +62,30 @@ public CompletableFuture<Long> compact(String topic) {

private CompletableFuture<Long> compactAndCloseReader(RawReader reader) {
CompletableFuture<Long> promise = new CompletableFuture<>();
mxBean.addCompactionStartOp(reader.getTopic());
doCompaction(reader, bk).whenComplete(
(ledgerId, exception) -> {
reader.closeAsync().whenComplete((v, exception2) -> {
if (exception2 != null) {
log.warn("Error closing reader handle {}, ignoring", reader, exception2);
}
if (exception != null) {
// complete with original exception
promise.completeExceptionally(exception);
} else {
promise.complete(ledgerId);
}
});
if (exception2 != null) {
log.warn("Error closing reader handle {}, ignoring", reader, exception2);
}
if (exception != null) {
// complete with original exception
mxBean.addCompactionEndOp(reader.getTopic(), false);
promise.completeExceptionally(exception);
} else {
mxBean.addCompactionEndOp(reader.getTopic(), true);
promise.complete(ledgerId);
}
});
});
return promise;
}

protected abstract CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk);

public CompactorMXBean getStats() {
return this.mxBean;
}
}

@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

/**
* JMX Bean interface for Compactor stats.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface CompactorMXBean {

/**
* @return the removed event count of last compaction
*/
long getLastCompactionRemovedEventCount(String topic);

/**
* @return the timestamp of last succeed compaction
*/
long getLastCompactionSucceedTimestamp(String topic);

/**
* @return the timestamp of last failed compaction
*/
long getLastCompactionFailedTimestamp(String topic);

/**
* @return the duration time of last compaction
*/
long getLastCompactionDurationTimeInMills(String topic);

/**
* Remove metrics about this topic.
* @param topic
*/
void removeTopic(String topic);

}
@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class CompactorMXBeanImpl implements CompactorMXBean {

private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();

public void addCompactionRemovedEvent(String topic) {
compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
}

public void addCompactionStartOp(String topic) {
compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
}

public void addCompactionEndOp(String topic, boolean succeed) {
CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
- compactRecord.lastCompactionStartTimeOp;
compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
if (succeed) {
compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
} else {
compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
}
}

@Override
public long getLastCompactionRemovedEventCount(String topic) {
return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
}

@Override
public long getLastCompactionSucceedTimestamp(String topic) {
return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
}

@Override
public long getLastCompactionFailedTimestamp(String topic) {
return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
}

@Override
public long getLastCompactionDurationTimeInMills(String topic) {
return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
}

@Override
public void removeTopic(String topic) {
compactRecordOps.remove(topic);
}

static class CompactRecord {

private long lastCompactionRemovedEventCount = 0L;
private long lastCompactionSucceedTimestamp = 0L;
private long lastCompactionFailedTimestamp = 0L;
private long lastCompactionDurationTimeInMills = 0L;

private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
private long lastCompactionStartTimeOp;

public void addCompactionRemovedEvent() {
lastCompactionRemovedEventCountOp.increment();
}

public void reset() {
lastCompactionRemovedEventCountOp.reset();
lastCompactionStartTimeOp = System.currentTimeMillis();
}
}
}
Expand Up @@ -124,18 +124,23 @@ private void phaseOneLoop(RawReader reader,
try {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
if (RawBatchConverter.isReadableBatch(m)) {
try {
for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
.extractIdsAndKeysAndSize(m)) {
if (e != null) {
if (e.getRight() > 0) {
latestForKey.put(e.getMiddle(), e.getLeft());
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(e.getMiddle());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
Expand All @@ -145,14 +150,17 @@ private void phaseOneLoop(RawReader reader,
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
latestForKey.put(keyAndSize.getLeft(), id);
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}

MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
if (id.compareTo(lastMessageId) == 0) {
Expand Down
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.compaction;

import org.apache.pulsar.broker.service.BrokerService;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Test(groups = "broker-compaction")
public class CompactorMXBeanImplTest {

@Test
public void testSimple() throws Exception {
CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
String topic = "topic1";
mxBean.addCompactionStartOp(topic);
assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
mxBean.addCompactionRemovedEvent(topic);
assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
mxBean.addCompactionEndOp(topic, true);
mxBean.addCompactionEndOp(topic, false);
assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
}

}

0 comments on commit c0ef593

Please sign in to comment.