Skip to content

Commit

Permalink
[Issue 11493] Simple implementation of getting number of references f…
Browse files Browse the repository at this point in the history
…rom C++ client (apache#11535)

Fixes apache#11493 

Master Issue: apache#11493 

### Motivation
In Pulsar, we use a single client to create multiple producers/consumers/readers. Is there any method/attribute that can give information on number of producers/readers/consumers connected to the given pulsar client instance at the given point of time?

Say there is a single pulsar client instance. Multiple consumers and readers are created from the given client instance. This client needs to be cleaned up when all the references are closed. In this case, it would be of help, to get information on the number of the consumers/readers/ getPartitionsForTopic calls are active on the given client. Ie, having the number of references for the given client can provide information on whether it is fine to clean up the client instance at the given point of time.

*Explain here the context, and why you're making that change. What is the problem you're trying to solve.*

### Modifications

Add these method to `Client.h`, `Client.cc`, `ClientImpl.h`, `CliemtImpl.cc` :
```
uint64_t getNumberOfProducer()
uint64_t getNumberOfConsumer()
```
To count alive producers, I get each producer by weak point and check if it is connected.

### Verifying this change

This change added tests and can be verified as follows:

Add unit test to check if these functions can return correct references number. Test file is `ClientTest.cc`. Test function is `TEST(ClientTest, testGetNumberOfReferences)`.
  • Loading branch information
Sunny-Island authored and LeBW committed Aug 9, 2021
1 parent c36c7c4 commit 68f28df
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pulsar-client-cpp/include/pulsar/Client.h
Expand Up @@ -355,6 +355,20 @@ class PULSAR_PUBLIC Client {
*/
void shutdown();

/**
* @brief Get the number of alive producers on the current client.
*
* @return The number of alive producers on the current client.
*/
uint64_t getNumberOfProducers();

/**
* @brief Get the number of alive consumers on the current client.
*
* @return The number of alive consumers on the current client.
*/
uint64_t getNumberOfConsumers();

private:
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections);
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Client.cc
Expand Up @@ -175,4 +175,7 @@ Result Client::close() {
void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); }

void Client::shutdown() { impl_->shutdown(); }

uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); }
uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); }
} // namespace pulsar
24 changes: 24 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.cc
Expand Up @@ -588,6 +588,30 @@ uint64_t ClientImpl::newRequestId() {
return requestIdGenerator_++;
}

uint64_t ClientImpl::getNumberOfProducers() {
Lock lock(mutex_);
uint64_t numberOfAliveProducers = 0;
for (const auto& producer : producers_) {
const auto& producerImpl = producer.lock();
if (producerImpl) {
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
}
}
return numberOfAliveProducers;
}

uint64_t ClientImpl::getNumberOfConsumers() {
Lock lock(mutex_);
uint64_t numberOfAliveConsumers = 0;
for (const auto& consumer : consumers_) {
const auto consumerImpl = consumer.lock();
if (consumerImpl) {
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
}
}
return numberOfAliveConsumers;
}

const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; }

} /* namespace pulsar */
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientImpl.h
Expand Up @@ -83,6 +83,9 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
uint64_t newConsumerId();
uint64_t newRequestId();

uint64_t getNumberOfProducers();
uint64_t getNumberOfConsumers();

const ClientConfiguration& getClientConfig() const;

const ClientConfiguration& conf() const;
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Expand Up @@ -1233,4 +1233,6 @@ bool ConsumerImpl::isConnected() const {
return !getCnx().expired() && state_ == Ready;
}

uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }

} /* namespace pulsar */
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Expand Up @@ -125,6 +125,7 @@ class ConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

virtual void disconnectConsumer();
Result fetchSingleMessageFromBroker(Message& msg);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Expand Up @@ -56,6 +56,7 @@ class ConsumerImplBase {
virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedConsumer() = 0;

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Expand Up @@ -749,3 +749,16 @@ bool MultiTopicsConsumerImpl::isConnected() const {
}
return true;
}

uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
Lock lock(mutex_);
uint64_t numberOfConnectedConsumer = 0;
const auto consumers = consumers_;
lock.unlock();
for (const auto& topicAndConsumer : consumers) {
if (topicAndConsumer.second->isConnected()) {
numberOfConnectedConsumer++;
}
}
return numberOfConnectedConsumer;
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Expand Up @@ -78,6 +78,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
// return first topic name when all topics name valid, or return null pointer
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Expand Up @@ -635,4 +635,17 @@ bool PartitionedConsumerImpl::isConnected() const {
return true;
}

uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() {
uint64_t numberOfConnectedConsumer = 0;
Lock consumersLock(consumersMutex_);
const auto consumers = consumers_;
consumersLock.unlock();
for (const auto& consumer : consumers) {
if (consumer->isConnected()) {
numberOfConnectedConsumer++;
}
}
return numberOfConnectedConsumer;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Expand Up @@ -74,6 +74,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Expand Up @@ -387,4 +387,17 @@ bool PartitionedProducerImpl::isConnected() const {
return true;
}

uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
uint64_t numberOfConnectedProducer = 0;
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers) {
if (producer->isConnected()) {
numberOfConnectedProducer++;
}
}
return numberOfConnectedProducer;
}

} // namespace pulsar
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.h
Expand Up @@ -64,7 +64,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;

uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Expand Up @@ -826,5 +826,7 @@ bool ProducerImpl::isConnected() const {
return !getCnx().expired() && state_ == Ready;
}

uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }

} // namespace pulsar
/* namespace pulsar */
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Expand Up @@ -70,6 +70,7 @@ class ProducerImpl : public HandlerBase,
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;

bool removeCorruptMessage(uint64_t sequenceId);

Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImplBase.h
Expand Up @@ -45,6 +45,7 @@ class ProducerImplBase {
virtual void triggerFlush() = 0;
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedProducer() = 0;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
61 changes: 61 additions & 0 deletions pulsar-client-cpp/tests/ClientTest.cc
Expand Up @@ -18,6 +18,8 @@
*/
#include <gtest/gtest.h>

#include "HttpHelper.h"

#include <future>
#include <pulsar/Client.h>
#include "../lib/checksum/ChecksumProvider.h"
Expand Down Expand Up @@ -114,3 +116,62 @@ TEST(ClientTest, testConnectTimeout) {
clientLow.close();
clientDefault.close();
}

TEST(ClientTest, testGetNumberOfReferences) {
Client client("pulsar://localhost:6650");

// Producer test
uint64_t numberOfProducers = 0;
const std::string nonPartitionedTopic =
"testGetNumberOfReferencesNonPartitionedTopic" + std::to_string(time(nullptr));

const std::string partitionedTopic =
"testGetNumberOfReferencesPartitionedTopic" + std::to_string(time(nullptr));
Producer producer;
client.createProducer(nonPartitionedTopic, producer);
numberOfProducers = 1;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

producer.close();
numberOfProducers = 0;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

// PartitionedProducer
int res = makePutRequest(
"http://localhost:8080/admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

client.createProducer(partitionedTopic, producer);
numberOfProducers = 2;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
producer.close();
numberOfProducers = 0;
ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());

// Consumer test
uint64_t numberOfConsumers = 0;

Consumer consumer1;
client.subscribe(nonPartitionedTopic, "consumer-1", consumer1);
numberOfConsumers = 1;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

consumer1.close();
numberOfConsumers = 0;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

Consumer consumer2;
Consumer consumer3;
client.subscribe(partitionedTopic, "consumer-2", consumer2);
numberOfConsumers = 2;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
client.subscribe(nonPartitionedTopic, "consumer-3", consumer3);
numberOfConsumers = 3;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
consumer2.close();
consumer3.close();
numberOfConsumers = 0;
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());

client.close();
}

0 comments on commit 68f28df

Please sign in to comment.