From 8953ffa53468fd71c4832d386261dcbe40b6c269 Mon Sep 17 00:00:00 2001 From: Asaf Mesika Date: Tue, 9 Jan 2024 11:34:52 +0200 Subject: [PATCH] Found a different, less flaky way to test if cache was used. --- .../mledger/ManagedLedgerMXBean.java | 5 ++ .../mledger/impl/ManagedLedgerImpl.java | 3 +- .../mledger/impl/ManagedLedgerMBeanImpl.java | 12 ++++- .../service/persistent/PersistentTopic.java | 51 +++++++++++++++---- .../persistent/PersistentTopicMetrics.java | 9 ---- .../service/BacklogQuotaManagerTest.java | 27 +++++----- pulsar-broker/src/test/resources/log4j2.xml | 4 +- 7 files changed, 74 insertions(+), 37 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index 50a3ffb157961..cb6d3700afe3a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean { */ long getAddEntryErrors(); + /** + * @return the number of entries read from the managed ledger (from cache or BK) + */ + long getEntriesReadTotalCount(); + /** * @return the number of readEntries requests that succeeded */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4084d7004a80d..12a191dda86da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -323,7 +323,7 @@ public enum PositionBound { /** * This variable is used for testing the tests. - * {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()} + * ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata() */ @VisibleForTesting Map createdLedgerCustomMetadata; @@ -2129,6 +2129,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) } protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) { + mbean.addEntriesRead(1); if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index e057dee99538e..3935828ff3d80 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { private final Rate readEntriesOpsFailed = new Rate(); private final Rate readEntriesOpsCacheMisses = new Rate(); private final Rate markDeleteOps = new Rate(); + private final Rate entriesRead = new Rate(); private final LongAdder dataLedgerOpenOp = new LongAdder(); private final LongAdder dataLedgerCloseOp = new LongAdder(); @@ -80,6 +81,7 @@ public void refreshStats(long period, TimeUnit unit) { ledgerAddEntryLatencyStatsUsec.refresh(); ledgerSwitchLatencyStatsUsec.refresh(); entryStats.refresh(); + entriesRead.calculateRate(seconds); } public void addAddEntrySample(long size) { @@ -120,6 +122,10 @@ public void addReadEntriesSample(int count, long totalSize) { readEntriesOps.recordMultipleEvents(count, totalSize); } + public void addEntriesRead(int count) { + entriesRead.recordEvent(count); + } + public void startDataLedgerOpenOp() { dataLedgerOpenOp.increment(); } @@ -189,6 +195,11 @@ public String getName() { return managedLedger.getName(); } + @Override + public long getEntriesReadTotalCount() { + return entriesRead.getTotalCount(); + } + @Override public double getAddEntryMessagesRate() { return addEntryOps.getRate(); @@ -333,5 +344,4 @@ public PendingBookieOpsStats getPendingBookieOpsStats() { result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue(); return result; } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ec86624869f2f..4ac55426e5d9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3255,6 +3255,9 @@ private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult) public CompletableFuture checkTimeBacklogExceeded() { TopicName topicName = TopicName.get(getName()); int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); + if (log.isDebugEnabled()) { + log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond); + } // If backlog quota by time is not set if (backlogQuotaLimitInSecond <= 0) { @@ -3267,6 +3270,10 @@ public CompletableFuture checkTimeBacklogExceeded() { // If we have no durable cursor since `ledger.getCursors()` only managed durable cursors if (oldestMarkDeleteCursorInfo == null || oldestMarkDeleteCursorInfo.getPosition() == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] No durable cursor found. Skipping time based backlog quota check." + + " Oldest mark-delete cursor info: {}", topicName, oldestMarkDeleteCursorInfo); + } return CompletableFuture.completedFuture(false); } @@ -3285,15 +3292,23 @@ public CompletableFuture checkTimeBacklogExceeded() { oldestMarkDeleteCursorInfo.getVersion()); updateResultIfNewer(updatedResult); + if (log.isDebugEnabled()) { + log.debug("[{}] Time-based backlog quota check. Updating cached result for position {}, " + + "since cursor causing it has changed from {} to {}", + topicName, + oldestMarkDeletePosition, + lastCheckResult.getCursorName(), + oldestMarkDeleteCursorInfo.getCursor().getName()); + } } long entryTimestamp = lastCheckResult.getPositionPublishTimestampInMillis(); boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); - if (expired && log.isDebugEnabled()) { - log.debug("(Using cache) Time based backlog quota exceeded, oldest entry in cursor {}'s backlog" - + " exceeded quota {}", lastCheckResult.getCursorName(), backlogQuotaLimitInSecond); + if (log.isDebugEnabled()) { + log.debug("[{}] Time based backlog quota check. Using cache result for position {}. " + + "Entry timestamp: {}, expired: {}", + topicName, oldestMarkDeletePosition, entryTimestamp, expired); } - persistentTopicMetrics.getBacklogQuotaMetrics().recordTimeBasedBacklogQuotaCheckReadFromCache(); return CompletableFuture.completedFuture(expired); } @@ -3317,10 +3332,20 @@ public void readEntryComplete(Entry entry, Object ctx) { oldestMarkDeleteCursorInfo.getVersion())); boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); - if (expired && log.isDebugEnabled()) { - log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog" - + " exceeded quota {}", ledger.getSlowestConsumer().getName(), - backlogQuotaLimitInSecond); + if (log.isDebugEnabled()) { + log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. " + + "Oldest entry in cursor {}'s backlog: {}. " + + "Oldest mark-delete position: {}. " + + "Quota {}. Last check result position [{}]. " + + "Expired: {}, entryTimestamp: {}", + topicName, + oldestMarkDeleteCursorInfo.getCursor().getName(), + position, + oldestMarkDeletePosition, + backlogQuotaLimitInSecond, + lastCheckResult.getOldestCursorMarkDeletePosition(), + expired, + entryTimestamp); } future.complete(expired); } catch (Exception e) { @@ -3341,7 +3366,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } else { try { - EstimateTimeBasedBacklogQuotaCheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition); + EstimateTimeBasedBacklogQuotaCheckResult checkResult = + estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition); if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) { updateResultIfNewer( new TimeBasedBacklogQuotaCheckResult( @@ -3359,7 +3385,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } } - private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(PositionImpl markDeletePosition) + private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck( + PositionImpl markDeletePosition) throws ExecutionException, InterruptedException { int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) ledger; @@ -3394,7 +3421,9 @@ private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaC estimateMsgAgeMs); } - return new EstimateTimeBasedBacklogQuotaCheckResult(shouldTruncateBacklog, positionToCheckLedgerInfo.getTimestamp()); + return new EstimateTimeBasedBacklogQuotaCheckResult( + shouldTruncateBacklog, + positionToCheckLedgerInfo.getTimestamp()); } else { return new EstimateTimeBasedBacklogQuotaCheckResult(false, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java index afaafe748cabb..f79d053a9790d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java @@ -30,7 +30,6 @@ public class PersistentTopicMetrics { public static class BacklogQuotaMetrics { private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = new LongAdder(); private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = new LongAdder(); - private final LongAdder timeBasedBacklogQuotaCheckReadFromCache = new LongAdder(); public void recordTimeBasedBacklogEviction() { timeBasedBacklogQuotaExceededEvictionCount.increment(); @@ -40,10 +39,6 @@ public void recordSizeBasedBacklogEviction() { sizeBasedBacklogQuotaExceededEvictionCount.increment(); } - public void recordTimeBasedBacklogQuotaCheckReadFromCache() { - timeBasedBacklogQuotaCheckReadFromCache.increment(); - } - public long getSizeBasedBacklogQuotaExceededEvictionCount() { return sizeBasedBacklogQuotaExceededEvictionCount.longValue(); } @@ -51,9 +46,5 @@ public long getSizeBasedBacklogQuotaExceededEvictionCount() { public long getTimeBasedBacklogQuotaExceededEvictionCount() { return timeBasedBacklogQuotaExceededEvictionCount.longValue(); } - - public long getTimeBasedBacklogQuotaCheckReadFromCache() { - return timeBasedBacklogQuotaCheckReadFromCache.longValue(); - } } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 9d243aae7d41c..e24fb493b954a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -388,7 +388,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce final String subName1 = "c1"; final String subName2 = "c2"; - final int numMsgs = 5; + final int numMsgs = 4; Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1) .acknowledgmentGroupTime(0, SECONDS) @@ -400,7 +400,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { - Thread.sleep(1000); + Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test producer.send(content); } @@ -436,13 +436,15 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce entry("cluster", CLUSTER_NAME), entry("namespace", namespace), entry("topic", topic1)); - assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(1L)); + assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L)); // Move subscription 2 away from being the oldest mark delete // S2/S1 // 0 1 + Message firstOldestMessage = consumer2.receive(); + consumer2.acknowledge(firstOldestMessage); + // We only read and not ack, since we just need its publish-timestamp for later assert Message secondOldestMessage = consumer2.receive(); - consumer2.acknowledge(secondOldestMessage); // Switch subscription 1 to be where subscription 2 was in terms of oldest mark delete // S1 S2 @@ -460,12 +462,12 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce .get(0).value; expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - oldestMessage.getPublishTime()); - assertThat(actualAge).isCloseTo(expectedMessageAgeSeconds, within(1L)); + assertThat(actualAge).isCloseTo(expectedMessageAgeSeconds, within(2L)); topicStats = getTopicStats(topic1); assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName1); - long cacheUsedCounterBefore = getCacheUsedCounter(topic1); + long entriesReadBefore = getReadEntries(topic1); // Move subscription 1 passed subscription 2 for (int i = 0; i < 3; i++) { @@ -479,25 +481,24 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce waitForQuotaCheckToRunTwice(); // Cache shouldn't be used, since position has changed - assertThat(getCacheUsedCounter(topic1)).isEqualTo(cacheUsedCounterBefore); + long readEntries = getReadEntries(topic1); + assertThat(readEntries).isGreaterThan(entriesReadBefore); topicStats = getTopicStats(topic1); expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - secondOldestMessage.getPublishTime()); - assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedMessageAgeSeconds, within(1L)); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedMessageAgeSeconds, within(2L)); assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2); - cacheUsedCounterBefore = getCacheUsedCounter(topic1); - waitForQuotaCheckToRunTwice(); // Cache should be used, since position hasn't changed - assertThat(getCacheUsedCounter(topic1)).isGreaterThan(cacheUsedCounterBefore); + assertThat(getReadEntries(topic1)).isEqualTo(readEntries); } } - private long getCacheUsedCounter(String topic1) { + private long getReadEntries(String topic1) { return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get()) - .getPersistentTopicMetrics().getBacklogQuotaMetrics().getTimeBasedBacklogQuotaCheckReadFromCache(); + .getManagedLedger().getStats().getEntriesReadTotalCount(); } @Test diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index 0d9244e932857..4038dd59b1d79 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -28,8 +28,8 @@ - - + +