Skip to content

Commit

Permalink
[broker][monitoring][fix] fix pulsar_subscription_msg_ack_rate (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
tjiuming committed Aug 4, 2022
1 parent 085678a commit 9339fd4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevel
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");

Collection<PrometheusMetricsTest.Metric> subAckRateMetrics = metricsMap.get("pulsar_subscription_msg_ack_rate");

String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
Assert.assertTrue(ackRateMetric.size() > 0);
Expand All @@ -316,9 +318,16 @@ private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevel
.filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
|| metric.tags.get("consumer_name").equals(consumer2Name))
.mapToDouble(metric -> metric.value).sum();
double subAckRate = subAckRateMetrics
.stream()
.filter(m -> m.tags.get("subscription").equals(subName))
.mapToDouble(m -> m.value)
.sum();

Assert.assertEquals(subAckRateMetrics.size(), 1);
Assert.assertTrue(totalAckRate > 0D);
Assert.assertTrue(totalRateOut > 0D);
Assert.assertEquals(totalAckRate, subAckRate, 0.1D * totalAckRate);
Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
} else {
double totalAckRate = ackRateMetric.stream()
Expand Down

0 comments on commit 9339fd4

Please sign in to comment.