Skip to content

Commit

Permalink
fix ledger rollover scheduler task
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 committed Jun 26, 2021
1 parent 0c34e86 commit 8d42418
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
Expand Up @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) {
});

scheduleTimeoutTask();

scheduleRollOverLedgerTask();
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTime();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTime();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() {
}
}

private void scheduleRollOverLedgerTask() {
if (config.getMaximumRolloverTimeMs() > 0) {
long interval = config.getMaximumRolloverTimeMs();
this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
rollCurrentLedgerIfFull();
}), interval, interval, TimeUnit.MILLISECONDS);
}
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down Expand Up @@ -3929,4 +3918,13 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTime() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
}
}

}
Expand Up @@ -2922,6 +2922,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getTotalSize(), 0);
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaxEntriesPerLedger(2);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
Position position1 = ml.addEntry("test".getBytes());

Thread.sleep(1000);
Position position2 = ml.addEntry("test".getBytes());

Assert.assertNotEquals(position1.getLedgerId(), position2.getLedgerId());
}

@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down

0 comments on commit 8d42418

Please sign in to comment.