Skip to content

Commit

Permalink
Fix message id error if messages were sent to a partitioned topic (ap…
Browse files Browse the repository at this point in the history
…ache#6938)

### Motivation

If messages were sent to a partitioned topic, the message id's `partition` field was always -1 because SendReceipt command only contains ledger id and entry id.

### Modifications

- Add a `partition` field to `ProducerImpl` and set the `MessageId`'s `partition` field with it in `ackReceived` method later.
- Add a test to check message id in send callback if messages were sent to a partitioned topic.

(cherry picked from commit 15cb920)
  • Loading branch information
BewareMyPower authored and Addison Higham committed Jun 11, 2020
1 parent e9288c0 commit 7a1610c
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
Expand Down
8 changes: 6 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {}

ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf)
ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
int32_t partition)
: HandlerBase(
client, topic,
Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
partition_(partition),
producerName_(conf_.getProducerName()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
Expand Down Expand Up @@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
}
}

bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
rawMessageId.batchIndex());
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration);
const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
~ProducerImpl();

int keepMaxMessageSize_;
Expand Down Expand Up @@ -150,6 +150,7 @@ class ProducerImpl : public HandlerBase,

MessageQueue pendingMessagesQueue_;

int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
Expand Down
54 changes: 54 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3207,11 +3207,65 @@ TEST(BasicEndToEndTest, testSendCallback) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
consumer.acknowledge(msg);
}

latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);

consumer.close();
producer.close();

const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr));
const std::string url = adminUrl + "admin/v2/persistent/" +
partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions";
const int numPartitions = 3;

int res = makePutRequest(url, std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

std::this_thread::sleep_for(std::chrono::seconds(2));

ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer));
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer));

sentIdSet.clear();
receivedIdSet.clear();

const int numMessages = numPartitions * 2;
latch = Latch(numMessages);
for (int i = 0; i < numMessages; i++) {
const auto msg = MessageBuilder().setContent("a").build();
producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
ASSERT_EQ(ResultOk, result);
sentIdSet.emplace(id);
latch.countdown();
});
}

for (int i = 0; i < numMessages; i++) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
consumer.acknowledge(msg);
}

latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);

std::set<int> partitionIndexSet;
for (const auto &id : sentIdSet) {
partitionIndexSet.emplace(id.partition());
}
std::set<int> expectedPartitionIndexSet;
for (int i = 0; i < numPartitions; i++) {
expectedPartitionIndexSet.emplace(i);
}
ASSERT_EQ(sentIdSet, receivedIdSet);

consumer.close();
producer.close();
client.close();
Expand Down

0 comments on commit 7a1610c

Please sign in to comment.