From 68f28df9029a94754324642f582543d09d7617bb Mon Sep 17 00:00:00 2001 From: Jiabei Zhao <41840745+Sunny-Island@users.noreply.github.com> Date: Tue, 3 Aug 2021 22:55:15 +0800 Subject: [PATCH] [Issue 11493] Simple implementation of getting number of references from C++ client (#11535) Fixes #11493 Master Issue: #11493 ### Motivation In Pulsar, we use a single client to create multiple producers/consumers/readers. Is there any method/attribute that can give information on number of producers/readers/consumers connected to the given pulsar client instance at the given point of time? Say there is a single pulsar client instance. Multiple consumers and readers are created from the given client instance. This client needs to be cleaned up when all the references are closed. In this case, it would be of help, to get information on the number of the consumers/readers/ getPartitionsForTopic calls are active on the given client. Ie, having the number of references for the given client can provide information on whether it is fine to clean up the client instance at the given point of time. *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications Add these method to `Client.h`, `Client.cc`, `ClientImpl.h`, `CliemtImpl.cc` : ``` uint64_t getNumberOfProducer() uint64_t getNumberOfConsumer() ``` To count alive producers, I get each producer by weak point and check if it is connected. ### Verifying this change This change added tests and can be verified as follows: Add unit test to check if these functions can return correct references number. Test file is `ClientTest.cc`. Test function is `TEST(ClientTest, testGetNumberOfReferences)`. --- pulsar-client-cpp/include/pulsar/Client.h | 14 +++++ pulsar-client-cpp/lib/Client.cc | 3 + pulsar-client-cpp/lib/ClientImpl.cc | 24 ++++++++ pulsar-client-cpp/lib/ClientImpl.h | 3 + pulsar-client-cpp/lib/ConsumerImpl.cc | 2 + pulsar-client-cpp/lib/ConsumerImpl.h | 1 + pulsar-client-cpp/lib/ConsumerImplBase.h | 1 + .../lib/MultiTopicsConsumerImpl.cc | 13 ++++ .../lib/MultiTopicsConsumerImpl.h | 2 + .../lib/PartitionedConsumerImpl.cc | 13 ++++ .../lib/PartitionedConsumerImpl.h | 1 + .../lib/PartitionedProducerImpl.cc | 13 ++++ .../lib/PartitionedProducerImpl.h | 2 +- pulsar-client-cpp/lib/ProducerImpl.cc | 2 + pulsar-client-cpp/lib/ProducerImpl.h | 1 + pulsar-client-cpp/lib/ProducerImplBase.h | 1 + pulsar-client-cpp/tests/ClientTest.cc | 61 +++++++++++++++++++ 17 files changed, 156 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 1bc26145380549..49dd011963b748 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -355,6 +355,20 @@ class PULSAR_PUBLIC Client { */ void shutdown(); + /** + * @brief Get the number of alive producers on the current client. + * + * @return The number of alive producers on the current client. + */ + uint64_t getNumberOfProducers(); + + /** + * @brief Get the number of alive consumers on the current client. + * + * @return The number of alive consumers on the current client. + */ + uint64_t getNumberOfConsumers(); + private: Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections); diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index 59bf8f4c98879d..c72232a38c6236 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -175,4 +175,7 @@ Result Client::close() { void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); } void Client::shutdown() { impl_->shutdown(); } + +uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); } +uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); } } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 126b05a86e85df..d6663e834bb503 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -588,6 +588,30 @@ uint64_t ClientImpl::newRequestId() { return requestIdGenerator_++; } +uint64_t ClientImpl::getNumberOfProducers() { + Lock lock(mutex_); + uint64_t numberOfAliveProducers = 0; + for (const auto& producer : producers_) { + const auto& producerImpl = producer.lock(); + if (producerImpl) { + numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer(); + } + } + return numberOfAliveProducers; +} + +uint64_t ClientImpl::getNumberOfConsumers() { + Lock lock(mutex_); + uint64_t numberOfAliveConsumers = 0; + for (const auto& consumer : consumers_) { + const auto consumerImpl = consumer.lock(); + if (consumerImpl) { + numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer(); + } + } + return numberOfAliveConsumers; +} + const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; } } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 81eb59690329c7..847872abfd29b1 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -83,6 +83,9 @@ class ClientImpl : public std::enable_shared_from_this { uint64_t newConsumerId(); uint64_t newRequestId(); + uint64_t getNumberOfProducers(); + uint64_t getNumberOfConsumers(); + const ClientConfiguration& getClientConfig() const; const ClientConfiguration& conf() const; diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index c26841754e60cd..fea7d4d6522d7d 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -1233,4 +1233,6 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } + } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 28f96dd7e38453..cde8293ff69bab 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -125,6 +125,7 @@ class ConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; virtual void disconnectConsumer(); Result fetchSingleMessageFromBroker(Message& msg); diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index c8f36d0415adb0..693d4da9a37795 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -56,6 +56,7 @@ class ConsumerImplBase { virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0; virtual void negativeAcknowledge(const MessageId& msgId) = 0; virtual bool isConnected() const = 0; + virtual uint64_t getNumberOfConnectedConsumer() = 0; private: virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 4b0d741667528f..64aaada2503c2d 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -749,3 +749,16 @@ bool MultiTopicsConsumerImpl::isConnected() const { } return true; } + +uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { + Lock lock(mutex_); + uint64_t numberOfConnectedConsumer = 0; + const auto consumers = consumers_; + lock.unlock(); + for (const auto& topicAndConsumer : consumers) { + if (topicAndConsumer.second->isConnected()) { + numberOfConnectedConsumer++; + } + } + return numberOfConnectedConsumer; +} diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index df3039b57025cb..3a1249bc5084f1 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -78,6 +78,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; + void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); // return first topic name when all topics name valid, or return null pointer diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 9a9c8826337a02..7aa506e9f109c4 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -635,4 +635,17 @@ bool PartitionedConsumerImpl::isConnected() const { return true; } +uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() { + uint64_t numberOfConnectedConsumer = 0; + Lock consumersLock(consumersMutex_); + const auto consumers = consumers_; + consumersLock.unlock(); + for (const auto& consumer : consumers) { + if (consumer->isConnected()) { + numberOfConnectedConsumer++; + } + } + return numberOfConnectedConsumer; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 2696288e539e2c..83ada952abad2e 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -74,6 +74,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; + uint64_t getNumberOfConnectedConsumer() override; void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 0df729d1f505a3..4e01263d6aac15 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -387,4 +387,17 @@ bool PartitionedProducerImpl::isConnected() const { return true; } +uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { + uint64_t numberOfConnectedProducer = 0; + Lock producersLock(producersMutex_); + const auto producers = producers_; + producersLock.unlock(); + for (const auto& producer : producers) { + if (producer->isConnected()) { + numberOfConnectedProducer++; + } + } + return numberOfConnectedProducer; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index cd1ee8c05f12b2..c097190fca3336 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -64,7 +64,7 @@ class PartitionedProducerImpl : public ProducerImplBase, void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; - + uint64_t getNumberOfConnectedProducer() override; void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, const unsigned int partitionIndex); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index d81e9582fa97b0..c7a45513ab2f7a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -826,5 +826,7 @@ bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } + } // namespace pulsar /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index caec85cb2d15b8..2c51d41f12788a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -70,6 +70,7 @@ class ProducerImpl : public HandlerBase, void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; + uint64_t getNumberOfConnectedProducer() override; bool removeCorruptMessage(uint64_t sequenceId); diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h index a947624f3ff5b2..15a6e1d5a3f80c 100644 --- a/pulsar-client-cpp/lib/ProducerImplBase.h +++ b/pulsar-client-cpp/lib/ProducerImplBase.h @@ -45,6 +45,7 @@ class ProducerImplBase { virtual void triggerFlush() = 0; virtual void flushAsync(FlushCallback callback) = 0; virtual bool isConnected() const = 0; + virtual uint64_t getNumberOfConnectedProducer() = 0; }; } // namespace pulsar #endif // PULSAR_PRODUCER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 129a322b262e5c..10b5b32ad6ceed 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -18,6 +18,8 @@ */ #include +#include "HttpHelper.h" + #include #include #include "../lib/checksum/ChecksumProvider.h" @@ -114,3 +116,62 @@ TEST(ClientTest, testConnectTimeout) { clientLow.close(); clientDefault.close(); } + +TEST(ClientTest, testGetNumberOfReferences) { + Client client("pulsar://localhost:6650"); + + // Producer test + uint64_t numberOfProducers = 0; + const std::string nonPartitionedTopic = + "testGetNumberOfReferencesNonPartitionedTopic" + std::to_string(time(nullptr)); + + const std::string partitionedTopic = + "testGetNumberOfReferencesPartitionedTopic" + std::to_string(time(nullptr)); + Producer producer; + client.createProducer(nonPartitionedTopic, producer); + numberOfProducers = 1; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + producer.close(); + numberOfProducers = 0; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + // PartitionedProducer + int res = makePutRequest( + "http://localhost:8080/admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + client.createProducer(partitionedTopic, producer); + numberOfProducers = 2; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + producer.close(); + numberOfProducers = 0; + ASSERT_EQ(numberOfProducers, client.getNumberOfProducers()); + + // Consumer test + uint64_t numberOfConsumers = 0; + + Consumer consumer1; + client.subscribe(nonPartitionedTopic, "consumer-1", consumer1); + numberOfConsumers = 1; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + consumer1.close(); + numberOfConsumers = 0; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + Consumer consumer2; + Consumer consumer3; + client.subscribe(partitionedTopic, "consumer-2", consumer2); + numberOfConsumers = 2; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + client.subscribe(nonPartitionedTopic, "consumer-3", consumer3); + numberOfConsumers = 3; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + consumer2.close(); + consumer3.close(); + numberOfConsumers = 0; + ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers()); + + client.close(); +} \ No newline at end of file