Skip to content

Commit

Permalink
PR Fixes 2
Browse files Browse the repository at this point in the history
  • Loading branch information
asafm committed Jan 4, 2024
1 parent f4a8f18 commit 958c3df
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import lombok.Value;
import lombok.experimental.UtilityClass;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -48,7 +49,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
/**
* This field is incremented everytime the cursor information is updated.
*/
private volatile long version;
private long version;

@Value
public static class CursorInfo {
Expand Down Expand Up @@ -79,14 +80,14 @@ private static class Item {
/**
* Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
*/
public static final class DataVersion {
private DataVersion() {}
@UtilityClass
public class DataVersion {

/**
* Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
* <p>
* Use {@link DataVersion#incrementVersion(long)} to increment the versions. The assumptions
* is that metric versios are compared with close time proximity one to another, hence,
* Use {@link DataVersion#getNextVersion(long)} to increment the versions. The assumptions
* are that metric versions are compared with close time proximity one to another, hence,
* they are expected not close to each other in terms of distance, hence we don't
* expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
* to know which one is a later version in case the furthest rolls over to beyond 0. We
Expand Down Expand Up @@ -125,7 +126,7 @@ public static int compareVersions(long v1, long v2) {
}
}

public static long incrementVersion(long existingVersion) {
public static long getNextVersion(long existingVersion) {
if (existingVersion == Long.MAX_VALUE) {
return 0;
} else {
Expand Down Expand Up @@ -156,7 +157,6 @@ public ManagedCursorContainer() {}
* @param position position of the cursor to use for ordering, pass null if the cursor's position shouldn't be
* tracked for the slowest reader.
*/
@SuppressWarnings("NonAtomicOperationOnVolatileField") // We have rw lock for that
public void add(ManagedCursor cursor, Position position) {
long stamp = rwLock.writeLock();
try {
Expand All @@ -171,7 +171,7 @@ public void add(ManagedCursor cursor, Position position) {
if (cursor.isDurable()) {
durableCursorCount++;
}
version = DataVersion.incrementVersion(version);
version = DataVersion.getNextVersion(version);
} finally {
rwLock.unlockWrite(stamp);
}
Expand All @@ -187,7 +187,6 @@ public ManagedCursor get(String name) {
}
}

@SuppressWarnings("NonAtomicOperationOnVolatileField") // we have rw lock for that
public boolean removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Expand All @@ -208,7 +207,7 @@ public boolean removeCursor(String name) {
if (item.cursor.isDurable()) {
durableCursorCount--;
}
version = DataVersion.incrementVersion(version);
version = DataVersion.getNextVersion(version);
return true;
} else {
return false;
Expand All @@ -230,7 +229,6 @@ public boolean removeCursor(String name) {
* @return a pair of positions, representing the previous slowest reader and the new slowest reader (after the
* update).
*/
@SuppressWarnings("NonAtomicOperationOnVolatileField") // we have rw lock for that
public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
requireNonNull(cursor);

Expand All @@ -243,7 +241,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi

PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.incrementVersion(version);
version = DataVersion.getNextVersion(version);

if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,20 +752,20 @@ public void testDataVersion() {
assertThat(ManagedCursorContainer.DataVersion.compareVersions(3L, 3L)).isZero();

long v1 = Long.MAX_VALUE - 1;
long v2 = ManagedCursorContainer.DataVersion.incrementVersion(v1);
long v2 = ManagedCursorContainer.DataVersion.getNextVersion(v1);

assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();

v2 = ManagedCursorContainer.DataVersion.incrementVersion(v2);
v2 = ManagedCursorContainer.DataVersion.getNextVersion(v2);
assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();

v1 = ManagedCursorContainer.DataVersion.incrementVersion(v1);
v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();

v1 = ManagedCursorContainer.DataVersion.incrementVersion(v1);
v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isZero();

v1 = ManagedCursorContainer.DataVersion.incrementVersion(v1);
v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isPositive();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -845,7 +846,7 @@ CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGrou
long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
return NettyFutureUtil.toCompletableFutureVoid(
eventLoopGroup.shutdownGracefully(quietPeriod,
timeout, TimeUnit.MILLISECONDS));
timeout, MILLISECONDS));
}

private CompletableFuture<Void> closeChannel(Channel channel) {
Expand Down Expand Up @@ -899,8 +900,8 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl
rateLimiter.acquire(1);
}
long timeout = pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs();
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, TimeUnit.MILLISECONDS,
closeWithoutWaitingClientDisconnect).get(timeout, TimeUnit.MILLISECONDS);
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to unload namespace bundle {}", su, e);
}
Expand Down Expand Up @@ -2073,29 +2074,31 @@ public BacklogQuotaManager getBacklogQuotaManager() {
}

public void monitorBacklogQuota() {
backlogQuotaCheckDuration.time(() -> {
forEachPersistentTopic(topic -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
}
long startTimeMillis = System.currentTimeMillis();
forEachPersistentTopic(topic -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
}
}).exceptionally(throwable -> {
log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
topic.getName(), throwable);
return null;
});
}
});
}
}).exceptionally(throwable -> {
log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
topic.getName(), throwable);
return null;
}).whenComplete((unused, throwable) -> {
backlogQuotaCheckDuration.observe(
MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
});
}
});
}

