Skip to content

Commit

Permalink
[PIP-146] ManagedCursorInfo compression
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Apr 11, 2022
1 parent 954b0cd commit 3b3bc5e
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 77 deletions.
Expand Up @@ -86,4 +86,9 @@ public class ManagedLedgerFactoryConfig {
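* ManagedLedgerInfo compression type. If the compression type is null or empty, data is not compressed;
* an unrecognized value is rejected with an IllegalArgumentException when the meta store is created.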
*/
private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();

/**
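* ManagedCursorInfo compression type. If the compression type is null or empty, data is not compressed;
* an unrecognized value is rejected with an IllegalArgumentException when the meta store is created.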
*/
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
}
Expand Up @@ -185,7 +185,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.metadataStore = metadataStore;
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType(),
config.getManagedCursorInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
Expand All @@ -55,30 +56,39 @@ public class MetaStoreImpl implements MetaStore {
private final MetadataStore store;
private final OrderedExecutor executor;

private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
private final CompressionType compressionType;
private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
private final CompressionType ledgerInfoCompressionType;
private final CompressionType cursorInfoCompressionType;

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
this.compressionType = CompressionType.NONE;
this.ledgerInfoCompressionType = CompressionType.NONE;
this.cursorInfoCompressionType = CompressionType.NONE;
}

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType,
String cursorInfoCompressionType) {
this.store = store;
this.executor = executor;
CompressionType finalCompressionType;
if (compressionType != null) {
try {
finalCompressionType = CompressionType.valueOf(compressionType);
} catch (Exception e) {
log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
throw e;
}
} else {
finalCompressionType = CompressionType.NONE;
this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType);
this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType);
}

private CompressionType parseCompressionType(String value) {
if (StringUtils.isEmpty(value)) {
return CompressionType.NONE;
}

CompressionType compressionType;
try {
compressionType = CompressionType.valueOf(value);
} catch (Exception e) {
log.error("Failed to get compression type {} error msg: {}.", value, e.getMessage());
throw e;
}
this.compressionType = finalCompressionType;

return compressionType;
}

@Override
Expand Down Expand Up @@ -185,7 +195,7 @@ public void asyncGetCursorInfo(String ledgerName, String cursorName,
.thenAcceptAsync(optRes -> {
if (optRes.isPresent()) {
try {
ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue());
ManagedCursorInfo info = parseManagedCursorInfo(optRes.get().getValue());
callback.operationComplete(info, optRes.get().getStat());
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(getException(e));
Expand All @@ -208,7 +218,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());

String path = PREFIX + ledgerName + "/" + cursorName;
byte[] content = info.toByteArray(); // Binary format
byte[] content = compressCursorInfo(info);

long expectedVersion;

Expand Down Expand Up @@ -322,32 +332,97 @@ private static MetaStoreException getException(Throwable t) {
}
}

/**
 * Serializes a {@link ManagedLedgerInfo} for storage, compressing it when the configured
 * ledger-info compression type is anything other than {@code NONE}.
 *
 * @param managedLedgerInfo the ledger info to serialize
 * @return the plain protobuf bytes when compression is disabled, otherwise whatever
 *         {@code compressManagedInfo} produces for the serialized info and its metadata
 */
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
    final boolean compressionDisabled = ledgerInfoCompressionType.equals(CompressionType.NONE);
    if (compressionDisabled) {
        return managedLedgerInfo.toByteArray();
    }

    // The metadata records the codec and the uncompressed size; the parse path reads both
    // back before decoding the payload.
    MLDataFormats.ManagedLedgerInfoMetadata.Builder headerBuilder =
            MLDataFormats.ManagedLedgerInfoMetadata.newBuilder();
    headerBuilder.setCompressionType(ledgerInfoCompressionType);
    headerBuilder.setUncompressedSize(managedLedgerInfo.getSerializedSize());
    MLDataFormats.ManagedLedgerInfoMetadata header = headerBuilder.build();

    byte[] serializedInfo = managedLedgerInfo.toByteArray();
    byte[] serializedHeader = header.toByteArray();
    return compressManagedInfo(serializedInfo, serializedHeader, header.getSerializedSize(),
            ledgerInfoCompressionType);
}

/**
 * Serializes a {@link ManagedCursorInfo} for storage, compressing it when the configured
 * cursor-info compression type is anything other than {@code NONE}.
 *
 * @param managedCursorInfo the cursor info to serialize
 * @return the plain protobuf bytes when compression is disabled, otherwise whatever
 *         {@code compressManagedInfo} produces for the serialized info and its metadata
 */
public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
    if (!cursorInfoCompressionType.equals(CompressionType.NONE)) {
        // Build the header first: it carries the codec and the uncompressed size that the
        // parse path reads back before decoding.
        MLDataFormats.ManagedCursorInfoMetadata header =
                MLDataFormats.ManagedCursorInfoMetadata.newBuilder()
                        .setCompressionType(cursorInfoCompressionType)
                        .setUncompressedSize(managedCursorInfo.getSerializedSize())
                        .build();
        byte[] serializedInfo = managedCursorInfo.toByteArray();
        byte[] serializedHeader = header.toByteArray();
        int headerSize = header.getSerializedSize();
        return compressManagedInfo(serializedInfo, serializedHeader, headerSize, cursorInfoCompressionType);
    }
    // Compression disabled: store the protobuf encoding as-is.
    return managedCursorInfo.toByteArray();
}

/**
 * Parses stored bytes into a {@link ManagedLedgerInfo}, transparently handling both the
 * compressed layout (magic number + metadata + compressed payload) and the plain protobuf
 * encoding.
 *
 * <p>If the compressed layout is detected but decoding fails for any reason, the error is
 * logged and the bytes are parsed directly as an uncompressed message.
 *
 * @param data the bytes read from the metadata store
 * @return the parsed ledger info
 * @throws InvalidProtocolBufferException if the bytes cannot be parsed as a ManagedLedgerInfo
 */
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    ByteBuf decodedBuf = null;
    try {
        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
        if (metadataBytes == null) {
            // No compression header: the bytes are a plain protobuf message.
            return ManagedLedgerInfo.parseFrom(data);
        }
        try {
            MLDataFormats.ManagedLedgerInfoMetadata metadata =
                    MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
            // Keep a reference to the decoded buffer so it can be released once parsing is done.
            decodedBuf = getCompressionCodec(metadata.getCompressionType())
                    .decode(byteBuf, metadata.getUncompressedSize());
            return ManagedLedgerInfo.parseFrom(decodedBuf.nioBuffer());
        } catch (Exception e) {
            log.error("Failed to parse managedLedgerInfo metadata, "
                    + "fall back to parse managedLedgerInfo directly.", e);
            return ManagedLedgerInfo.parseFrom(data);
        }
    } finally {
        // Release the decoded buffer (previously leaked) and the wrapper on every exit path.
        if (decodedBuf != null) {
            decodedBuf.release();
        }
        byteBuf.release();
    }
}

/**
 * Parses stored bytes into a {@link ManagedCursorInfo}, transparently handling both the
 * compressed layout (magic number + metadata + compressed payload) and the plain protobuf
 * encoding.
 *
 * <p>If the compressed layout is detected but decoding fails for any reason, the error is
 * logged and the bytes are parsed directly as an uncompressed message.
 *
 * @param data the bytes read from the metadata store
 * @return the parsed cursor info
 * @throws InvalidProtocolBufferException if the bytes cannot be parsed as a ManagedCursorInfo
 */
public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    ByteBuf decodedBuf = null;
    try {
        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
        if (metadataBytes == null) {
            // No compression header: the bytes are a plain protobuf message.
            return ManagedCursorInfo.parseFrom(data);
        }
        try {
            MLDataFormats.ManagedCursorInfoMetadata metadata =
                    MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
            // Keep a reference to the decoded buffer so it can be released once parsing is done.
            decodedBuf = getCompressionCodec(metadata.getCompressionType())
                    .decode(byteBuf, metadata.getUncompressedSize());
            return ManagedCursorInfo.parseFrom(decodedBuf.nioBuffer());
        } catch (Exception e) {
            log.error("Failed to parse ManagedCursorInfo metadata, "
                    + "fall back to parse ManagedCursorInfo directly", e);
            return ManagedCursorInfo.parseFrom(data);
        }
    } finally {
        // Release the decoded buffer (previously leaked) and the wrapper on every exit path.
        if (decodedBuf != null) {
            decodedBuf.release();
        }
        byteBuf.release();
    }
}

/**
* Compress ManagedLedgerInfo data.
* Compress Managed Info data such as LedgerInfo, CursorInfo.
*
* compression data structure
* [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [COMPRESSED_MANAGED_INFO_PAYLOAD]
*/
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
*/
private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize,
MLDataFormats.CompressionType compressionType) {
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
return managedLedgerInfo.toByteArray();
return info;
}
ByteBuf metadataByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
.newBuilder()
.setCompressionType(compressionType)
.setUncompressedSize(managedLedgerInfo.getSerializedSize())
.build();
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());

metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
metadataSerializedSize + 6);
metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
metadataByteBuf.writeInt(metadataSerializedSize);
metadataByteBuf.writeBytes(metadata);
encodeByteBuf = getCompressionCodec(compressionType)
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
.encode(Unpooled.wrappedBuffer(info));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
Expand All @@ -364,42 +439,14 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
}
}

public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
byteBuf.readBytes(metadataBytes);
MLDataFormats.ManagedLedgerInfoMetadata metadata =
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
decodeBytes = decodeByteBuf.array();
} else {
decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
decodeByteBuf.readBytes(decodeBytes);
}
return ManagedLedgerInfo.parseFrom(decodeBytes);
} catch (Exception e) {
log.error("Failed to parse managedLedgerInfo metadata, "
+ "fall back to parse managedLedgerInfo directly.", e);
return ManagedLedgerInfo.parseFrom(data);
} finally {
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
byteBuf.release();
}
} else {
return ManagedLedgerInfo.parseFrom(data);
/**
 * Reads the compression header from the front of {@code data}, if there is one.
 *
 * <p>The header is a 2-byte magic number followed by a 4-byte metadata length and the
 * metadata bytes. On success the reader index is left at the start of the compressed payload.
 *
 * @param data buffer positioned at the start of the stored bytes; its reader index is advanced
 * @return the serialized metadata, or {@code null} when the buffer does not start with a
 *         complete, well-formed header (callers then parse the original bytes directly)
 */
private byte[] extractCompressMetadataBytes(ByteBuf data) {
    // A short needs two readable bytes; a single byte would make readShort() throw.
    if (data.readableBytes() < 2 || data.readShort() != MAGIC_MANAGED_INFO_METADATA) {
        return null;
    }
    // Truncated header: no room for the 4-byte metadata length.
    if (data.readableBytes() < 4) {
        return null;
    }
    int metadataSize = data.readInt();
    // Reject a corrupt length instead of throwing an unchecked exception from the allocation/read.
    if (metadataSize < 0 || metadataSize > data.readableBytes()) {
        return null;
    }
    byte[] metadataBytes = new byte[metadataSize];
    data.readBytes(metadataBytes);
    return metadataBytes;
}

private CompressionCodec getCompressionCodec(CompressionType compressionType) {
Expand Down
5 changes: 5 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Expand Up @@ -137,3 +137,8 @@ message ManagedLedgerInfoMetadata {
required CompressionType compressionType = 1;
required int32 uncompressedSize = 2;
}

// Metadata header stored ahead of a compressed ManagedCursorInfo payload.
message ManagedCursorInfoMetadata {
// Compression type that was applied to the ManagedCursorInfo payload.
required CompressionType compressionType = 1;
// Serialized size of the ManagedCursorInfo before compression.
required int32 uncompressedSize = 2;
}
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.junit.Assert.assertEquals;
import static org.testng.Assert.expectThrows;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * ManagedCursorInfo metadata test.
 *
 * <p>Verifies that {@link MetaStoreImpl} can serialize a ManagedCursorInfo with every supported
 * compression type and parse it back, and that an unknown compression type is rejected.
 */
@Slf4j
public class ManagedCursorInfoMetadataTest {
    /** A name that does not match any compression type constant. */
    private static final String INVALID_TYPE = "INVALID_TYPE";

    /** Supplies a null value, an invalid name, and every supported compression type name. */
    @DataProvider(name = "compressionTypeProvider")
    private Object[][] compressionTypeProvider() {
        return new Object[][]{
                {null},
                {INVALID_TYPE},
                {CompressionType.NONE.name()},
                {CompressionType.LZ4.name()},
                {CompressionType.ZLIB.name()},
                {CompressionType.ZSTD.name()},
                {CompressionType.SNAPPY.name()}
        };
    }

    @Test(dataProvider = "compressionTypeProvider")
    public void testEncodeAndDecode(String compressionType) throws IOException {
        long ledgerId = 10000;
        MLDataFormats.ManagedCursorInfo.Builder builder = MLDataFormats.ManagedCursorInfo.newBuilder();

        builder.setCursorsLedgerId(ledgerId);
        builder.setMarkDeleteLedgerId(ledgerId);

        // Add many entries so the serialized message is large enough to be worth compressing.
        List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchedEntryDeletionIndexInfos = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            MLDataFormats.NestedPositionInfo nestedPositionInfo = MLDataFormats.NestedPositionInfo.newBuilder()
                    .setEntryId(i).setLedgerId(i).build();
            MLDataFormats.BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = MLDataFormats
                    .BatchedEntryDeletionIndexInfo.newBuilder().setPosition(nestedPositionInfo).build();
            batchedEntryDeletionIndexInfos.add(batchedEntryDeletionIndexInfo);
        }
        builder.addAllBatchedEntryDeletionIndexInfo(batchedEntryDeletionIndexInfos);

        MetaStoreImpl metaStore;
        if (INVALID_TYPE.equals(compressionType)) {
            // An unknown compression type must be rejected when the meta store is constructed.
            IllegalArgumentException compressionTypeEx = expectThrows(IllegalArgumentException.class, () -> {
                new MetaStoreImpl(null, null, null, compressionType);
            });
            assertEquals("No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
                    + compressionType, compressionTypeEx.getMessage());
            return;
        } else {
            metaStore = new MetaStoreImpl(null, null, null, compressionType);
        }

        MLDataFormats.ManagedCursorInfo managedCursorInfo = builder.build();
        byte[] compressionBytes = metaStore.compressCursorInfo(managedCursorInfo);
        log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
                compressionType, managedCursorInfo.getSerializedSize(), compressionBytes.length);
        if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
            Assert.assertEquals(compressionBytes.length, managedCursorInfo.getSerializedSize());
        }

        // parse compression data and unCompression data, check their results.
        MLDataFormats.ManagedCursorInfo info1 = metaStore.parseManagedCursorInfo(compressionBytes);
        MLDataFormats.ManagedCursorInfo info2 = metaStore.parseManagedCursorInfo(managedCursorInfo.toByteArray());
        Assert.assertEquals(info1, info2);
        // Comparing against the original message catches a parse path that is wrong in the same way for both.
        Assert.assertEquals(info1, managedCursorInfo);
    }
}

0 comments on commit 3b3bc5e

Please sign in to comment.