Fix batch ack count is negative issue. #14288

Merged
merged 1 commit into from Feb 15, 2022

Conversation

Technoboy-
Contributor

Motivation

After #13383 fixed the batch ack issue, we found that the unacked-message count could become negative (#14246). At first we thought this was a normal side effect of message redelivery, but after digging into the logic we found that it is a bug.

The test is copied from #14246:

for (int i = 0; i < 50; i++) {
    Message<String> msg = consumer.receive();
    if (i % 2 == 0) {
        consumer.acknowledgeAsync(msg);
    } else {
        consumer.negativeAcknowledge(msg);
    }
}

When a message is negatively acknowledged, Consumer#redeliverUnacknowledgedMessages runs the following:

public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
    int totalRedeliveryMessages = 0;
    List<PositionImpl> pendingPositions = Lists.newArrayList();
    for (MessageIdData msg : messageIds) {
        PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
        LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId());
        // Only positions still present in pendingAcks are counted.
        if (longPair != null) {
            int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, longPair.first);
            // The position leaves pendingAcks as soon as it is counted.
            pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            totalRedeliveryMessages += unAckedCount;
            pendingPositions.add(position);
        }
    }
    // (excerpt ends here; the rest of the method is omitted)

When calculating totalRedeliveryMessages, the method checks that pendingAcks still contains the message and removes it from pendingAcks afterwards. (Dispatching messages adds them to pendingAcks.)
So in the test above, a negativeAcknowledge can be processed first and an acknowledgeAsync afterwards, when the position is no longer in pendingAcks.
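
The accounting on the redelivery path can be sketched as a small stand-alone model. This is not the broker code: `RedeliveryModel`, its `pendingAcks` map keyed by entry id, and the `unackedMessages` field are hypothetical stand-ins, batch index bookkeeping is left out, and it is assumed that the redelivered total is subtracted from the unacked-message counter.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the redelivery accounting; not the Pulsar broker code. */
final class RedeliveryModel {
    // entryId -> number of messages in that entry that are still unacked
    private final Map<Long, Integer> pendingAcks = new HashMap<>();
    private int unackedMessages = 0;

    /** Dispatching an entry registers it in pendingAcks and raises the counter. */
    void dispatch(long entryId, int batchSize) {
        pendingAcks.put(entryId, batchSize);
        unackedMessages += batchSize;
    }

    /** Redelivery only counts entries that are still pending, then removes them. */
    int redeliver(long entryId) {
        Integer unacked = pendingAcks.remove(entryId);
        if (unacked == null) {
            return 0; // no longer pending: nothing to subtract
        }
        unackedMessages -= unacked;
        return unacked;
    }

    int unackedMessages() {
        return unackedMessages;
    }

    boolean isPending(long entryId) {
        return pendingAcks.containsKey(entryId);
    }

    public static void main(String[] args) {
        RedeliveryModel model = new RedeliveryModel();
        model.dispatch(1L, 10);
        System.out.println(model.redeliver(1L));     // 10
        System.out.println(model.redeliver(1L));     // 0, the entry is no longer pending
        System.out.println(model.unackedMessages()); // 0
        System.out.println(model.isPending(1L));     // false
    }
}
```

Because the entry is removed on the first redelivery, a second redelivery of the same entry subtracts nothing. Any later ack for that entry has to apply the same check, otherwise it subtracts from a counter that no longer includes the entry.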
acknowledgeAsync maps to Consumer#individualAckNormal, which decreases the unacked-message count by the value computed in:

private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
    long ackedCount = 0;
    if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
        long[] cursorAckSet = getCursorAckSet(position);
        if (cursorAckSet != null) {
            // This branch counts newly acked indexes without consulting pendingAcks.
            BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
            int lastCardinality = cursorBitSet.cardinality();
            BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets);
            cursorBitSet.and(givenBitSet);
            givenBitSet.recycle();
            int currentCardinality = cursorBitSet.cardinality();
            ackedCount = lastCardinality - currentCardinality;
            cursorBitSet.recycle();
        } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
            // Only this branch checks that the position is still pending.
            ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality();
        }
    }
    return ackedCount;
}

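The cursorAckSet branch doesn't check pendingAcks, and this is the root cause. The pendingAcks check on line 556 should move up to line 545. (Line numbers refer to Consumer.java at commit b22f70658927e07e3726d32290065f47313070b9, where this method spans lines 543-561.)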
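
To make the root cause concrete, here is a minimal stand-alone sketch of the acked-count calculation with and without a `pendingAcks` guard. It is a model under stated assumptions, not the broker implementation: `AckCountModel`, `ackedCount`, and the `stillPending` and `guardOnPending` parameters are hypothetical names, and `java.util.BitSet` stands in for `BitSetRecyclable`. A set bit is taken to mean that the batch index is still unacked, which matches `batchSize - cardinality` being the acked count in the excerpt.

```java
import java.util.BitSet;

/** Hypothetical model of the acked-count calculation; not the Pulsar broker code. */
final class AckCountModel {

    /**
     * Counts the batch indexes newly acked by ackSet. A set bit means "still unacked".
     * cursorAckSet may be null when there is no stored ack state for the entry.
     * With guardOnPending, an entry that is no longer pending contributes nothing.
     */
    static long ackedCount(BitSet cursorAckSet, BitSet ackSet, int batchSize,
                           boolean stillPending, boolean guardOnPending) {
        if (guardOnPending && !stillPending) {
            return 0; // entry was already removed, for example by redelivery
        }
        if (cursorAckSet != null) {
            BitSet remaining = (BitSet) cursorAckSet.clone();
            int before = remaining.cardinality();
            remaining.and(ackSet); // clear the indexes that this ack covers
            return before - remaining.cardinality();
        } else if (stillPending) {
            return batchSize - ackSet.cardinality();
        }
        return 0;
    }

    public static void main(String[] args) {
        int batchSize = 4;
        BitSet cursorAckSet = new BitSet(batchSize);
        cursorAckSet.set(0, batchSize); // all 4 indexes are unacked
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(1, batchSize);       // this ack covers index 0 only

        // The entry was nacked first: redelivery removed it from pendingAcks
        // and already subtracted its unacked messages from the counter.
        long unackedMessages = 0;
        boolean stillPending = false;

        long unguarded = ackedCount(cursorAckSet, ackSet, batchSize, stillPending, false);
        long guarded = ackedCount(cursorAckSet, ackSet, batchSize, stillPending, true);

        System.out.println(unackedMessages - unguarded); // -1
        System.out.println(unackedMessages - guarded);   // 0
    }
}
```

Without the guard, the late ack subtracts one message that redelivery had already subtracted, so the counter drops below zero. With the guard applied before either branch, the same ack leaves the counter unchanged.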

Documentation

  • no-need-doc

@Technoboy- Technoboy- requested review from codelipenghui, congbobo184 and gaoran10 and removed request for codelipenghui February 15, 2022 06:06
@Technoboy- Technoboy- self-assigned this Feb 15, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 15, 2022
@codelipenghui codelipenghui added this to the 2.10.0 milestone Feb 15, 2022
@codelipenghui codelipenghui added release/2.8.3 release/2.9.2 release/blocker Indicate the PR or issue that should block the release until it gets resolved labels Feb 15, 2022
@codelipenghui codelipenghui merged commit 6b828b4 into apache:master Feb 15, 2022
michaeljmarshall pushed a commit that referenced this pull request Feb 15, 2022
(cherry picked from commit 6b828b4)
@michaeljmarshall michaeljmarshall added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Feb 15, 2022
codelipenghui pushed a commit that referenced this pull request Feb 16, 2022
(cherry picked from commit 6b828b4)
@codelipenghui codelipenghui added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Feb 16, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
@Technoboy- Technoboy- deleted the fix-batch-ack-negtive-issue branch August 10, 2022 05:53
Labels
area/broker cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-not-needed Your PR changes do not impact docs release/blocker Indicate the PR or issue that should block the release until it gets resolved release/2.8.3 release/2.9.2

6 participants