Expand Down Expand Up @@ -2589,7 +2592,7 @@ private void updateConfigurationAndRegisterListeners() {
// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
managedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdMills));
});

Expand Down Expand Up @@ -3008,7 +3011,7 @@ private void createPendingLoadTopic() {
pendingTopic.getTopicFuture()
.completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e);
// schedule to process next pending topic
inactivityMonitor.schedule(this::createPendingLoadTopic, 100, TimeUnit.MILLISECONDS);
inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -1265,6 +1264,6 @@ public boolean isPersistent() {

@Override
public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
return PersistentTopic.NOT_AVAILABLE_YET;
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@

public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {

public static final long NOT_AVAILABLE_YET = -1;

// Managed ledger associated with the topic
protected final ManagedLedger ledger;

Expand Down Expand Up @@ -293,7 +291,7 @@ private static class TimeBasedBacklogQuotaCheckResult {
}

@Value
private static class CheckResult {
private static class EstimateTimeBasedBacklogQuotaCheckResult {
boolean truncateBacklogToMatchQuota;
Long estimatedOldestUnacknowledgedMessageTimestamp;
}
Expand Down Expand Up @@ -2451,8 +2449,8 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions

TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult;
stats.oldestBacklogMessageAgeSeconds = (backlogQuotaCheckResult == null)
? NOT_AVAILABLE_YET
: TimeUnit.MILLISECONDS.toSeconds(
? (long) -1
: TimeUnit.MILLISECONDS.toSeconds(
Clock.systemUTC().millis() - backlogQuotaCheckResult.getPositionPublishTimestampInMillis());

stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null)
Expand Down Expand Up @@ -3230,7 +3228,7 @@ public boolean isSizeBacklogExceeded() {
public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult;
if (result == null) {
return NOT_AVAILABLE_YET;
return -1;
} else {
return TimeUnit.MILLISECONDS.toSeconds(
Clock.systemUTC().millis() - result.getPositionPublishTimestampInMillis());
Expand Down Expand Up @@ -3295,7 +3293,7 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
log.debug("(Using cache) Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
+ " exceeded quota {}", lastCheckResult.getCursorName(), backlogQuotaLimitInSecond);
}

persistentTopicMetrics.getBacklogQuotaMetrics().recordTimeBasedBacklogQuotaCheckReadFromCache();
return CompletableFuture.completedFuture(expired);
}

Expand Down Expand Up @@ -3343,7 +3341,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
return future;
} else {
try {
CheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
EstimateTimeBasedBacklogQuotaCheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
updateResultIfNewer(
new TimeBasedBacklogQuotaCheckResult(
Expand All @@ -3361,15 +3359,15 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}
}

private CheckResult estimatedTimeBasedBacklogQuotaCheck(PositionImpl markDeletePosition)
private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(PositionImpl markDeletePosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) ledger;

// The ledger timestamp is only known when ledger is closed, hence when the mark-delete
// is at active ledger (open) we can't estimate it.
if (managedLedger.getLedgersInfo().lastKey().equals(markDeletePosition.getLedgerId())) {
return new CheckResult(false, null);
return new EstimateTimeBasedBacklogQuotaCheckResult(false, null);
}

org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
Expand All @@ -3396,9 +3394,9 @@ private CheckResult estimatedTimeBasedBacklogQuotaCheck(PositionImpl markDeleteP
estimateMsgAgeMs);
}

return new CheckResult(shouldTruncateBacklog, positionToCheckLedgerInfo.getTimestamp());
return new EstimateTimeBasedBacklogQuotaCheckResult(shouldTruncateBacklog, positionToCheckLedgerInfo.getTimestamp());
} else {
return new CheckResult(false, null);
return new EstimateTimeBasedBacklogQuotaCheckResult(false, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class PersistentTopicMetrics {
public static class BacklogQuotaMetrics {
private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
private final LongAdder timeBasedBacklogQuotaCheckReadFromCache = new LongAdder();

public void recordTimeBasedBacklogEviction() {
timeBasedBacklogQuotaExceededEvictionCount.increment();
Expand All @@ -39,12 +40,20 @@ public void recordSizeBasedBacklogEviction() {
sizeBasedBacklogQuotaExceededEvictionCount.increment();
}

public void recordTimeBasedBacklogQuotaCheckReadFromCache() {
timeBasedBacklogQuotaCheckReadFromCache.increment();
}

public long getSizeBasedBacklogQuotaExceededEvictionCount() {
return sizeBasedBacklogQuotaExceededEvictionCount.longValue();
}

public long getTimeBasedBacklogQuotaExceededEvictionCount() {
return timeBasedBacklogQuotaExceededEvictionCount.longValue();
}

public long getTimeBasedBacklogQuotaCheckReadFromCache() {
return timeBasedBacklogQuotaCheckReadFromCache.longValue();
}
}
}

0 comments on commit 958c3df

Please sign in to comment.