Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Fix message id error if messages were sent to a partitioned topic #6938

Merged
merged 1 commit into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
Expand Down
8 changes: 6 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {}

ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf)
ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
int32_t partition)
: HandlerBase(
client, topic,
Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
partition_(partition),
producerName_(conf_.getProducerName()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
Expand Down Expand Up @@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
}
}

bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
rawMessageId.batchIndex());
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration);
const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
~ProducerImpl();

int keepMaxMessageSize_;
Expand Down Expand Up @@ -150,6 +150,7 @@ class ProducerImpl : public HandlerBase,

MessageQueue pendingMessagesQueue_;

int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
Expand Down
54 changes: 54 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3214,11 +3214,65 @@ TEST(BasicEndToEndTest, testSendCallback) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
consumer.acknowledge(msg);
}

latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);

consumer.close();
producer.close();

const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr));
const std::string url = adminUrl + "admin/v2/persistent/" +
partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions";
const int numPartitions = 3;

int res = makePutRequest(url, std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

std::this_thread::sleep_for(std::chrono::seconds(2));

ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer));
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer));

sentIdSet.clear();
receivedIdSet.clear();

const int numMessages = numPartitions * 2;
latch = Latch(numMessages);
for (int i = 0; i < numMessages; i++) {
const auto msg = MessageBuilder().setContent("a").build();
producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
ASSERT_EQ(ResultOk, result);
sentIdSet.emplace(id);
latch.countdown();
});
}

for (int i = 0; i < numMessages; i++) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
consumer.acknowledge(msg);
}

latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);

std::set<int> partitionIndexSet;
for (const auto &id : sentIdSet) {
partitionIndexSet.emplace(id.partition());
}
std::set<int> expectedPartitionIndexSet;
for (int i = 0; i < numPartitions; i++) {
expectedPartitionIndexSet.emplace(i);
}
ASSERT_EQ(sentIdSet, receivedIdSet);

consumer.close();
producer.close();
client.close();
Expand Down