PIP-270 Add config to set metadata size threshold for compression. #20307

Closed
lifepuzzlefun opened this issue May 11, 2023 · 2 comments

lifepuzzlefun commented May 11, 2023

Background knowledge

Current config

Currently, compression of ManagedLedger and ManagedCursor metadata can be enabled
through managedLedgerInfoCompressionType and managedCursorInfoCompressionType.

About metadata

How does the current compression work?

Byte format when compression is applied:
[MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]

enum CompressionType {
    NONE = 0;
    LZ4 = 1;
    ZLIB = 2;
    ZSTD = 3;
    SNAPPY = 4;
}

message ManagedLedgerInfoMetadata {
    required CompressionType compressionType = 1;
    required int32 uncompressedSize = 2;
}

message ManagedCursorInfoMetadata {
    required CompressionType compressionType = 1;
    required int32 uncompressedSize = 2;
}

When the metadata reader decodes the raw metadata bytes, it first checks the leading two bytes (a short). If they equal the magic number MAGIC_MANAGED_INFO_METADATA, the reader parses the compression header and uses it to decompress the payload; otherwise it parses the bytes directly as uncompressed metadata.
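The framing described above can be sketched with plain `java.nio.ByteBuffer`. This is a minimal illustration, not the broker code: the class `ManagedInfoFraming`, its method names, and the magic value are assumptions made for the example, and the metadata and payload are opaque byte arrays rather than real protobuf messages.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ManagedInfoFraming {
    // Magic value assumed for this sketch; the broker defines its own
    // MAGIC_MANAGED_INFO_METADATA constant.
    static final short MAGIC = 0x4778;
    private static final int HEADER_SIZE = 2 + 4;

    /** Builds [MAGIC](2) + [METADATA_SIZE](4) + [METADATA] + [PAYLOAD]. */
    static byte[] frame(byte[] metadata, byte[] compressedPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + metadata.length + compressedPayload.length)
                .putShort(MAGIC)
                .putInt(metadata.length)
                .put(metadata)
                .put(compressedPayload)
                .array();
    }

    /**
     * Returns the metadata bytes and leaves buf positioned at the payload,
     * or returns null (buf untouched) when the header is absent.
     */
    static byte[] extractMetadata(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE || buf.getShort(buf.position()) != MAGIC) {
            return null; // treat as plain, uncompressed bytes
        }
        buf.getShort(); // consume the magic number
        byte[] metadata = new byte[buf.getInt()];
        buf.get(metadata);
        return metadata;
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {1, 2, 3}, new byte[] {9, 8, 7, 6});
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte[] metadata = extractMetadata(buf);
        System.out.println("metadata: " + Arrays.toString(metadata));
        System.out.println("payload bytes remaining: " + buf.remaining());
    }
}
```

Unlike the broker method quoted below, this sketch requires at least six readable bytes before it reads the header, so a one-byte input is treated as uncompressed instead of failing on the short read.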

Related code:

public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
    if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
        return managedLedgerInfo.toByteArray();
    }
    MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
            .newBuilder()
            .setCompressionType(ledgerInfoCompressionType)
            .setUncompressedSize(managedLedgerInfo.getSerializedSize())
            .build();
    return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
            mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
}

public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
    if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
        return managedCursorInfo.toByteArray();
    }
    MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
            .newBuilder()
            .setCompressionType(cursorInfoCompressionType)
            .setUncompressedSize(managedCursorInfo.getSerializedSize())
            .build();
    return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
            metadata.getSerializedSize(), cursorInfoCompressionType);
}

public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
    if (metadataBytes != null) {
        try {
            MLDataFormats.ManagedLedgerInfoMetadata metadata =
                    MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
            return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
                    .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
        } catch (Exception e) {
            log.error("Failed to parse managedLedgerInfo metadata, "
                    + "fall back to parse managedLedgerInfo directly.", e);
            return ManagedLedgerInfo.parseFrom(data);
        } finally {
            byteBuf.release();
        }
    } else {
        return ManagedLedgerInfo.parseFrom(data);
    }
}

public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
    if (metadataBytes != null) {
        try {
            MLDataFormats.ManagedCursorInfoMetadata metadata =
                    MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
            return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
                    .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
        } catch (Exception e) {
            log.error("Failed to parse ManagedCursorInfo metadata, "
                    + "fall back to parse ManagedCursorInfo directly", e);
            return ManagedCursorInfo.parseFrom(data);
        } finally {
            byteBuf.release();
        }
    } else {
        return ManagedCursorInfo.parseFrom(data);
    }
}

/**
 * Compress Managed Info data such as LedgerInfo, CursorInfo.
 *
 * compression data structure
 * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
 */
private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize,
                                   MLDataFormats.CompressionType compressionType) {
    if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
        return info;
    }
    ByteBuf metadataByteBuf = null;
    ByteBuf encodeByteBuf = null;
    try {
        metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
                metadataSerializedSize + 6);
        metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
        metadataByteBuf.writeInt(metadataSerializedSize);
        metadataByteBuf.writeBytes(metadata);
        encodeByteBuf = getCompressionCodec(compressionType)
                .encode(Unpooled.wrappedBuffer(info));
        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
        compositeByteBuf.addComponent(true, metadataByteBuf);
        compositeByteBuf.addComponent(true, encodeByteBuf);
        byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
        compositeByteBuf.readBytes(dataBytes);
        return dataBytes;
    } finally {
        if (metadataByteBuf != null) {
            metadataByteBuf.release();
        }
        if (encodeByteBuf != null) {
            encodeByteBuf.release();
        }
    }
}

private byte[] extractCompressMetadataBytes(ByteBuf data) {
    if (data.readableBytes() > 0 && data.readShort() == MAGIC_MANAGED_INFO_METADATA) {
        int metadataSize = data.readInt();
        byte[] metadataBytes = new byte[metadataSize];
        data.readBytes(metadataBytes);
        return metadataBytes;
    }
    return null;
}

ManagedLedger metadata

message ManagedLedgerInfo {
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5;
    }

    repeated LedgerInfo ledgerInfo = 1;

    // If present, it signals the managed ledger has been
    // terminated and this was the position of the last
    // committed entry.
    // No more entries can be written.
    optional NestedPositionInfo terminatedPosition = 2;

    repeated KeyValue properties = 3;
}

ManagedLedger metadata consists mainly of a list of LedgerInfo entries that describe the storage state of a Pulsar topic.

The metadata stays small while a topic carries light traffic. When the topic's retention policy is set to keep more messages, the ledger list grows and the metadata can become very large.

If the metadata for most topics is small, compression brings little benefit.
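The size argument can be checked with a small experiment. The sketch below is an assumption-laden stand-in, not a measurement of real broker metadata: it uses the JDK's `java.util.zip.Deflater` (zlib format) in place of the broker's codecs, and the hypothetical `syntheticRecords` helper produces text records rather than protobuf-encoded LedgerInfo entries.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

final class SmallPayloadOverhead {
    /** Compresses the input with the JDK Deflater using the zlib wrapper. */
    static byte[] zlib(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                out.write(chunk, 0, deflater.deflate(chunk));
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    /** Builds synthetic, text-based ledger records; NOT the real protobuf encoding. */
    static byte[] syntheticRecords(int count) {
        StringBuilder sb = new StringBuilder();
        for (int id = 0; id < count; id++) {
            sb.append("ledgerId=").append(id).append(",entries=50000,size=5000000;");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (int count : new int[] {1, 10, 2000}) {
            byte[] raw = syntheticRecords(count);
            byte[] compressed = zlib(raw);
            System.out.println(count + " record(s): raw=" + raw.length
                    + " bytes, zlib=" + compressed.length + " bytes");
        }
    }
}
```

On tiny inputs the zlib wrapper alone adds 6 bytes (a 2-byte header and a 4-byte Adler-32 checksum), and the stored form additionally carries the 6-byte magic-and-size prefix plus the serialized metadata message, so the stored result can be larger than the uncompressed bytes.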

ManagedCursor metadata

message ManagedCursorInfo {
    // If the ledger id is -1, then the mark-delete position is
    // the one from the (ledgerId, entryId) snapshot below
    required int64 cursorsLedgerId = 1;

    // Last snapshot of the mark-delete position
    optional int64 markDeleteLedgerId = 2;
    optional int64 markDeleteEntryId = 3;
    repeated MessageRange individualDeletedMessages = 4;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 5;

    optional int64 lastActive = 6;

    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;

    // Additional custom properties associated with
    // the cursor
    repeated StringProperty cursorProperties = 8;
}

ManagedCursor metadata holds the state of a Pulsar subscription.

When consumers always acknowledge cumulatively (acknowledgeCumulative), this state stays small.
With individual acknowledgments or with batchIndexAck enabled, the cursor state can become very large.
How large depends on the application's acknowledgment pattern.

As with the ManagedLedger metadata, compression brings little benefit when the state is small.
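Why the acknowledgment pattern matters can be seen in a toy model. The sketch below is not Pulsar's cursor implementation; `AckRangeModel` and `countRanges` are hypothetical names, and the model only counts how many disjoint runs of acknowledged entry IDs would have to be recorded, which is roughly what the repeated individualDeletedMessages field has to store.

```java
import java.util.TreeSet;

final class AckRangeModel {
    /** Counts disjoint runs of consecutive acknowledged entry IDs. */
    static int countRanges(TreeSet<Long> ackedEntryIds) {
        int ranges = 0;
        Long previous = null;
        for (long id : ackedEntryIds) {
            // A new run starts whenever the ID does not directly follow the previous one.
            if (previous == null || id != previous + 1) {
                ranges++;
            }
            previous = id;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Contiguous acknowledgments: one run, which a cursor can represent
        // with a single mark-delete position.
        TreeSet<Long> contiguous = new TreeSet<>();
        for (long id = 0; id < 1000; id++) {
            contiguous.add(id);
        }
        System.out.println("contiguous runs: " + countRanges(contiguous)); // prints 1

        // Acknowledging every other entry leaves a hole after each ack,
        // so every acknowledged entry becomes its own run.
        TreeSet<Long> sparse = new TreeSet<>();
        for (long id = 0; id < 1000; id += 2) {
            sparse.add(id);
        }
        System.out.println("sparse runs: " + countRanges(sparse)); // prints 500
    }
}
```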

Motivation

Sometimes the metadata is too small to be worth compressing, so a size-based config is needed.

Goals

In Scope

Add configs that set a size threshold for metadata compression:

managedLedgerInfoCompressionThresholdInBytes
managedCursorInfoCompressionThresholdInBytes

We propose a default value of 16 KB.

When a threshold is set, the metadata is compressed only if the size to be persisted is above the threshold.

High Level Design

Add the configs to ServiceConfiguration; the values are then passed on to ManagedLedgerConfig.
When metadata is persisted, the thresholds determine whether the compression logic is applied.
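A minimal sketch of the threshold check on the write path, together with a reader that needs no knowledge of the threshold, is shown here. It makes several assumptions that are not part of the proposal text: the class name `ThresholdCompressor`, the magic value, the use of the JDK's zlib codec, a simplified header that stores the uncompressed size directly instead of a protobuf metadata message, and the choice to leave payloads exactly at the threshold uncompressed.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class ThresholdCompressor {
    static final short MAGIC = 0x4778; // value assumed for this sketch
    private static final int HEADER_SIZE = 2 + 4; // magic + uncompressed size
    private final int thresholdInBytes;

    ThresholdCompressor(int thresholdInBytes) {
        this.thresholdInBytes = thresholdInBytes;
    }

    /** Returns the input unchanged at or below the threshold, else a framed compressed copy. */
    byte[] encode(byte[] serialized) {
        if (serialized.length <= thresholdInBytes) {
            return serialized;
        }
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(serialized);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                out.write(chunk, 0, deflater.deflate(chunk));
            }
            byte[] compressed = out.toByteArray();
            return ByteBuffer.allocate(HEADER_SIZE + compressed.length)
                    .putShort(MAGIC).putInt(serialized.length).put(compressed).array();
        } finally {
            deflater.end();
        }
    }

    /** Detects the header by its magic number; bytes without it are returned as-is. */
    byte[] decode(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        if (buf.remaining() < HEADER_SIZE || buf.getShort() != MAGIC) {
            return stored;
        }
        byte[] out = new byte[buf.getInt()];
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(stored, HEADER_SIZE, stored.length - HEADER_SIZE);
            int filled = 0;
            while (filled < out.length) {
                int n = inflater.inflate(out, filled, out.length - filled);
                if (n == 0) {
                    throw new IllegalStateException("truncated compressed payload");
                }
                filled += n;
            }
            return out;
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed payload", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        ThresholdCompressor codec = new ThresholdCompressor(16 * 1024);
        byte[] small = new byte[1024];
        byte[] large = new byte[64 * 1024];
        Arrays.fill(large, (byte) 7);
        System.out.println("small stored as-is: " + (codec.encode(small) == small));
        byte[] stored = codec.encode(large);
        System.out.println("large stored bytes: " + stored.length);
        System.out.println("round trip ok: " + Arrays.equals(codec.decode(stored), large));
    }
}
```

Because the decision is made only when writing and the reader keys off the magic number, changing or removing the threshold later does not affect how previously stored bytes are read.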

Configuration

New configurations:

managedLedgerInfoCompressionThresholdInBytes = 16 * 1024;
managedCursorInfoCompressionThresholdInBytes = 16 * 1024;

Backward & Forward Compatibility

Revert

The compression logic is transparent to readers. If the config is removed, nothing changes for existing data, because readers detect compressed metadata by its magic number.

Links

PRs that introduce metadata compression:

  1. Compress managed ledger info: [ManagedLedger] Compress managed ledger info #11490
  2. PIP-146 managedCursorInfo compression: [PIP-146] ManagedCursorInfo compression #14542

Mailing List discussion thread: https://lists.apache.org/thread/6930c74m31rflrql9y3dpjmm0sbccqkb
Mailing List voting thread: https://lists.apache.org/thread/79t4zp9hl78vd1brbb85x1x6k1j71v44

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jun 29, 2023
@lifepuzzlefun

closed via #19561
