Remove log unacked msg. #14246

Merged: 1 commit, Feb 11, 2022

Conversation

Technoboy-
Contributor

Motivation

#13383 fixed a batch message ack issue, but users have recently found that the unacked-message count can become negative, which triggers this log:

if (unackedMsgs < 0) {
    log.error("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer);
}

The following test reproduces the case:

    @Test
    public void testNegativeAcks()
            throws Exception {
        String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("sub1")
                .subscriptionType(SubscriptionType.Shared)
                .enableBatchIndexAcknowledgment(true)
                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
                .ackTimeout(1000, TimeUnit.MILLISECONDS)
                .subscribe();

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(true)
                .batchingMaxMessages(10)
                .create();

        Set<String> sentMessages = new HashSet<>();

        final int N = 10000;
        for (int i = 0; i < N; i++) {
            String value = "test-" + i;
            producer.sendAsync(value);
            sentMessages.add(value);
        }
        producer.flush();

        // group1
        for (int i = 0; i < 50; i++) {
            Message<String> msg = consumer.receive();
            if (i % 2 == 0) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        }
        // group2
        for (int i = 0; i < 50; i++) {
            Message<String> msg = consumer.receive();
            if (i % 2 == 0) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        }
        // group3
        for (int i = 0; i < 10; i++) {
            Message<String> msg = consumer.receive();
            if (i % 2 == 0) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        }
        Message<String> receive;
        // Drain the remaining messages; receive() returns null once the timeout expires.
        while ((receive = consumer.receive(20, TimeUnit.SECONDS)) != null) {
            consumer.acknowledgeAsync(receive);
        }
    }

When a redelivery occurs, the ack bitset is not sent back to the broker, so the broker cannot calculate how many messages of the batch were already acked. In this case the unacked-message count may become negative, but this has no other impact.
To reduce user confusion, this PR removes the log for now.
A proper fix for the redelivery case may follow later.

Documentation

  • no-need-doc

@Technoboy- Technoboy- self-assigned this Feb 11, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 11, 2022
@merlimat merlimat merged commit 78bfaa2 into apache:master Feb 11, 2022
@codelipenghui codelipenghui added this to the 2.10.0 milestone Feb 12, 2022
@codelipenghui codelipenghui added release/2.9.2 release/2.8.3 release/blocker Indicate the PR or issue that should block the release until it gets resolved labels Feb 12, 2022
codelipenghui pushed a commit that referenced this pull request Feb 15, 2022
### Motivation

#13383 fixed the batch ack issue, but afterwards we found that the unacked-message count could become negative (#14246). At first we thought this was a normal case caused by message redelivery, but after digging into the logic we found that it is a bug.

The test is copied from #14246:

```
for (int i = 0; i < 50; i++) {
    Message<String> msg = consumer.receive();
    if (i % 2 == 0) {
        consumer.acknowledgeAsync(msg);
    } else {
        consumer.negativeAcknowledge(msg);
    }
}
```
When a message is negatively acknowledged, `Consumer#redeliverUnacknowledgedMessages` is invoked:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L900-L912

When calculating `totalRedeliveryMessages`, it checks that `pendingAcks` contains the message and then removes the message from `pendingAcks`. (Dispatching messages adds them to `pendingAcks`.)
So in the above test, a message may be negatively acknowledged first and then acknowledged with `acknowledgeAsync`.
`acknowledgeAsync` maps to `Consumer#individualAckNormal`, which decreases the unacked-message count in:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L543-L561
It does not check `pendingAcks`, and this is the root cause. The fix is to move line 556 to line 545.
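
The sequence described above can be sketched with a toy model. This is only an illustration under stated assumptions, not the broker code: the class `UnackedCounterSketch`, its methods, and the `checkPendingAcks` flag are hypothetical names, and the model assumes that dispatch increments the counter while redelivery and ack decrement it.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of the unacked-message counter. Hypothetical sketch only;
 * this is NOT org.apache.pulsar.broker.service.Consumer.
 */
final class UnackedCounterSketch {
    // Message ids that were dispatched and are neither acked nor redelivered yet.
    private final Set<Long> pendingAcks = new HashSet<>();
    private int unackedMsgs = 0;
    // Assumption: true models an ack path that consults pendingAcks before decrementing.
    private final boolean checkPendingAcks;

    UnackedCounterSketch(boolean checkPendingAcks) {
        this.checkPendingAcks = checkPendingAcks;
    }

    /** Dispatching a message adds it to pendingAcks and counts it as unacked. */
    void dispatch(long messageId) {
        pendingAcks.add(messageId);
        unackedMsgs++;
    }

    /** Redelivery (after a negative ack) only counts messages still in pendingAcks. */
    void redeliver(long messageId) {
        if (pendingAcks.remove(messageId)) {
            unackedMsgs--;
        }
    }

    /** Individual ack; without the pendingAcks check it always decrements. */
    void ack(long messageId) {
        boolean wasPending = pendingAcks.remove(messageId);
        if (!checkPendingAcks || wasPending) {
            unackedMsgs--;
        }
    }

    int unackedMsgs() {
        return unackedMsgs;
    }

    /** Replays: dispatch, negative ack (redelivery), then a late ack of the same message. */
    static int simulate(boolean checkPendingAcks) {
        UnackedCounterSketch consumer = new UnackedCounterSketch(checkPendingAcks);
        consumer.dispatch(1L);
        consumer.redeliver(1L);
        consumer.ack(1L);
        return consumer.unackedMsgs();
    }

    public static void main(String[] args) {
        System.out.println("ack without pendingAcks check: " + simulate(false)); // -1
        System.out.println("ack with pendingAcks check:    " + simulate(true));  // 0
    }
}
```

In this model the late ack decrements the counter a second time when `pendingAcks` is not consulted, which is how the count drops below zero. With the check in place the second decrement is skipped.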
michaeljmarshall pushed a commit that referenced this pull request Feb 15, 2022
(cherry picked from commit 78bfaa2)
@michaeljmarshall michaeljmarshall added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Feb 15, 2022
@michaeljmarshall
Member

@Technoboy- following up here, now that we merged #14288, is this still required? Given that the log line helped find a bug, I am concerned that removing the log line might hide other bugs.

michaeljmarshall pushed a commit that referenced this pull request Feb 15, 2022
(cherry picked from commit 6b828b4)
codelipenghui pushed a commit that referenced this pull request Feb 16, 2022
(cherry picked from commit 78bfaa2)
codelipenghui pushed a commit that referenced this pull request Feb 16, 2022
(cherry picked from commit 6b828b4)
@codelipenghui codelipenghui added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Feb 16, 2022
@Technoboy-
Contributor Author

> @Technoboy- following up here, now that we merged #14288, is this still required? Given that the log line helped find a bug, I am concerned that removing the log line might hide other bugs.

Yes, good idea. Thanks @michaeljmarshall . I have pushed a new patch #14501

Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
@Technoboy- Technoboy- deleted the remove-unacked-msg-log branch August 10, 2022 05:53
Labels
  • area/broker
  • cherry-picked/branch-2.8 (Archived: 2.8 is end of life)
  • cherry-picked/branch-2.9 (Archived: 2.9 is end of life)
  • doc-not-needed (Your PR changes do not impact docs)
  • release/blocker (Indicate the PR or issue that should block the release until it gets resolved)
  • release/2.8.3
  • release/2.9.2
4 participants