Skip to content

Commit

Permalink
Revert "[fix][client-c++] Close messages_ when PartitionedConsumer …
Browse files Browse the repository at this point in the history
…is closed (apache#16887)"

This reverts commit 2e761b8.
  • Loading branch information
mattisonchao committed Aug 5, 2022
1 parent 29b23ac commit c2b685b
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 58 deletions.
4 changes: 0 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,10 +721,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
messageProcessed(msg);
return ResultOk;
} else {
Lock lock(mutex_);
if (state_ != Ready) {
return ResultAlreadyClosed;
}
return ResultTimeout;
}
}
Expand Down
4 changes: 0 additions & 4 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,6 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
lock.lock();
if (state_ != Ready) {
return ResultAlreadyClosed;
}
return ResultTimeout;
}
}
Expand Down
7 changes: 0 additions & 7 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
lock.lock();
if (state_ != Ready) {
return ResultAlreadyClosed;
}
return ResultTimeout;
}
}
Expand Down Expand Up @@ -432,9 +428,6 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&

void PartitionedConsumerImpl::failPendingReceiveCallback() {
Message msg;

messages_.close();

Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
Expand Down
43 changes: 0 additions & 43 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -720,47 +720,4 @@ TEST(ConsumerTest, testIsConnected) {
ASSERT_FALSE(consumer.isConnected());
}

TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
Client client(lookupUrl);
const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
constexpr int numPartitions = 2;

int res =
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

Consumer consumer;
ConsumerConfiguration consumerConfig;
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));

// send messages
ProducerConfiguration producerConfig;
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
Message msg = MessageBuilder().setContent("message").build();
ASSERT_EQ(ResultOk, producer.send(msg));

producer.close();

// receive message on another thread
pulsar::Latch latch(1);
auto thread = std::thread([&]() {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
consumer.acknowledge(msg.getMessageId());
ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
latch.countdown();
});

std::this_thread::sleep_for(std::chrono::seconds(1));

consumer.close();

bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));

ASSERT_TRUE(wasUnblocked);
thread.join();
}

} // namespace pulsar

0 comments on commit c2b685b

Please sign in to comment.