From 12aef52f47eb497165df0cd19e4052b1bc9f9768 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 26 Aug 2021 16:10:43 +0800 Subject: [PATCH] [package management service] fix `wrappedBuffer` always using the same block of memory (#11782) --- .../storage/bookkeeper/DLOutputStream.java | 4 ++-- .../BookKeeperPackagesStorageTest.java | 20 ++++++++++++++++++- .../bookkeeper/DLOutputStreamTest.java | 13 ++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java index a951cd459a76d..7329aa96d8e7c 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java @@ -58,8 +58,8 @@ private CompletableFuture> getRecords(InputStream inputStream) { try { int read = 0; while ((read = inputStream.read(readBuffer)) != -1) { - log.info("write something into the ledgers " + offset); - ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read); + log.info("write something into the ledgers offset: {}, length: {}", offset, read); + ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read); offset += writeBuf.readableBytes(); LogRecord record = new LogRecord(offset, writeBuf); records.add(record); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java index 1e6df3fa73f83..339bafca76951 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.RandomUtils; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.ZKException; import org.apache.pulsar.packages.management.core.PackagesStorage; @@ -41,7 +42,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase { private PackagesStorage storage; public BookKeeperPackagesStorageTest() { - super(1); + super(2); } @BeforeMethod() @@ -89,6 +90,23 @@ public void testReadWriteOperations() throws ExecutionException, InterruptedExce assertEquals(testData, readResult); } + @Test(timeOut = 60000) + public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException { + byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096); + ByteArrayInputStream testDataStream = new ByteArrayInputStream(data); + String testPath = "test-large-read-write"; + + // write some data to the dlog + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the dlog + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + byte[] readResult = readData.toByteArray(); + + assertEquals(data, readResult); + } + @Test(timeOut = 60000) public void testReadNonExistentData() { String testPath = "non-existent-path"; diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java index 38578b06a0b98..c815a5d70177b 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java @@ -96,6 +96,19 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio verify(dlm, times(1)).asyncClose(); } + @Test + public void writeLongBytesArrayData() throws ExecutionException, InterruptedException { + byte[] data = new byte[8192 * 3 + 4096]; + DLOutputStream.openWriterAsync(dlm) + .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) + .thenCompose(DLOutputStream::closeAsync)).get(); + + verify(writer, times(1)).writeBulk(any(List.class)); + verify(writer, times(1)).markEndOfStream(); + verify(writer, times(1)).asyncClose(); + verify(dlm, times(1)).asyncClose(); + } + @Test public void openAsyncLogWriterFailed() { when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new Exception("Open writer was failed")));