
Static descriptions for Kafka consumer metrics #4952

Closed
pirgeo opened this issue Apr 9, 2024 · 11 comments
Labels
closed-as-inactive, feedback-reminder, waiting for feedback (We need additional information before we can continue)

Comments

@pirgeo
Contributor

pirgeo commented Apr 9, 2024

Please describe the feature request.
The Kafka instrumentation produces dynamic descriptions, meaning that descriptions are not always the same for one metric key. Here is a list of some descriptions that I have come across in the past:

| metric name | description |
| --- | --- |
| kafka.consumer.fetch.manager.bytes.consumed.total | The total number of bytes consumed |
| kafka.consumer.fetch.manager.bytes.consumed.total | The total number of bytes consumed for a topic |
| kafka.consumer.fetch.manager.records.consumed.rate | The average number of records consumed per second |
| kafka.consumer.fetch.manager.records.consumed.rate | The average number of records consumed per second for a topic |
| kafka.consumer.fetch.manager.records.lag.max | The max lag of the partition |
| kafka.consumer.fetch.manager.records.lag.max | The maximum lag in terms of number of records for any partition in this window. NOTE: This is based on current offset and not committed offset |
| kafka.consumer.fetch.manager.fetch.size.max | The maximum number of bytes fetched per request |
| kafka.consumer.fetch.manager.fetch.size.max | The maximum number of bytes fetched per request for a topic |
| kafka.consumer.fetch.manager.records.lead.min | The min lead of the partition |
| kafka.consumer.fetch.manager.records.lead.min | The minimum lead in terms of number of records for any partition in this window |

Additional context
There are prior examples using a static description per metric key, see #3201 and more recently #4864. I am not entirely sure how these metrics are created, but I think they are originating from this class: https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java
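For illustration only (the stats holder and tags below are hypothetical, not taken from the linked classes), a static description per metric key would look roughly like this: the text is fixed at registration time and doesn't change when a topic or partition tag is added.

```java
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.concurrent.atomic.AtomicLong;

class StaticDescriptionSketch {

    // 'bytesConsumed' stands in for whatever the instrumentation actually reads from Kafka.
    void bind(MeterRegistry registry, AtomicLong bytesConsumed, Tags tags) {
        Gauge.builder("kafka.consumer.fetch.manager.bytes.consumed.total", bytesConsumed, AtomicLong::doubleValue)
            // Same description text whether or not a topic/partition tag is present.
            .description("The total number of bytes consumed")
            .tags(tags)
            .register(registry);
    }
}
```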

CC @izeye, since you worked on #4864, do you think this is a reasonable change?

@izeye
Contributor

izeye commented Apr 9, 2024

KafkaConsumerMetrics has been deprecated and doesn't seem to be able to produce "The total number of bytes consumed for a topic" as a description, for example.

The descriptions seem to have been produced by KafkaClientMetrics, with the text coming from org.apache.kafka.common.MetricName.description().
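For reference, a minimal sketch of the binding (broker and deserializer configuration elided); the description Micrometer stores is simply whatever MetricName.description() returns for the metric's current shape:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

class KafkaBindingSketch {

    void bind(MeterRegistry registry, Properties consumerProps) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        // KafkaClientMetrics registers one Micrometer meter per Kafka metric and
        // copies the Kafka-provided description (MetricName.description()) as-is.
        KafkaClientMetrics kafkaClientMetrics = new KafkaClientMetrics(consumer);
        kafkaClientMetrics.bindTo(registry);
    }
}
```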

Looking into KafkaClientMetrics, the pairs shouldn't exist at the same time: when a more specific meter is registered, the less specific one is unregistered. With this arrangement, Prometheus would be okay, but Dynatrace could be affected.

See https://kafka.apache.org/documentation/#consumer_fetch_monitoring

@pirgeo
Contributor Author

pirgeo commented Apr 10, 2024

So you are saying that there should not be a situation where two meters with different descriptions are registered at the same time? I assume you are referring to checkAndBindMetrics. I've dug through that method, and I think I understand how it works:

Get the metrics Kafka currently reports, and remove the meters that Micrometer still tracks but that are no longer present in Kafka:

```java
void checkAndBindMetrics(MeterRegistry registry) {
    try {
        Map<MetricName, ? extends Metric> currentMetrics = this.metricsSupplier.get();
        this.metrics.set(currentMetrics);
        if (!currentMeters.equals(currentMetrics.keySet())) {
            Set<MetricName> metricsToRemove = currentMeters.stream()
                .filter(metricName -> !currentMetrics.containsKey(metricName))
                .collect(Collectors.toSet());
            for (MetricName metricName : metricsToRemove) {
                Meter.Id id = meterIdForComparison(metricName);
                registry.remove(id);
                registeredMeterIds.remove(id);
            }
```

Split metrics stored in MM by metric name:

```java
currentMeters = new HashSet<>(currentMetrics.keySet());
Map<String, List<Meter>> registryMetersByNames = registry.getMeters()
    .stream()
    .collect(Collectors.groupingBy(meter -> meter.getId().getName()));
```

Then, iterate over the metrics coming from Kafka and compare the number of tags on each Kafka metric with the number of tags on the corresponding Micrometer meter:

```java
currentMetrics.forEach((name, metric) -> {
    // Filter out non-numeric values
    // Filter out metrics from groups that include metadata
    if (!(metric.metricValue() instanceof Number) || METRIC_GROUP_APP_INFO.equals(name.group())
            || METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
        return;
    }
    String meterName = meterName(name);
    // Kafka has metrics with lower number of tags (e.g. with/without
    // topic or partition tag)
    // Remove meters with lower number of tags
    boolean hasLessTags = false;
    for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
        Meter.Id otherId = other.getId();
        List<Tag> tags = otherId.getTags();
        List<Tag> meterTagsWithCommonTags = meterTags(name, true);
        if (tags.size() < meterTagsWithCommonTags.size()) {
            registry.remove(otherId);
            registeredMeterIds.remove(otherId);
        }
        // Check if already exists
        else if (tags.size() == meterTagsWithCommonTags.size())
            if (tags.containsAll(meterTagsWithCommonTags))
                return;
            else
                break;
        else
            hasLessTags = true;
    }
    if (hasLessTags)
        return;
```

  • If the Micrometer-registered metric has fewer tags, remove it and replace it with the (new, more specific) one from Kafka.
  • If there is a Micrometer-registered metric with the same name that has the same number of tags and if
    • they are the same tags, the meter is already registered and there is nothing to do.
    • they are not the same tags, register the new Kafka metric in addition to the already existing metric (and don't look at any other Micrometer metrics anymore - break).
  • If the Micrometer-registered metric has more tags, ignore the new Kafka metric.

If I understand correctly, that means it is possible to register two metrics with the same name and the same number of tags, but only if the tags are not the same as the ones already there. I might be reading it wrong, though. Not sure if that could be the reason for two Meters with the same name but different descriptions.
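As a standalone sanity check, here is a minimal sketch (plain Micrometer, no Kafka involved) showing that a registry will happily hold two meters with the same name but different tag sets, each keeping its own description:

```java
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class SameNameDifferentDescriptionSketch {

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // No topic tag, generic description.
        Gauge.builder("bytes.consumed.total", () -> 1.0)
            .description("The total number of bytes consumed")
            .register(registry);

        // Topic tag present, topic-specific description.
        Gauge.builder("bytes.consumed.total", () -> 2.0)
            .description("The total number of bytes consumed for a topic")
            .tag("topic", "orders")
            .register(registry);

        // Both meters coexist: same name, different tags, different descriptions.
        registry.getMeters()
            .forEach(m -> System.out.println(m.getId().getTags() + " -> " + m.getId().getDescription()));
    }
}
```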

As for the Dynatrace registry: as soon as the meter is removed from the registry, it will no longer be exported. Checking for conflicting descriptions happens per-export, so I am pretty sure that at the time of exporting, there are two meters with the same name but different descriptions. I'll try to reproduce it, but I don't know when I'll get around to it. Since I am not really familiar with Kafka, I am not even sure where to start.

@izeye
Contributor

izeye commented Apr 10, 2024

As for the Dynatrace registry: as soon as the meter is removed from the registry, it will no longer be exported. Checking for conflicting descriptions happens per-export, so I am pretty sure that at the time of exporting, there are two meters with the same name but different descriptions.

@pirgeo Sorry, I thought Dynatrace required the same description for the same meter during an application's lifetime.

I thought the two different descriptions came from a meter without/with the "topic" dimension, for example, and that the latter would survive in the end. However, based on your comment, I might be missing something, as the two descriptions seem to exist at the same time for some reason.

@pirgeo
Contributor Author

pirgeo commented Apr 10, 2024

@izeye Thank you for your response!

I thought Dynatrace required the same description for the same meter during an application's lifetime.

The discrepancy is currently checked per-export and per-metric-name. This is all on the client side, though, and the description will be checked against the one already stored for the metric name upon ingestion.

Either way, the Kafka instrumentation produces multiple descriptions for the same metric key. I guess I understand the rationale in this case, although it almost seems to me as if kafka.consumer.fetch.manager.bytes.consumed.total and kafka.consumer.fetch.manager.bytes.consumed.total with a topic dimension should be two separate metrics. This is the first time I have ever come across a situation where the meaning of a metric changes at runtime while the name stays the same. But I assume that ship has sailed; I am sure the folks on the Kafka side have their reasons for doing it that way.

As far as I can tell, there must be two separate Meters registered with the MeterRegistry at the same time, one with the topic dimension and one without. I am assuming that based on the fact that I get two different description texts in the same export ("The total number of bytes consumed" vs. "The total number of bytes consumed for a topic"). In the Dynatrace exporter, we collect which meter names map to which descriptions (per export), and then warn if there are two conflicting descriptions.

@shakuzen
Member

I think the intention of the logic to remove the meter with fewer tags is to avoid the situation you seem to be running into. There may be a bug or a concurrency issue. Kafka metrics are unique in the way they work, and we rely on whatever the Kafka client provides in terms of metrics. It'd be good to get the name and tags of the overlapping meters when this happens as a first step. Then maybe we can figure out how to reproduce it and what is happening.
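To help gather that, here is a rough diagnostic sketch that dumps the tags and descriptions of any meters that share a name but report more than one description:

```java
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class OverlappingMeterDump {

    static void dump(MeterRegistry registry) {
        // Group all currently registered meters by name.
        Map<String, List<Meter>> byName = registry.getMeters().stream()
            .collect(Collectors.groupingBy(m -> m.getId().getName()));

        byName.forEach((name, meters) -> {
            long distinctDescriptions = meters.stream()
                .map(m -> String.valueOf(m.getId().getDescription()))
                .distinct()
                .count();
            // Only report names whose meters carry more than one description text.
            if (distinctDescriptions > 1) {
                System.out.println("Conflicting descriptions for " + name + ":");
                meters.forEach(m -> System.out.println(
                        "  tags=" + m.getId().getTags() + " description=" + m.getId().getDescription()));
            }
        });
    }
}
```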

@pirgeo
Contributor Author

pirgeo commented Apr 17, 2024

I think the intention of the logic to remove the meter with fewer tags is to avoid the situation you seem to be running into.

Yeah, that's what I assume, too. Unfortunately, I don't have access to the tags, and I couldn't reproduce it, so I am left guessing, but I'll try to find out!

@shakuzen
Member

Until we figure out which metrics are causing the issue, I'm not sure we'll be able to make a change we're sure fixes things. If we do get that info, please share here.

shakuzen added the "waiting for feedback" label Apr 22, 2024

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.


github-actions bot commented May 7, 2024

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open.

github-actions bot closed this as not planned May 7, 2024
@pirgeo
Contributor Author

pirgeo commented May 7, 2024

Thanks for considering this. I currently simply don't have the time to look into this more thoroughly, but I guess that there might be multiple registered metrics (probably for multiple registered Kafka consumers in different states) that are producing metrics with different descriptions. I'll reopen this once I have more info.

@shakuzen
Member

shakuzen commented May 8, 2024

I guess that there might be multiple registered metrics (probably for multiple registered Kafka consumers in different states) that are producing metrics with different descriptions.

That sounds likely to be the issue. I'm not sure what we can do about it, though, since we are taking the descriptions provided by the Kafka client. To solve it on the Micrometer side, it seems like we would need to maintain some kind of mapping of known metrics that need to use a different description. That sounds quite fragile and we don't own the descriptions - the Kafka client does. They may be different in different versions of the client and that would be a nightmare to manage on our side. I'm wondering if the Kafka client would be willing to change the descriptions to avoid this issue.
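To make the trade-off concrete, here is a rough sketch of what such a mapping could look like on the Micrometer side, using a MeterFilter to rewrite descriptions for known meter names (the map contents are illustrative, and keeping them in sync with the Kafka client's metrics is exactly the fragile part):

```java
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.config.MeterFilter;

import java.util.Map;

class StaticKafkaDescriptionsFilter {

    // Would have to be maintained for every affected metric and Kafka client version.
    private static final Map<String, String> STATIC_DESCRIPTIONS = Map.of(
            "kafka.consumer.fetch.manager.bytes.consumed.total", "The total number of bytes consumed",
            "kafka.consumer.fetch.manager.records.consumed.rate", "The average number of records consumed per second");

    static void apply(MeterRegistry registry) {
        registry.config().meterFilter(new MeterFilter() {
            @Override
            public Meter.Id map(Meter.Id id) {
                String fixed = STATIC_DESCRIPTIONS.get(id.getName());
                if (fixed == null) {
                    return id;
                }
                // Rebuild the id with everything unchanged except a fixed description.
                return new Meter.Id(id.getName(), Tags.of(id.getTags()), id.getBaseUnit(), fixed, id.getType());
            }
        });
    }
}
```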
