Skip to content

Commit

Permalink
Found a different, less flaky way to test if cache was used.
Browse files Browse the repository at this point in the history
  • Loading branch information
asafm committed Jan 9, 2024
1 parent 958c3df commit 8953ffa
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 37 deletions.
Expand Up @@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();

/**
 * Returns the cumulative number of entries read from this managed ledger since the
 * broker started, regardless of whether each read was served from the entry cache
 * or fetched from BookKeeper.
 *
 * @return the number of entries read from the managed ledger (from cache or BK)
 */
long getEntriesReadTotalCount();

/**
* @return the number of readEntries requests that succeeded
*/
Expand Down
Expand Up @@ -323,7 +323,7 @@ public enum PositionBound {

/**
* This variable is used only by tests, to verify the custom metadata set on created ledgers.
* {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
*/
@VisibleForTesting
Map<String, byte[]> createdLedgerCustomMetadata;
Expand Down Expand Up @@ -2129,6 +2129,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
mbean.addEntriesRead(1);
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
Expand Down
Expand Up @@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final Rate readEntriesOpsFailed = new Rate();
private final Rate readEntriesOpsCacheMisses = new Rate();
private final Rate markDeleteOps = new Rate();
private final Rate entriesRead = new Rate();

private final LongAdder dataLedgerOpenOp = new LongAdder();
private final LongAdder dataLedgerCloseOp = new LongAdder();
Expand Down Expand Up @@ -80,6 +81,7 @@ public void refreshStats(long period, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.refresh();
ledgerSwitchLatencyStatsUsec.refresh();
entryStats.refresh();
entriesRead.calculateRate(seconds);
}

public void addAddEntrySample(long size) {
Expand Down Expand Up @@ -120,6 +122,10 @@ public void addReadEntriesSample(int count, long totalSize) {
readEntriesOps.recordMultipleEvents(count, totalSize);
}

/**
 * Records that {@code count} entries were read from the managed ledger (served either
 * from the entry cache or from BookKeeper). The cumulative total is exposed through
 * {@code getEntriesReadTotalCount()}.
 *
 * NOTE(review): {@code Rate#recordEvent(long)} records a single event with the argument
 * as its value, so the previous {@code recordEvent(count)} would count only one entry
 * per call when {@code count > 1}. {@code recordMultipleEvents} (already used by
 * {@code addReadEntriesSample}) counts {@code count} events — confirm against the
 * {@code Rate} implementation.
 *
 * @param count number of entries read in this operation; the visible caller passes 1
 */
public void addEntriesRead(int count) {
    entriesRead.recordMultipleEvents(count, 0);
}

/**
 * Marks the start of a data-ledger open operation by incrementing the pending-op
 * counter. Presumably paired with a corresponding end/complete method that
 * decrements it — TODO(review): confirm, the counterpart is outside this view.
 */
public void startDataLedgerOpenOp() {
    dataLedgerOpenOp.increment();
}
Expand Down Expand Up @@ -189,6 +195,11 @@ public String getName() {
return managedLedger.getName();
}

/**
 * Exposes the cumulative count of entries read from this managed ledger,
 * whether they were served from the entry cache or read from BookKeeper.
 *
 * @return total entries read since this stats bean was created
 */
@Override
public long getEntriesReadTotalCount() {
    final long totalEntriesRead = entriesRead.getTotalCount();
    return totalEntriesRead;
}

@Override
public double getAddEntryMessagesRate() {
return addEntryOps.getRate();
Expand Down Expand Up @@ -333,5 +344,4 @@ public PendingBookieOpsStats getPendingBookieOpsStats() {
result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue();
return result;
}

}
Expand Up @@ -3255,6 +3255,9 @@ private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult)
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
if (log.isDebugEnabled()) {
log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond);
}

