Skip to content

Commit

Permalink
1. move ledger rollover scheduler task from BrokerService to `Manag…
Browse files Browse the repository at this point in the history
…edLedgerImpl`

2. scheduled the ledger rollover task after update ledger last created time
  • Loading branch information
gaoran10 committed Jul 6, 2021
1 parent 0fb9c52 commit 87a1312
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final EntryCache entryCache;

private ScheduledFuture<?> timeoutTask;
private ScheduledFuture<?> checkLedgerRollTask;

/**
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
Expand All @@ -199,7 +200,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastLedgerCreationInitiationTimestamp = 0;

private static final Random random = new Random(System.currentTimeMillis());
private final long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;
Expand Down Expand Up @@ -293,8 +293,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
}
Expand Down Expand Up @@ -437,7 +435,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1215,6 +1213,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}

LedgerHandle lh = currentLedger;

if (lh == null) {
Expand Down Expand Up @@ -1382,7 +1384,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3122,7 +3124,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3573,4 +3575,21 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
// new ledger has been created successfully
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
// Add a random value upto 5% to avoid rollover multiple ledgers at the same time
long schedulerMaxRolloverTime =
(long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), schedulerMaxRolloverTime, TimeUnit.MILLISECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -2871,4 +2872,22 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){
", dataLength=" + dataLength +
'}';
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
long firstLedgerId = ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId();

// the ledger rollover scheduled time is between 1000 and 1050 ms,
// wait 1100 ms, the ledger should be rolled over.
Awaitility.await()
.atMost(1100, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> firstLedgerId != ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private final ScheduledExecutorService ledgerFullMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
Expand Down Expand Up @@ -297,8 +296,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
this.ledgerFullMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ledger-full-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -440,7 +437,6 @@ public void start() throws Exception {
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startLedgerFullMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
Expand Down Expand Up @@ -528,12 +524,6 @@ protected void startConsumedLedgersMonitor() {
}
}

protected void startLedgerFullMonitor() {
int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
interval, interval, TimeUnit.MINUTES);
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -672,7 +662,6 @@ public void close() throws IOException {
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
ledgerFullMonitor.shutdown();
messagePublishBufferMonitor.shutdown();
consumedLedgersMonitor.shutdown();
backlogQuotaChecker.shutdown();
Expand Down Expand Up @@ -1435,18 +1424,6 @@ private void checkConsumedLedgers() {
});
}

private void checkLedgerFull() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.rollCurrentLedgerIfFull();
}
);
}
});
}

public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);

int msgNum = 10;
for (int i = 0; i < msgNum; i++) {
Expand Down

0 comments on commit 87a1312

Please sign in to comment.