Skip to content

Commit

Permalink
Replace AtomicInteger with MutableInt
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam committed Apr 22, 2024
1 parent 06d34ea commit 442bb30
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
Expand Up @@ -36,10 +36,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -746,7 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

final AtomicInteger producerCount = new AtomicInteger();
final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
Expand All @@ -761,16 +761,16 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.incrementAndGet();
producerCount.increment();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.get();
bundleStats.producerCount += producerCount.get();
nsStats.producerCount += producerCount.intValue();
bundleStats.producerCount += producerCount.intValue();

// Start replicator stats
topicStatsStream.startObject("replication");
Expand Down Expand Up @@ -860,7 +860,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producerCount.get());
topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
Expand Down
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -86,6 +85,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -2108,7 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

final AtomicInteger producerCount = new AtomicInteger();
final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);

// start publisher stats
Expand All @@ -2124,16 +2124,16 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.incrementAndGet();
producerCount.increment();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.get();
bundleStats.producerCount += producerCount.get();
nsStats.producerCount += producerCount.intValue();
bundleStats.producerCount += producerCount.intValue();

// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
Expand Down Expand Up @@ -2302,7 +2302,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0
: (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producerCount.get());
topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
Expand Down

0 comments on commit 442bb30

Please sign in to comment.