Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ledger] Fix ledger rollover scheduled task #11116

Merged
merged 3 commits into from Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) {
});

scheduleTimeoutTask();

scheduleRollOverLedgerTask();
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();
315157973 marked this conversation as resolved.
Show resolved Hide resolved

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() {
}
}

private void scheduleRollOverLedgerTask() {
if (config.getMaximumRolloverTimeMs() > 0) {
long interval = config.getMaximumRolloverTimeMs();
this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
rollCurrentLedgerIfFull();
}), interval, interval, TimeUnit.MILLISECONDS);
}
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down Expand Up @@ -3929,4 +3918,18 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
// new ledger has been created successfully
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS);
}
}

}
Expand Up @@ -2926,6 +2926,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getTotalSize(), 0);
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId();

// the ledger rollover scheduled time is between 1000 and 1050 ms,
// wait 1100 ms, the ledger should be rolled over.
Awaitility.await()
.atMost(1100, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
}

@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);

int msgNum = 10;
for (int i = 0; i < msgNum; i++) {
Expand Down