// If backlog quota by time is not set
if (backlogQuotaLimitInSecond <= 0) {
Expand All @@ -3267,6 +3270,10 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
// If we have no durable cursor since `ledger.getCursors()` only managed durable cursors
if (oldestMarkDeleteCursorInfo == null
|| oldestMarkDeleteCursorInfo.getPosition() == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] No durable cursor found. Skipping time based backlog quota check."
+ " Oldest mark-delete cursor info: {}", topicName, oldestMarkDeleteCursorInfo);
}
return CompletableFuture.completedFuture(false);
}

Expand All @@ -3285,15 +3292,23 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
oldestMarkDeleteCursorInfo.getVersion());

updateResultIfNewer(updatedResult);
if (log.isDebugEnabled()) {
log.debug("[{}] Time-based backlog quota check. Updating cached result for position {}, "
+ "since cursor causing it has changed from {} to {}",
topicName,
oldestMarkDeletePosition,
lastCheckResult.getCursorName(),
oldestMarkDeleteCursorInfo.getCursor().getName());
}
}

long entryTimestamp = lastCheckResult.getPositionPublishTimestampInMillis();
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("(Using cache) Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ " exceeded quota {}", lastCheckResult.getCursorName(), backlogQuotaLimitInSecond);
if (log.isDebugEnabled()) {
log.debug("[{}] Time based backlog quota check. Using cache result for position {}. "
+ "Entry timestamp: {}, expired: {}",
topicName, oldestMarkDeletePosition, entryTimestamp, expired);
}
persistentTopicMetrics.getBacklogQuotaMetrics().recordTimeBasedBacklogQuotaCheckReadFromCache();
return CompletableFuture.completedFuture(expired);
}

Expand All @@ -3317,10 +3332,20 @@ public void readEntryComplete(Entry entry, Object ctx) {
oldestMarkDeleteCursorInfo.getVersion()));

boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (expired && log.isDebugEnabled()) {
log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ " exceeded quota {}", ledger.getSlowestConsumer().getName(),
backlogQuotaLimitInSecond);
if (log.isDebugEnabled()) {
log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. "
+ "Oldest entry in cursor {}'s backlog: {}. "
+ "Oldest mark-delete position: {}. "
+ "Quota {}. Last check result position [{}]. "
+ "Expired: {}, entryTimestamp: {}",
topicName,
oldestMarkDeleteCursorInfo.getCursor().getName(),
position,
oldestMarkDeletePosition,
backlogQuotaLimitInSecond,
lastCheckResult.getOldestCursorMarkDeletePosition(),
expired,
entryTimestamp);
}
future.complete(expired);
} catch (Exception e) {
Expand All @@ -3341,7 +3366,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
return future;
} else {
try {
EstimateTimeBasedBacklogQuotaCheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
EstimateTimeBasedBacklogQuotaCheckResult checkResult =
estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
updateResultIfNewer(
new TimeBasedBacklogQuotaCheckResult(
Expand All @@ -3359,7 +3385,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}
}

private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(PositionImpl markDeletePosition)
private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
PositionImpl markDeletePosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) ledger;
Expand Down Expand Up @@ -3394,7 +3421,9 @@ private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaC
estimateMsgAgeMs);
}

return new EstimateTimeBasedBacklogQuotaCheckResult(shouldTruncateBacklog, positionToCheckLedgerInfo.getTimestamp());
return new EstimateTimeBasedBacklogQuotaCheckResult(
shouldTruncateBacklog,
positionToCheckLedgerInfo.getTimestamp());
} else {
return new EstimateTimeBasedBacklogQuotaCheckResult(false, null);
}
Expand Down
Expand Up @@ -30,7 +30,6 @@ public class PersistentTopicMetrics {
public static class BacklogQuotaMetrics {
private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
private final LongAdder timeBasedBacklogQuotaCheckReadFromCache = new LongAdder();

public void recordTimeBasedBacklogEviction() {
timeBasedBacklogQuotaExceededEvictionCount.increment();
Expand All @@ -40,20 +39,12 @@ public void recordSizeBasedBacklogEviction() {
sizeBasedBacklogQuotaExceededEvictionCount.increment();
}

public void recordTimeBasedBacklogQuotaCheckReadFromCache() {
timeBasedBacklogQuotaCheckReadFromCache.increment();
}

public long getSizeBasedBacklogQuotaExceededEvictionCount() {
return sizeBasedBacklogQuotaExceededEvictionCount.longValue();
}

public long getTimeBasedBacklogQuotaExceededEvictionCount() {
return timeBasedBacklogQuotaExceededEvictionCount.longValue();
}

public long getTimeBasedBacklogQuotaCheckReadFromCache() {
return timeBasedBacklogQuotaCheckReadFromCache.longValue();
}
}
}
Expand Up @@ -388,7 +388,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce

final String subName1 = "c1";
final String subName2 = "c2";
final int numMsgs = 5;
final int numMsgs = 4;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
Expand All @@ -400,7 +400,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce

byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
Thread.sleep(1000);
Thread.sleep(3000); // Long enough that using the wrong message's timestamp for the age calculation shows up as a test failure
producer.send(content);
}

