Skip to content

Commit

Permalink
Forbid to read other topic's data in managedLedger layer (apache#11912)
Browse files Browse the repository at this point in the history
* forbid to read other topic's data in managedLedger layer

* format code

* update exception type

* fix test

(cherry picked from commit a7bdc5e)
(cherry picked from commit 8bf8000)
  • Loading branch information
hangc0276 authored and eolivelli committed Sep 6, 2021
1 parent 5119585 commit d419734
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1816,6 +1816,13 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
}
if (!ledgers.containsKey(position.getLedgerId())) {
log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ "the ledgerId does not belong to this topic or has been deleted"), ctx);
return;
}
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -109,16 +110,12 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class ManagedLedgerTest extends MockedBookKeeperTestCase {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);

private static final Charset Encoding = Charsets.UTF_8;

@DataProvider(name = "checkOwnershipFlag")
Expand Down Expand Up @@ -2964,7 +2961,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio
Assert.assertEquals(finalManagedLedger.getTotalSize(), 0);
});
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down

0 comments on commit d419734

Please sign in to comment.