Skip to content

Commit

Permalink
[Patch] Fix decode compression managedLedgerInfo data (apache#11569)
Browse files Browse the repository at this point in the history
* unify using the class `MLDataFormats.CompressionType` in package `managed-ledger`.
* fix compression data decode error
  • Loading branch information
gaoran10 authored and ciaocloud committed Oct 16, 2021
1 parent cb9c4c5 commit d6492ed
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

/**
* Configuration for a {@link ManagedLedgerFactory}.
Expand Down Expand Up @@ -80,5 +80,5 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());

encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
encodeByteBuf = getCompressionCodec(compressionType)
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
Expand Down Expand Up @@ -347,7 +348,8 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
Expand All @@ -371,7 +373,7 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
}
}

private CompressionCodec getCompressionCodec() {
private CompressionCodec getCompressionCodec(CompressionType compressionType) {
return CompressionCodecProvider.getCompressionCodec(
org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -37,7 +38,7 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name());
conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
super.baseSetup();
}

Expand All @@ -47,7 +48,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 1000 * 10)
@Test(timeOut = 1000 * 20)
public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception {
String topic = newTopicName();
@Cleanup
Expand All @@ -61,27 +62,17 @@ public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Excepti
.subscribe();

int messageCnt = 100;
for (int i = 0; i < messageCnt; i++) {
producer.newMessage().value("test".getBytes()).send();
}
for (int i = 0; i < messageCnt; i++) {
Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
consumer.acknowledge(message);
Assert.assertNotNull(message);
}
produceAndConsume(producer, consumer, messageCnt);

stopBroker();
conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name());
conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.ZSTD.name());
startBroker();
produceAndConsume(producer, consumer, messageCnt);

for (int i = 0; i < messageCnt; i++) {
producer.newMessage().value("test".getBytes()).send();
}
for (int i = 0; i < messageCnt; i++) {
Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledge(message);
}
stopBroker();
conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.LZ4.name());
startBroker();
produceAndConsume(producer, consumer, messageCnt);

stopBroker();
conf.setManagedLedgerInfoCompressionType("INVALID");
Expand All @@ -94,6 +85,22 @@ public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Excepti
"No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID",
e.getCause().getMessage());
}

conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
startBroker();
produceAndConsume(producer, consumer, messageCnt);
}

private void produceAndConsume(Producer<byte[]> producer,
Consumer<byte[]> consumer, int messageCnt) throws PulsarClientException {
for (int i = 0; i < messageCnt; i++) {
producer.newMessage().value("test".getBytes()).send();
}
for (int i = 0; i < messageCnt; i++) {
Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
consumer.acknowledge(message);
Assert.assertNotNull(message);
}
}

}

0 comments on commit d6492ed

Please sign in to comment.