diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6a10daf24bd03..70a0be855e284 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1324,12 +1324,16 @@ protected synchronized ScheduledExecutorService getCompactorExecutor() { // only public so mockito can mock it public Compactor newCompactor() throws PulsarServerException { return new TwoPhaseCompactor(this.getConfiguration(), - getClient(), getBookKeeperClient(), - getCompactorExecutor()); + getClient(), getBookKeeperClient(), + getCompactorExecutor()); } public synchronized Compactor getCompactor() throws PulsarServerException { - if (this.compactor == null) { + return getCompactor(true); + } + + public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException { + if (this.compactor == null && shouldInitialize) { this.compactor = newCompactor(); } return this.compactor; @@ -1338,8 +1342,8 @@ public synchronized Compactor getCompactor() throws PulsarServerException { protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() - .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) - .name("offloader").build(); + .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) + .name("offloader").build(); } return this.offloaderScheduler; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c0cd31101c915..93bffb4d1b821 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -160,6 +160,7 @@ import org.apache.pulsar.common.util.netty.ChannelFutures; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.common.util.netty.NettyFutureUtil; +import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; @@ -1732,8 +1733,15 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) } } } - topics.remove(topic); + + try { + Compactor compactor = pulsar.getCompactor(false); + if (compactor != null) { + compactor.getStats().removeTopic(topic); + } + } catch (PulsarServerException ignore) { + } } public int getNumberOfNamespaceBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7128f08e4f3a6..83d6f5ec00aa5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -150,6 +150,8 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; @@ -1903,9 +1905,29 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId(); stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp(); stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp(); + Optional mxBean = getCompactorMXBean(); + stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat -> + stat.getLastCompactionRemovedEventCount(topic)).orElse(0L); + stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat -> + stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L); + stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat -> + stat.getLastCompactionFailedTimestamp(topic)).orElse(0L); + stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat -> + stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L); + return stats; } + private Optional getCompactorMXBean() { + Compactor compactor = null; + try { + compactor = brokerService.pulsar().getCompactor(false); + } catch (PulsarServerException ex) { + log.warn("get compactor error", ex); + } + return Optional.ofNullable(compactor).map(c -> c.getStats()); + } + @Override public CompletableFuture getInternalStats(boolean includeLedgerMetadata) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index 61032d6e72831..cb631e29f6343 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -42,6 +42,7 @@ public abstract class Compactor { protected final ScheduledExecutorService scheduler; private final PulsarClient pulsar; private final BookKeeper bk; + protected final CompactorMXBeanImpl mxBean; public Compactor(ServiceConfiguration conf, PulsarClient pulsar, @@ -51,6 +52,7 @@ public Compactor(ServiceConfiguration conf, this.scheduler = scheduler; this.pulsar = pulsar; this.bk = bk; + this.mxBean = new CompactorMXBeanImpl(); } public CompletableFuture compact(String topic) { @@ -60,23 +62,30 @@ public CompletableFuture compact(String topic) { private CompletableFuture compactAndCloseReader(RawReader reader) { CompletableFuture promise = new CompletableFuture<>(); + mxBean.addCompactionStartOp(reader.getTopic()); doCompaction(reader, bk).whenComplete( (ledgerId, exception) -> { reader.closeAsync().whenComplete((v, exception2) -> { - if (exception2 != null) { - log.warn("Error closing reader handle {}, ignoring", reader, exception2); - } - if (exception != null) { - // complete with original exception - promise.completeExceptionally(exception); - } else { - promise.complete(ledgerId); - } - }); + if (exception2 != null) { + log.warn("Error closing reader handle {}, ignoring", reader, exception2); + } + if (exception != null) { + // complete with original exception + mxBean.addCompactionEndOp(reader.getTopic(), false); + promise.completeExceptionally(exception); + } else { + mxBean.addCompactionEndOp(reader.getTopic(), true); + promise.complete(ledgerId); + } + }); }); return promise; } protected abstract CompletableFuture doCompaction(RawReader reader, BookKeeper bk); + + public CompactorMXBean getStats() { + return this.mxBean; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java new file mode 100644 index 0000000000000..54ca2e875f042 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; + +/** + * JMX Bean interface for Compactor stats. + */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable +public interface CompactorMXBean { + + /** + * @return the removed event count of last compaction + */ + long getLastCompactionRemovedEventCount(String topic); + + /** + * @return the timestamp of last succeed compaction + */ + long getLastCompactionSucceedTimestamp(String topic); + + /** + * @return the timestamp of last failed compaction + */ + long getLastCompactionFailedTimestamp(String topic); + + /** + * @return the duration time of last compaction + */ + long getLastCompactionDurationTimeInMills(String topic); + + /** + * Remove metrics about this topic. + * @param topic + */ + void removeTopic(String topic); + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java new file mode 100644 index 0000000000000..05db2aad4eae4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; + +public class CompactorMXBeanImpl implements CompactorMXBean { + + private final ConcurrentHashMap compactRecordOps = new ConcurrentHashMap<>(); + + public void addCompactionRemovedEvent(String topic) { + compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent(); + } + + public void addCompactionStartOp(String topic) { + compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset(); + } + + public void addCompactionEndOp(String topic, boolean succeed) { + CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()); + compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis() + - compactRecord.lastCompactionStartTimeOp; + compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue(); + if (succeed) { + compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis(); + } else { + compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis(); + } + } + + @Override + public long getLastCompactionRemovedEventCount(String topic) { + return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount; + } + + @Override + public long getLastCompactionSucceedTimestamp(String topic) { + return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp; + } + + @Override + public long getLastCompactionFailedTimestamp(String topic) { + return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp; + } + + @Override + public long getLastCompactionDurationTimeInMills(String topic) { + return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills; + } + + @Override + public void removeTopic(String topic) { + compactRecordOps.remove(topic); + } + + static class CompactRecord { + + private long lastCompactionRemovedEventCount = 0L; + private long lastCompactionSucceedTimestamp = 0L; + private long lastCompactionFailedTimestamp = 0L; + private long lastCompactionDurationTimeInMills = 0L; + + private LongAdder lastCompactionRemovedEventCountOp = new LongAdder(); + private long lastCompactionStartTimeOp; + + public void addCompactionRemovedEvent() { + lastCompactionRemovedEventCountOp.increment(); + } + + public void reset() { + lastCompactionRemovedEventCountOp.reset(); + lastCompactionStartTimeOp = System.currentTimeMillis(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 0f0f981136169..49d121f9cddc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -124,18 +124,23 @@ private void phaseOneLoop(RawReader reader, try { MessageId id = m.getMessageId(); boolean deletedMessage = false; + boolean replaceMessage = false; if (RawBatchConverter.isReadableBatch(m)) { try { for (ImmutableTriple e : RawBatchConverter .extractIdsAndKeysAndSize(m)) { if (e != null) { if (e.getRight() > 0) { - latestForKey.put(e.getMiddle(), e.getLeft()); + MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); + replaceMessage = old != null; } else { deletedMessage = true; latestForKey.remove(e.getMiddle()); } } + if (replaceMessage || deletedMessage) { + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } } } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", @@ -145,14 +150,17 @@ private void phaseOneLoop(RawReader reader, Pair keyAndSize = extractKeyAndSize(m); if (keyAndSize != null) { if (keyAndSize.getRight() > 0) { - latestForKey.put(keyAndSize.getLeft(), id); + MessageId old = latestForKey.put(keyAndSize.getLeft(), id); + replaceMessage = old != null; } else { deletedMessage = true; latestForKey.remove(keyAndSize.getLeft()); } } + if (replaceMessage || deletedMessage) { + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } } - MessageId first = firstMessageId.orElse(deletedMessage ? null : id); MessageId to = deletedMessage ? toMessageId.orElse(null) : id; if (id.compareTo(lastMessageId) == 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java new file mode 100644 index 0000000000000..b865396e87b2a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.pulsar.broker.service.BrokerService; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Test(groups = "broker-compaction") +public class CompactorMXBeanImplTest { + + @Test + public void testSimple() throws Exception { + CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl(); + String topic = "topic1"; + mxBean.addCompactionStartOp(topic); + assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + mxBean.addCompactionRemovedEvent(topic); + assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0); + mxBean.addCompactionEndOp(topic, true); + mxBean.addCompactionEndOp(topic, false); + assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0); + assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L); + assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L); + assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 0d1a95c2c0ca5..8197c4ab9314f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -84,7 +84,7 @@ public void cleanup() throws Exception { compactionScheduler.shutdownNow(); } - private List compactAndVerify(String topic, Map expected) throws Exception { + private List compactAndVerify(String topic, Map expected, boolean checkMetrics) throws Exception { BookKeeper bk = pulsar.getBookKeeperClientFactory().create( this.conf, null, null, Optional.empty(), null); Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); @@ -112,6 +112,16 @@ private List compactAndVerify(String topic, Map expected "Compacted version should match expected version"); m.close(); } + if (checkMetrics) { + long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic); + long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic); + long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic); + long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic); + Assert.assertTrue(compactedTopicRemovedEventCount >= 1); + Assert.assertTrue(lastCompactSucceedTimestamp >= 1L); + Assert.assertTrue(lastCompactDurationTimeInMills >= 0L); + Assert.assertEquals(lastCompactFailedTimestamp, 0L); + } Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found"); return keys; } @@ -140,7 +150,7 @@ public void testCompaction() throws Exception { .send(); expected.put(key, data); } - compactAndVerify(topic, expected); + compactAndVerify(topic, expected, true); } @Test @@ -169,7 +179,7 @@ public void testCompactAddCompact() throws Exception { expected.put("a", "A_2".getBytes()); expected.put("b", "B_1".getBytes()); - compactAndVerify(topic, new HashMap<>(expected)); + compactAndVerify(topic, new HashMap<>(expected), false); producer.newMessage() .key("b") @@ -177,7 +187,7 @@ public void testCompactAddCompact() throws Exception { .send(); expected.put("b", "B_2".getBytes()); - compactAndVerify(topic, expected); + compactAndVerify(topic, expected, false); } @Test @@ -206,7 +216,7 @@ public void testCompactedInOrder() throws Exception { expected.put("b", "B_1".getBytes()); expected.put("c", "C_1".getBytes()); - List keyOrder = compactAndVerify(topic, expected); + List keyOrder = compactAndVerify(topic, expected, false); Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a")); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java new file mode 100644 index 0000000000000..0500017064625 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * Statistics about compaction. + */ +public interface CompactionStats { + + /** The removed event count of last compaction. */ + long getLastCompactionRemovedEventCount(); + + /** The timestamp of last succeed compaction. */ + long getLastCompactionSucceedTimestamp(); + + /** The timestamp of last failed compaction. */ + long getLastCompactionFailedTimestamp(); + + /** The duration time of last compaction. */ + long getLastCompactionDurationTimeInMills(); +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index afc18108686dc..dc1964e515d75 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -85,4 +85,7 @@ public interface TopicStats { /** The serialized size of non-contiguous deleted messages ranges. */ int getNonContiguousDeletedMessagesRangesSerializedSize(); + + /** The compaction stats. */ + CompactionStats getCompaction(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java new file mode 100644 index 0000000000000..e187f6aaee909 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.stats; + +import lombok.Data; +import org.apache.pulsar.common.policies.data.CompactionStats; +/** + * Statistics about compaction. + */ +@Data +public class CompactionStatsImpl implements CompactionStats { + + /** The removed event count of last compaction. */ + public long lastCompactionRemovedEventCount; + + /** The timestamp of last succeed compaction. */ + public long lastCompactionSucceedTimestamp; + + /** The timestamp of last failed compaction. */ + public long lastCompactionFailedTimestamp; + + /** The duration time of last compaction. */ + public long lastCompactionDurationTimeInMills; + + public void reset() { + this.lastCompactionRemovedEventCount = 0; + this.lastCompactionSucceedTimestamp = 0; + this.lastCompactionFailedTimestamp = 0; + this.lastCompactionDurationTimeInMills = 0; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 7ad8e83edf661..f3b3944b5fdc3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.TreeMap; /** @@ -114,6 +113,9 @@ public class TopicStatsImpl implements TopicStats { /** The serialized size of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRangesSerializedSize; + /** The compaction stats */ + public CompactionStatsImpl compaction; + public List getPublishers() { return publishers; } @@ -130,6 +132,7 @@ public TopicStatsImpl() { this.publishers = new ArrayList<>(); this.subscriptions = new HashMap<>(); this.replication = new TreeMap<>(); + this.compaction = new CompactionStatsImpl(); } public void reset() { @@ -157,6 +160,7 @@ public void reset() { this.lastOffloadLedgerId = 0; this.lastOffloadFailureTimeStamp = 0; this.lastOffloadSuccessTimeStamp = 0; + this.compaction.reset(); } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java index fa67fb080d3cf..9c4c3b24e1833 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java @@ -60,6 +60,10 @@ public void testPersistentTopicStats() { assertEquals(topicStats.publishers.size(), 1); assertEquals(topicStats.subscriptions.size(), 1); assertEquals(topicStats.replication.size(), 1); + assertEquals(topicStats.compaction.lastCompactionRemovedEventCount, 0); + assertEquals(topicStats.compaction.lastCompactionSucceedTimestamp, 0); + assertEquals(topicStats.compaction.lastCompactionFailedTimestamp, 0); + assertEquals(topicStats.compaction.lastCompactionDurationTimeInMills, 0); topicStats.reset(); assertEquals(topicStats.msgRateIn, 0.0); assertEquals(topicStats.msgThroughputIn, 0.0);