Expand Down Expand Up @@ -436,13 +436,15 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce
entry("cluster", CLUSTER_NAME),
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(1L));
assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L));

// Move subscription 2 away from being the oldest mark delete
// S2/S1
// 0 1
Message<byte[]> firstOldestMessage = consumer2.receive();
consumer2.acknowledge(firstOldestMessage);
// We only read and not ack, since we just need its publish-timestamp for later assert
Message<byte[]> secondOldestMessage = consumer2.receive();
consumer2.acknowledge(secondOldestMessage);

// Switch subscription 1 to be where subscription 2 was in terms of oldest mark delete
// S1 S2
Expand All @@ -460,12 +462,12 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce
.get(0).value;

expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - oldestMessage.getPublishTime());
assertThat(actualAge).isCloseTo(expectedMessageAgeSeconds, within(1L));
assertThat(actualAge).isCloseTo(expectedMessageAgeSeconds, within(2L));

topicStats = getTopicStats(topic1);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName1);

long cacheUsedCounterBefore = getCacheUsedCounter(topic1);
long entriesReadBefore = getReadEntries(topic1);

// Move subscription 1 passed subscription 2
for (int i = 0; i < 3; i++) {
Expand All @@ -479,25 +481,24 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce
waitForQuotaCheckToRunTwice();

// Cache shouldn't be used, since position has changed
assertThat(getCacheUsedCounter(topic1)).isEqualTo(cacheUsedCounterBefore);
long readEntries = getReadEntries(topic1);
assertThat(readEntries).isGreaterThan(entriesReadBefore);

topicStats = getTopicStats(topic1);
expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - secondOldestMessage.getPublishTime());
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedMessageAgeSeconds, within(1L));
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedMessageAgeSeconds, within(2L));
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);

cacheUsedCounterBefore = getCacheUsedCounter(topic1);

waitForQuotaCheckToRunTwice();

// Cache should be used, since position hasn't changed
assertThat(getCacheUsedCounter(topic1)).isGreaterThan(cacheUsedCounterBefore);
assertThat(getReadEntries(topic1)).isEqualTo(readEntries);
}
}

private long getCacheUsedCounter(String topic1) {
private long getReadEntries(String topic1) {
return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get())
.getPersistentTopicMetrics().getBacklogQuotaMetrics().getTimeBasedBacklogQuotaCheckReadFromCache();
.getManagedLedger().getStats().getEntriesReadTotalCount();
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions pulsar-broker/src/test/resources/log4j2.xml
Expand Up @@ -28,8 +28,8 @@
</Console>
</Appenders>
<Loggers>
<!-- <Logger name="org.apache.bookkeeper.mledger.impl.ManagedCursorImpl" level="DEBUG">-->
<!-- <AppenderRef ref="CONSOLE"/>-->
<!-- <Logger name="org.apache.pulsar.broker.service.persistent.PersistentTopic" level="DEBUG" additivity="false">-->
<!-- <AppenderRef ref="CONSOLE" />-->
<!-- </Logger>-->

<Root level="INFO">
Expand Down

0 comments on commit 8953ffa

Please sign in to comment.