
PIP 54: Support acknowledgment at batch index level

lipenghui edited this page Feb 18, 2020 · 5 revisions

Motivation

Currently, the managed cursor tracks acked messages through the mark-delete position and individually deleted messages. Both operate at the granularity of a batch message (ledgerId, entryId). For users, however, a single message is the unit of publishing and consuming. Because the managed cursor cannot track the index of a message within a batch, the broker can dispatch already-acked individual messages (not whole batches) to consumers again, as described in issue https://github.com/apache/pulsar/issues/5969.

This PIP therefore proposes tracking the ack status of each batch index, so that acked messages are not dispatched to users again.

Approach

This approach requires cooperation between the client and the broker. When the broker dispatches a batch message, it also sends the batch indexes that have already been acked. The client then filters out the messages at those indexes.
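As a rough illustration of the client-side filtering step, the following Python sketch drops the messages whose batch index falls inside an acked range. The function name `filter_acked` and the use of inclusive `(start, end)` tuples are assumptions made for this example; they are not part of the proposal or of the Pulsar client API.

```python
from typing import List, Tuple


def filter_acked(batch: List[str],
                 acked_ranges: List[Tuple[int, int]]) -> List[Tuple[int, str]]:
    """Return (batch_index, payload) pairs that are not yet acked.

    acked_ranges holds (start, end) pairs; treating both bounds as
    inclusive is an assumption of this sketch.
    """
    acked = set()
    for start, end in acked_ranges:
        acked.update(range(start, end + 1))
    # Keep the original batch index so a later ack can refer to it.
    return [(i, payload) for i, payload in enumerate(batch) if i not in acked]


# A batch of four messages where indexes 0 and 1 were already acked.
remaining = filter_acked(["a", "b", "c", "d"], [(0, 1)])
print(remaining)
```

Keeping the original index next to each payload matters because the client later acks by batch index, not by position in the filtered list.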

The client needs to send batch index ack information to the broker so that the broker can maintain the batch index ack status.

The managed cursor maintains the batch index ack status in memory using a BitSet. The BitSet can be persisted to the ledger and the metastore so that the ack status survives a broker crash. When the broker receives a batch index ack request, the acked batch indexes are recorded in the BitSet. When the broker dispatches messages to a client, it reads the batch index ack status from the managed cursor and sends it along with the messages. When all indexes of a batch message are acked, the cursor deletes the batch message.
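The bookkeeping described above can be sketched in a few lines of Python, using an integer as a stand-in for the BitSet. The class `BatchAckTracker` and its methods are hypothetical names for illustration only; persistence to the ledger and metastore is left out, and the real managed cursor may organize this state differently.

```python
class BatchAckTracker:
    """In-memory sketch of per-batch ack tracking (hypothetical, not Pulsar code)."""

    def __init__(self):
        # (ledger_id, entry_id) -> (batch_size, bitmask of acked indexes)
        self._acked = {}

    def ack(self, position, batch_size, indexes):
        """Record acked indexes; return True once the whole batch is acked."""
        _, mask = self._acked.get(position, (batch_size, 0))
        for i in indexes:
            if not 0 <= i < batch_size:
                raise IndexError(f"batch index {i} out of range")
            mask |= 1 << i  # set the bit for this batch index
        if mask == (1 << batch_size) - 1:
            # Every index is acked: the batch message itself can be deleted.
            self._acked.pop(position, None)
            return True
        self._acked[position] = (batch_size, mask)
        return False

    def acked_indexes(self, position):
        """Indexes to send along when this batch is dispatched again."""
        entry = self._acked.get(position)
        if entry is None:
            return []
        size, mask = entry
        return [i for i in range(size) if (mask >> i) & 1]


tracker = BatchAckTracker()
tracker.ack((1, 5), 3, [0])          # partial ack, batch is kept
print(tracker.acked_indexes((1, 5)))
print(tracker.ack((1, 5), 3, [1, 2]))  # last indexes acked
```

In this sketch a fully acked batch is dropped from the map, mirroring the point at which the cursor would delete the batch message.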

Take care when calculating consumer permits. Because the client filters out acked batch indexes, the broker needs to increase the consumer's available permits by the number of acked batch indexes. Otherwise, the broker will stop dispatching messages to that consumer because it no longer has enough available permits.
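The permit adjustment can be illustrated with simple arithmetic. The sketch below assumes the broker deducts one permit per message in a dispatched batch; that accounting model and the function name `permits_after_dispatch` are assumptions for this example, not a description of the actual dispatcher code.

```python
def permits_after_dispatch(available_permits: int,
                           batch_size: int,
                           acked_count: int) -> int:
    """Broker-side permits left after dispatching one batch message.

    Assumption: the broker charges batch_size permits for the dispatch.
    The acked_count messages are filtered out by the client, so the
    broker credits them back.
    """
    if not 0 <= acked_count <= batch_size:
        raise ValueError("acked_count must be between 0 and batch_size")
    return available_permits - batch_size + acked_count


# 10 permits, a batch of 5 with 2 indexes already acked:
# only 3 messages actually reach the consumer.
print(permits_after_dispatch(10, 5, 2))
```

Without the credit, each redelivered batch would cost permits for messages the consumer never sees, which is the stall the paragraph above warns about.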

To save resources, acked batch indexes are transferred between the client and the broker as ranges. The managed cursor likewise persists batch index ranges to the cursor ledger and the metadata store.
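A minimal sketch of collapsing individual acked indexes into ranges is shown below. The helper name `to_ranges` and the inclusive `(lower, upper)` convention are assumptions for illustration; the proposal does not specify whether range bounds are inclusive.

```python
from typing import Iterable, List, Tuple


def to_ranges(indexes: Iterable[int]) -> List[Tuple[int, int]]:
    """Collapse batch indexes into sorted, inclusive (lower, upper) ranges."""
    ranges: List[Tuple[int, int]] = []
    for i in sorted(set(indexes)):
        if ranges and i == ranges[-1][1] + 1:
            # Index continues the current run: extend its upper bound.
            ranges[-1] = (ranges[-1][0], i)
        else:
            # Gap found: start a new run.
            ranges.append((i, i))
    return ranges


print(to_ranges([0, 1, 2, 5, 7, 8]))
```

For a mostly acked batch, a handful of ranges is much smaller than one entry per acked index, which is the saving the paragraph above refers to.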

Changes

Wire protocol

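message BatchMessageIndexesAckData {
    required MessageIdData message_id = 1;
    required int32 batchSize = 2;
    // Ranges of batch indexes acked by the client
    repeated IntRange ack_indexes = 3;
}

message CommandMessage {
    // Only the new field is shown: batch index ranges that are already acked
    repeated IntRange acked_indexes = 4;
}

message CommandAck {
    // Only the new field is shown: per-batch index ack information
    repeated BatchMessageIndexesAckData batch_message_ack_indexes = 8;
}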

Managed ledger data formats

message BatchDeletedIndexRange {
    required uint32 lowerIndex = 1;
    required uint32 upperIndex = 2;
}

message BatchDeletedIndexInfo {
    required NestedPositionInfo position = 1;
    repeated BatchDeletedIndexRange batchDeletedIndexes = 2;
}

message PositionInfo {
    // Stores which indexes of a batch message have been deleted
    repeated BatchDeletedIndexInfo batchDeletedIndexes = 5;
}

Configuration

A flag named batchIndexAcknowledgeEnable is added to broker.conf to enable or disable batch index acknowledgement.
