Skip to content

Commit

Permalink
[package management service] fix wrappedBuffer always using the sam…
Browse files Browse the repository at this point in the history
…e block of memory (#11782)

(cherry picked from commit 12aef52)
  • Loading branch information
freeznet authored and hangc0276 committed Aug 26, 2021
1 parent 3aa1f1d commit af9ae9b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
Expand Up @@ -58,8 +58,8 @@ private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
try {
int read = 0;
while ((read = inputStream.read(readBuffer)) != -1) {
log.info("write something into the ledgers " + offset);
ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
log.info("write something into the ledgers offset: {}, length: {}", offset, read);
ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
LogRecord record = new LogRecord(offset, writeBuf);
records.add(record);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.RandomUtils;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.pulsar.packages.management.core.PackagesStorage;
Expand All @@ -41,7 +42,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
private PackagesStorage storage;

public BookKeeperPackagesStorageTest() {
super(1);
super(2);
}

@BeforeMethod()
Expand Down Expand Up @@ -89,6 +90,23 @@ public void testReadWriteOperations() throws ExecutionException, InterruptedExce
assertEquals(testData, readResult);
}

@Test(timeOut = 60000)
public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException {
byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096);
ByteArrayInputStream testDataStream = new ByteArrayInputStream(data);
String testPath = "test-large-read-write";

// write some data to the dlog
storage.writeAsync(testPath, testDataStream).get();

// read the data from the dlog
ByteArrayOutputStream readData = new ByteArrayOutputStream();
storage.readAsync(testPath, readData).get();
byte[] readResult = readData.toByteArray();

assertEquals(data, readResult);
}

@Test(timeOut = 60000)
public void testReadNonExistentData() {
String testPath = "non-existent-path";
Expand Down
Expand Up @@ -96,6 +96,19 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio
verify(dlm, times(1)).asyncClose();
}

@Test
public void writeLongBytesArrayData() throws ExecutionException, InterruptedException {
byte[] data = new byte[8192 * 3 + 4096];
DLOutputStream.openWriterAsync(dlm)
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();

verify(writer, times(1)).writeBulk(any(List.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
}

@Test
public void openAsyncLogWriterFailed() {
when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new Exception("Open writer was failed")));
Expand Down

0 comments on commit af9ae9b

Please sign in to comment.