From c38ef79bf7360d419d4345130f6d81176a205af8 Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Sun, 27 Jun 2021 01:13:37 +0800 Subject: [PATCH 1/3] fix ledger rollover scheduler task --- .../mledger/impl/ManagedLedgerImpl.java | 26 +++++++++---------- .../mledger/impl/ManagedLedgerTest.java | 17 ++++++++++++ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 049b1c7e64792..a05cae08146f8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) { }); scheduleTimeoutTask(); - - scheduleRollOverLedgerTask(); } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTime(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTime(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() { } } - private void scheduleRollOverLedgerTask() { - if (config.getMaximumRolloverTimeMs() > 0) { - long interval = config.getMaximumRolloverTimeMs(); - this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - rollCurrentLedgerIfFull(); - }), interval, interval, TimeUnit.MILLISECONDS); - } - } - private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -3929,4 +3918,13 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTime() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), maximumRolloverTimeMs, TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 1e2390dad5cb1..0e53fdb6a4168 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2926,6 +2926,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception { Assert.assertEquals(ledger.getTotalSize(), 0); } + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionTime(1, TimeUnit.SECONDS); + config.setMaxEntriesPerLedger(2); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + Position position1 = ml.addEntry("test".getBytes()); + + Thread.sleep(1000); + Position position2 = ml.addEntry("test".getBytes()); + + Assert.assertNotEquals(position1.getLedgerId(), position2.getLedgerId()); + } + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); From eadebdcb06ce1b92e5e9ba9f0beb1c13d67db9da Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Tue, 29 Jun 2021 19:01:33 +0800 Subject: [PATCH 2/3] cancel the previous undone ledger rollover schedule task if there is a new ledger has been created successfully. --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 13 +++++++++---- .../service/CurrentLedgerRolloverIfFullTest.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a05cae08146f8..e3148b9986f08 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -465,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - updateLastLedgerCreatedTime(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1486,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - updateLastLedgerCreatedTime(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3919,11 +3919,16 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { }); } - private void updateLastLedgerCreatedTime() { + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { this.lastLedgerCreatedTimestamp = clock.millis(); if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } this.checkLedgerRollTask = this.scheduledExecutor.schedule( - safeRun(this::rollCurrentLedgerIfFull), maximumRolloverTimeMs, TimeUnit.MILLISECONDS); + safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 1bb8dcb24e4d8..77ec229862e90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) { From c51f9c207f638a64db5eaff092b273634d1308af Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Wed, 30 Jun 2021 15:08:22 +0800 Subject: [PATCH 3/3] fix test --- .../mledger/impl/ManagedLedgerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 0e53fdb6a4168..62b44b8c09136 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2929,18 +2929,18 @@ public void testManagedLedgerRollOverIfFull() throws Exception { @Test public void testLedgerReachMaximumRolloverTime() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setRetentionTime(1, TimeUnit.SECONDS); - config.setMaxEntriesPerLedger(2); config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); config.setMaximumRolloverTime(1, TimeUnit.SECONDS); ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); - Position position1 = ml.addEntry("test".getBytes()); - - Thread.sleep(1000); - Position position2 = ml.addEntry("test".getBytes()); - - Assert.assertNotEquals(position1.getLedgerId(), position2.getLedgerId()); + long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 and 1050 ms, + // wait 1100 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(1100, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId()); } @Test