diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 04877575c2593..223443f606a8d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -180,6 +180,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final EntryCache entryCache; private ScheduledFuture timeoutTask; + private ScheduledFuture checkLedgerRollTask; /** * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store @@ -199,7 +200,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private long lastLedgerCreationInitiationTimestamp = 0; private static final Random random = new Random(System.currentTimeMillis()); - private final long maximumRolloverTimeMs; protected final Supplier mlOwnershipChecker; volatile PositionImpl lastConfirmedEntry; @@ -293,8 +293,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.uninitializedCursors = Maps.newHashMap(); this.clock = config.getClock(); - // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time - this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = Maps.newHashMap(); } @@ -437,7 +435,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1215,6 +1213,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c this.timeoutTask.cancel(false); } + if (this.checkLedgerRollTask != null) { + this.checkLedgerRollTask.cancel(false); + } + LedgerHandle lh = currentLedger; if (lh == null) { @@ -1382,7 +1384,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3122,7 +3124,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3573,4 +3575,21 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } + // Add a random value upto 5% to avoid rollover multiple ledgers at the same time + long schedulerMaxRolloverTime = + (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), schedulerMaxRolloverTime, TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7898db1e8ae55..922af4374fdcf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -108,6 +108,7 @@ import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -2871,4 +2872,22 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){ ", dataLength=" + dataLength + '}'; } + + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(10, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + long firstLedgerId = ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 * 10 and 1000 * 10 + 500 ms, + // wait 1000 * 12 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(12, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId()); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4f41346625900..86716ade27589 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -212,7 +212,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener { - if (t instanceof PersistentTopic) { - Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent( - managedLedger -> { - managedLedger.rollCurrentLedgerIfFull(); - } - ); - } - }); - } - public void checkMessageDeduplicationInfo() { forEachTopic(Topic::checkMessageDeduplicationInfo); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 783eac59dc99b..228a0aa479e98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -65,7 +65,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {