Skip to content

Commit

Permalink
[Ledger] Fix ledger rollover scheduled task (apache#11116)
Browse files Browse the repository at this point in the history
### Motivation

Currently, the ledger rollover scheduled task will execute before reach the ledger maximum rollover time, this will cause the ledger doesn't roll over in time.

### Modifications

Only make the ledger rollover scheduled task after the ledger created successfully.
If the scheduled task was executed when there is no entry in the current ledger, the scheduled task will not be re-executed, and if there is new entry is added the ledger will rollover.

### Verifying this change

Add a unit test to verify the ledger could be rolled over in time.
  • Loading branch information
gaoran10 authored and ciaocloud committed Oct 16, 2021
1 parent 33162f1 commit cb66036
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 15 deletions.
Expand Up @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) {
});

scheduleTimeoutTask();

scheduleRollOverLedgerTask();
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() {
}
}

private void scheduleRollOverLedgerTask() {
if (config.getMaximumRolloverTimeMs() > 0) {
long interval = config.getMaximumRolloverTimeMs();
this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
rollCurrentLedgerIfFull();
}), interval, interval, TimeUnit.MILLISECONDS);
}
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down Expand Up @@ -3929,4 +3918,18 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
// new ledger has been created successfully
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS);
}
}

}
Expand Up @@ -2926,6 +2926,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getTotalSize(), 0);
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId();

// the ledger rollover scheduled time is between 1000 and 1050 ms,
// wait 1100 ms, the ledger should be rolled over.
Awaitility.await()
.atMost(1100, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
}

@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);

int msgNum = 10;
for (int i = 0; i < msgNum; i++) {
Expand Down

0 comments on commit cb66036

Please sign in to comment.