Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Apr 26, 2024
1 parent 0cbe85a commit d675bae
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2171,7 +2171,10 @@ public synchronized void readEntriesComplete(List<Entry> entries0, Object ctx) {
return;
}
completed = true;
// Make sure the entries are in the correct order
entries.sort(Comparator.comparingLong(Entry::getEntryId));
// If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass
// the `lastReadPosition([5])` to make sure the cursor read position is correct.
callback.internalReadEntriesComplete(entries, ctx, lastReadPosition);
}

Expand All @@ -2183,18 +2186,30 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
completed = true;
// If there are entries been read success, try to let the read operation success as possible.
List<Entry> entries = filterEntries();
if (entries.isEmpty()) {
callback.readEntriesFailed(exception, ctx);
} else {
callback.readEntriesComplete(entries, ctx);
}
}

if (!entries.isEmpty()) {
// Move the read position of the cursor to the next position of the last read entry,
// or we will deliver the same entry to the consumer more than once.
Entry entry = entries.get(entries.size() - 1);
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
PositionImpl nextReadPosition = callback.cursor.getNextAvailablePosition(position);
callback.updateReadPosition(nextReadPosition);
}
callback.internalReadEntriesFailed(entries, exception, ctx);
}

/**
* Filter the entries that have been read success.
* <p>
* If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed,
* only return [1,2] to the caller, to make sure the read operation success as possible
* and keep the ordering guarantee.
*
* @return filtered entries
*/
private List<Entry> filterEntries() {
if (entries.isEmpty()) {
return Collections.emptyList();
}
// Make sure the `readPosition` of `cursor` could be moved correctly.
List<Entry> entries = new ArrayList<>();
for (long entryId : entryIds) {
if (this.entries.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
Expand All @@ -29,6 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,7 +102,14 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
internalReadEntriesFailed(null, exception, ctx);
}

void internalReadEntriesFailed(Collection<Entry> ret, ManagedLedgerException exception, Object ctx) {
cursor.readOperationCompleted();
if (CollectionUtils.isNotEmpty(ret)) {
entries.addAll(ret);
}

if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4395,28 +4395,23 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
long ledgerId = ledger.currentLedger.getId();

callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null);
callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[1])), null);
callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[1])), null);
callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[3])), null);
callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 7, new byte[7])), null);
callback.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null);
// After call readEntriesFailed, the following readEntriesComplete should be ignored.
callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[5])), null);

latch.await();
// should not fail
assertFalse(failed.get());
assertEquals(entries.size(), 5);
assertEquals(entries.size(), 2);

// `entries` should be only the entries with entryId 1 and 3.
assertEquals(entries.get(0).getEntryId(), 1);
assertEquals(entries.get(1).getEntryId(), 3);

// Manually trigger the callback
Entry entry1 = entries.get(0);
Entry entry3 = entries.get(1);
Entry entry5 = entries.get(2);
assertEquals(entry1.getData().length, 1);
assertEquals(entry3.getData().length, 1);
assertEquals(entry5.getData().length, 1);

// Read from ledger.
Entry entry7 = entries.get(3);
Entry entry9 = entries.get(4);
assertNotEquals(entry7.getData().length, 1);
assertNotEquals(entry9.getData().length, 1);
// ReadPosition should be updated to [4]
assertEquals(cursor.getReadPosition().getEntryId(), 4);
}

@Test
Expand Down

0 comments on commit d675bae

Please sign in to comment.