diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 1bc26145380549..49dd011963b748 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -355,6 +355,20 @@ class PULSAR_PUBLIC Client { */ void shutdown(); + /** + * @brief Get the number of alive producers on the current client. + * + * @return The number of alive producers on the current client. + */ + uint64_t getNumberOfProducers(); + + /** + * @brief Get the number of alive consumers on the current client. + * + * @return The number of alive consumers on the current client. + */ + uint64_t getNumberOfConsumers(); + private: Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections); diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index 59bf8f4c98879d..c72232a38c6236 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -175,4 +175,7 @@ Result Client::close() { void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); } void Client::shutdown() { impl_->shutdown(); } + +uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); } +uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); } } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 126b05a86e85df..d6663e834bb503 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -588,6 +588,30 @@ uint64_t ClientImpl::newRequestId() { return requestIdGenerator_++; } +uint64_t ClientImpl::getNumberOfProducers() { + Lock lock(mutex_); + uint64_t numberOfAliveProducers = 0; + for (const auto& producer : producers_) { + const auto& producerImpl = producer.lock(); + if (producerImpl) { + numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer(); + } + } + return numberOfAliveProducers; +} + +uint64_t ClientImpl::getNumberOfConsumers() { + Lock lock(mutex_); + uint64_t numberOfAliveConsumers = 0; + for (const auto& consumer : consumers_) { + const auto consumerImpl = consumer.lock(); + if (consumerImpl) { + numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer(); + } + } + return numberOfAliveConsumers; +} + const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; } } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 81eb59690329c7..847872abfd29b1 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -83,6 +83,9 @@ class ClientImpl : public std::enable_shared_from_this { uint64_t newConsumerId(); uint64_t newRequestId(); + uint64_t getNumberOfProducers(); + uint64_t getNumberOfConsumers(); + const ClientConfiguration& getClientConfig() const; const ClientConfiguration& conf() const; diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index c26841754e60cd..fea7d4d6522d7d 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -1233,4 +1233,6 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } + } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 28f96dd7e38453..cde8293ff69bab 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -125,6 +125,7 @@ class ConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; virtual void disconnectConsumer(); Result fetchSingleMessageFromBroker(Message& msg); diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index c8f36d0415adb0..693d4da9a37795 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -56,6 +56,7 @@ class ConsumerImplBase { virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0; virtual void negativeAcknowledge(const MessageId& msgId) = 0; virtual bool isConnected() const = 0; + virtual uint64_t getNumberOfConnectedConsumer() = 0; private: virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 4b0d741667528f..64aaada2503c2d 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -749,3 +749,16 @@ bool MultiTopicsConsumerImpl::isConnected() const { } return true; } + +uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { + Lock lock(mutex_); + uint64_t numberOfConnectedConsumer = 0; + const auto consumers = consumers_; + lock.unlock(); + for (const auto& topicAndConsumer : consumers) { + if (topicAndConsumer.second->isConnected()) { + numberOfConnectedConsumer++; + } + } + return numberOfConnectedConsumer; +} diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index df3039b57025cb..3a1249bc5084f1 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -78,6 +78,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; + void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); // return first topic name when all topics name valid, or return null pointer diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 9a9c8826337a02..7aa506e9f109c4 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -635,4 +635,17 @@ bool PartitionedConsumerImpl::isConnected() const { return true; } +uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() { + uint64_t numberOfConnectedConsumer = 0; + Lock consumersLock(consumersMutex_); + const auto consumers = consumers_; + consumersLock.unlock(); + for (const auto& consumer : consumers) { + if (consumer->isConnected()) { + numberOfConnectedConsumer++; + } + } + return numberOfConnectedConsumer; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 2696288e539e2c..83ada952abad2e 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -74,6 +74,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 0df729d1f505a3..4e01263d6aac15 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -387,4 +387,17 @@ bool PartitionedProducerImpl::isConnected() const { return true; } +uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { + uint64_t numberOfConnectedProducer = 0; + Lock producersLock(producersMutex_); + const auto producers = producers_; + producersLock.unlock(); + for (const auto& producer : producers) { + if (producer->isConnected()) { + numberOfConnectedProducer++; + } + } + return numberOfConnectedProducer; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index cd1ee8c05f12b2..c097190fca3336 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -64,7 +64,7 @@ class PartitionedProducerImpl : public ProducerImplBase, void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; - + uint64_t getNumberOfConnectedProducer() override; void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, const unsigned int partitionIndex); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index d81e9582fa97b0..c7a45513ab2f7a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -826,5 +826,7 @@ bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } + } // namespace pulsar /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index caec85cb2d15b8..2c51d41f12788a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -70,6 +70,7 @@ class ProducerImpl : public HandlerBase, void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; + uint64_t getNumberOfConnectedProducer() override; bool removeCorruptMessage(uint64_t sequenceId); diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h index a947624f3ff5b2..15a6e1d5a3f80c 100644 --- a/pulsar-client-cpp/lib/ProducerImplBase.h +++ b/pulsar-client-cpp/lib/ProducerImplBase.h @@ -45,6 +45,7 @@ class ProducerImplBase { virtual void triggerFlush() = 0; virtual void flushAsync(FlushCallback callback) = 0; virtual bool isConnected() const = 0; + virtual uint64_t getNumberOfConnectedProducer() = 0; }; } // namespace pulsar #endif // PULSAR_PRODUCER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 129a322b262e5c..10b5b32ad6ceed 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -18,6 +18,8 @@ */ #include +#include "HttpHelper.h" + #include #include #include "../lib/checksum/ChecksumProvider.h" @@ -114,3 +116,62 @@ TEST(ClientTest, testConnectTimeout) { clientLow.close(); clientDefault.close(); } + +TEST(ClientTest, testGetNumberOfReferences) { + Client client("pulsar://localhost:6650"); + + // Producer test + uint64_t numberOfProducers = 0; + const std::string nonPartitionedTopic = + "testGetNumberOfReferencesNonPartitionedTopic" + std::to_string(time(nullptr)); + + const std::string partitionedTopic = + "testGetNumberOfReferencesPartitionedTopic" + std::to_string(time(nullptr)); + Producer producer; + client.createProducer(nonPartitionedTopic, producer); + numberOfProducers = 1; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + producer.close(); + numberOfProducers = 0; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + // PartitionedProducer + int res = makePutRequest( + "http://localhost:8080/admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + client.createProducer(partitionedTopic, producer); + numberOfProducers = 2; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + producer.close(); + numberOfProducers = 0; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + // Consumer test + uint64_t numberOfConsumers = 0; + + Consumer consumer1; + client.subscribe(nonPartitionedTopic, "consumer-1", consumer1); + numberOfConsumers = 1; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + consumer1.close(); + numberOfConsumers = 0; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + Consumer consumer2; + Consumer consumer3; + client.subscribe(partitionedTopic, "consumer-2", consumer2); + numberOfConsumers = 2; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + client.subscribe(nonPartitionedTopic, "consumer-3", consumer3); + numberOfConsumers = 3; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + consumer2.close(); + consumer3.close(); + numberOfConsumers = 0; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + client.close(); +} \ No newline at end of file