Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Branch-2.7] Fix ledger roll over scheduler task #11226

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -180,6 +180,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final EntryCache entryCache;

private ScheduledFuture<?> timeoutTask;
private ScheduledFuture<?> checkLedgerRollTask;

/**
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
Expand All @@ -199,7 +200,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastLedgerCreationInitiationTimestamp = 0;

private static final Random random = new Random(System.currentTimeMillis());
private final long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;
Expand Down Expand Up @@ -293,8 +293,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
}
Expand Down Expand Up @@ -437,7 +435,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1215,6 +1213,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}

LedgerHandle lh = currentLedger;

if (lh == null) {
Expand Down Expand Up @@ -1382,7 +1384,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3122,7 +3124,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3573,4 +3575,21 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
// new ledger has been created successfully
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
// Add a random value upto 5% to avoid rollover multiple ledgers at the same time
long schedulerMaxRolloverTime =
(long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), schedulerMaxRolloverTime, TimeUnit.MILLISECONDS);
}
}

}
Expand Up @@ -108,6 +108,7 @@
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -2871,4 +2872,22 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){
", dataLength=" + dataLength +
'}';
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(10, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
long firstLedgerId = ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId();

// the ledger rollover scheduled time is between 1000 * 10 and 1000 * 10 + 500 ms,
// wait 1000 * 12 ms, the ledger should be rolled over.
Awaitility.await()
.atMost(12, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> firstLedgerId != ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId());
}

}
Expand Up @@ -212,7 +212,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private final ScheduledExecutorService ledgerFullMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
Expand Down Expand Up @@ -297,8 +296,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
this.ledgerFullMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ledger-full-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -440,7 +437,6 @@ public void start() throws Exception {
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startLedgerFullMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
Expand Down Expand Up @@ -528,12 +524,6 @@ protected void startConsumedLedgersMonitor() {
}
}

protected void startLedgerFullMonitor() {
int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
interval, interval, TimeUnit.MINUTES);
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -672,7 +662,6 @@ public void close() throws IOException {
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
ledgerFullMonitor.shutdown();
messagePublishBufferMonitor.shutdown();
consumedLedgersMonitor.shutdown();
backlogQuotaChecker.shutdown();
Expand Down Expand Up @@ -1435,18 +1424,6 @@ private void checkConsumedLedgers() {
});
}

private void checkLedgerFull() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.rollCurrentLedgerIfFull();
}
);
}
});
}

public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);

int msgNum = 10;
for (int i = 0; i < msgNum; i++) {
Expand Down