PIP 54: Support acknowledgment at batch index level

lipenghui edited this page Feb 18, 2020 · 5 revisions

Motivation

Currently, the managed cursor tracks acked messages using the mark-delete position and a set of individually deleted messages. Both operate at the granularity of a batch message, identified by (ledgerId, entryId). For users, however, the single message is the unit of publishing and consuming. Because the managed cursor cannot track the local index within a batch message, the broker will dispatch individual messages that have already been acked (as opposed to whole batches) to consumers, as described in issue https://github.com/apache/pulsar/issues/5969.

This PIP therefore proposes tracking the ack status of each batch index, so that acked messages are not dispatched to users again.

Approach

This approach requires cooperation between the client and the broker. When the broker dispatches messages, it includes the batch indexes that have already been acked. The client filters out those batch indexes.
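The client-side filtering step can be sketched as follows. This is only an illustration: the class and method names are hypothetical, and it assumes the acked indexes arrive as the words of a `java.util.BitSet` in which a set bit marks an acked index (the bit polarity is an assumption of this sketch, not something the PIP specifies).

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper, not part of the Pulsar client API.
class BatchFilter {
    /**
     * Returns the messages of a batch whose index is not marked as acked.
     * Assumption: a set bit in ackSet means "this batch index is already acked".
     */
    static <T> List<T> filterAcked(List<T> batch, long[] ackSet) {
        BitSet acked = BitSet.valueOf(ackSet);
        List<T> remaining = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (!acked.get(i)) {
                remaining.add(batch.get(i));
            }
        }
        return remaining;
    }
}
```

For a batch of four messages with indexes 0 and 2 marked as acked, only the messages at indexes 1 and 3 would be handed to the application.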

The client needs to send batch index ack information to the broker so that the broker can maintain the batch index ack status.

The managed cursor maintains the batch index ack status in memory using a BitSet. The BitSet can be persisted to the ledger and the metadata store so that the ack status survives a broker crash. When the broker receives a batch index ack request, the acked batch index is added to the BitSet. When the broker dispatches messages to a client, it gets the batch index ack status from the managed cursor and sends it to the client. When all indexes of a batch message are acked, the cursor deletes the batch message.
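A minimal in-memory sketch of this bookkeeping is shown below. It is not the managed cursor implementation: `BatchAckTracker` and its methods are hypothetical names, persistence is omitted, and the sketch assumes that a set bit means "acked" and that the batch size is known when an ack arrives.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker; not the actual ManagedCursor implementation.
class BatchAckTracker {
    // Key is "ledgerId:entryId"; a set bit means that batch index has been acked.
    private final Map<String, BitSet> ackedIndexes = new HashMap<>();

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    /** Records an ack for one batch index. Returns true once every index of the batch is acked. */
    boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
        String k = key(ledgerId, entryId);
        BitSet bits = ackedIndexes.computeIfAbsent(k, unused -> new BitSet(batchSize));
        bits.set(batchIndex);
        if (bits.cardinality() == batchSize) {
            // Whole batch acked: drop the per-index state; the entry itself can now be deleted.
            ackedIndexes.remove(k);
            return true;
        }
        return false;
    }

    /** Returns true if the given batch index is currently tracked as acked. */
    boolean isAcked(long ledgerId, long entryId, int batchIndex) {
        BitSet bits = ackedIndexes.get(key(ledgerId, entryId));
        return bits != null && bits.get(batchIndex);
    }
}
```

Acking the same index twice is harmless in this sketch, since setting an already set bit does not change the cardinality.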

Take care when calculating consumer permits: because the client filters out acked batch indexes, the broker needs to increase the available permits by the number of acked batch indexes. Otherwise, the broker will stop dispatching messages to that consumer because it does not have enough available permits.
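The adjustment can be illustrated with a small calculation. The sketch assumes that dispatching a batch charges one permit per message in the batch and that a set bit in the ack set marks an acked index. `PermitAccounting` is a hypothetical name, not a broker class.

```java
import java.util.BitSet;

// Hypothetical helper illustrating the permit adjustment; not broker code.
class PermitAccounting {
    /**
     * Returns the consumer's available permits after one batch is dispatched.
     * The full batch size is charged, then the already acked indexes
     * (which the client will filter out) are credited back.
     */
    static int permitsAfterDispatch(int availablePermits, int batchSize, long[] ackSet) {
        int alreadyAcked = BitSet.valueOf(ackSet).cardinality();
        return availablePermits - batchSize + alreadyAcked;
    }
}
```

With 10 available permits, a batch of 5 messages, and 2 indexes already acked, the consumer is left with 7 permits under these assumptions rather than 5.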

To save resources, the acked batch indexes are transferred between client and broker as ranges. The managed cursor likewise persists batch index ranges to the cursor ledger and the metadata store.

Changes

Wire protocol

message CommandMessage {
    repeated int64 ack_set = 4;
}
message MessageIdData {
    repeated int64 ack_set = 5;
}
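
One plausible way to fill a `repeated int64` field such as `ack_set` is with the backing words of a `java.util.BitSet`. The sketch below assumes that encoding and uses a hypothetical `AckSetCodec` class; the PIP text does not spell out the exact encoding.

```java
import java.util.BitSet;

// Hypothetical codec; assumes ack_set carries the little-endian words of a BitSet.
class AckSetCodec {
    /** Packs the bit set into long words, 64 batch indexes per word. */
    static long[] encode(BitSet indexes) {
        return indexes.toLongArray();
    }

    /** Rebuilds the bit set from the long words read off the wire. */
    static BitSet decode(long[] ackSet) {
        return BitSet.valueOf(ackSet);
    }
}
```

Under this encoding a batch of up to 64 messages needs at most one int64 value, which keeps the per-message overhead small.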

Managed ledger data formats

message BatchedEntryDeletionIndexInfo {
    required NestedPositionInfo position = 1;
    repeated int64 deleteSet = 2;
}
message ManagedCursorInfo {
    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
message PositionInfo {
    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

Configuration

A flag named acknowledgmentAtBatchIndexLevelEnabled is added to broker.conf to enable or disable batch index acknowledgment.
