Skip to content

Commit

Permalink
fix: Incorrect acknowledgment behavior in the listener of the multi-t…
Browse files Browse the repository at this point in the history
…opic consumer. (#423)

### Motivation
apache/pulsar-client-node#371

### Modifications
- Add the message to the unacknowledged tracker before call the listener.

### Verifying this change
- Add `testMultiConsumerListenerAndAck` to cover it.
  • Loading branch information
shibd committed Apr 6, 2024
1 parent 27d8cc0 commit 3f0b33b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
incomingMessages_.pop(m);
try {
Consumer self{get_shared_this_ptr()};
messageListener_(self, m);
messageProcessed(m);
messageListener_(self, m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
}
Expand Down
1 change: 1 addition & 0 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
Expand Down
1 change: 1 addition & 0 deletions lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAcked
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
};
} // namespace pulsar

Expand Down
42 changes: 42 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1490,4 +1490,46 @@ TEST(ConsumerTest, testSNIProxyConnect) {
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
client.close();
}

TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
Client client{lookupUrl};

const std::string topicName = "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr));
int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions",
std::to_string(5));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

// Create a producer
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

int num = 10;
// Use listener to consume
Latch latch{num};
Consumer consumer;
ConsumerConfiguration consumerConfiguration;
PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfiguration, 2000);
consumerConfiguration.setMessageListener([&latch](Consumer& consumer, const Message& msg) {
LOG_INFO("Received message '" << msg.getDataAsString() << "' and ack it");
consumer.acknowledge(msg);
latch.countdown();
});
ASSERT_EQ(ResultOk, client.subscribe(topicName, "consumer-1", consumerConfiguration, consumer));

// Send synchronously
for (int i = 0; i < 10; ++i) {
Message msg = MessageBuilder().setContent("content" + std::to_string(i)).build();
Result result = producer.send(msg);
LOG_INFO("Message sent: " << result);
}

ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
auto tracker =
static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImplPtr->unAckedMessageTrackerPtr_.get());
ASSERT_EQ(0, tracker->size());

client.close();
}

} // namespace pulsar

0 comments on commit 3f0b33b

Please sign in to comment.