From 442bb305119569e0fd689f23a74652831cc4883a Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Tue, 23 Apr 2024 02:01:02 +0900 Subject: [PATCH] Replace AtomicInteger with MutableInt --- .../service/nonpersistent/NonPersistentTopic.java | 12 ++++++------ .../broker/service/persistent/PersistentTopic.java | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 06c1ced96e7a98..6c2e0d6f73a3f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -36,10 +36,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -746,7 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); - final AtomicInteger producerCount = new AtomicInteger(); + final MutableInt producerCount = new MutableInt(); topicStatsStream.startObject(topic); topicStatsStream.startList("publishers"); @@ -761,7 +761,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } else { // Exclude producers for replication from "publishers" and "producerCount" - producerCount.incrementAndGet(); + producerCount.increment(); if (hydratePublishers) { StreamingStats.writePublisherStats(topicStatsStream, publisherStats); } @@ -769,8 +769,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats }); topicStatsStream.endList(); - nsStats.producerCount += producerCount.get(); - bundleStats.producerCount += producerCount.get(); + nsStats.producerCount += producerCount.intValue(); + bundleStats.producerCount += producerCount.intValue(); // Start replicator stats topicStatsStream.startObject("replication"); @@ -860,7 +860,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats // Remaining dest stats. topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0 : (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn); - topicStatsStream.writePair("producerCount", producerCount.get()); + topicStatsStream.writePair("producerCount", producerCount.intValue()); topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize); topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn); topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7c233ffb461339..adcca90786a9d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -48,7 +48,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; @@ -86,6 +85,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -2108,7 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); - final AtomicInteger producerCount = new AtomicInteger(); + final MutableInt producerCount = new MutableInt(); topicStatsStream.startObject(topic); // start publisher stats @@ -2124,7 +2124,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } else { // Exclude producers for replication from "publishers" and "producerCount" - producerCount.incrementAndGet(); + producerCount.increment(); if (hydratePublishers) { StreamingStats.writePublisherStats(topicStatsStream, publisherStats); } @@ -2132,8 +2132,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats }); topicStatsStream.endList(); - nsStats.producerCount += producerCount.get(); - bundleStats.producerCount += producerCount.get(); + nsStats.producerCount += producerCount.intValue(); + bundleStats.producerCount += producerCount.intValue(); // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep // average rate. @@ -2302,7 +2302,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats // Remaining dest stats. topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0 : (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn); - topicStatsStream.writePair("producerCount", producerCount.get()); + topicStatsStream.writePair("producerCount", producerCount.intValue()); topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize); topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn); topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);