Skip to content

Commit

Permalink
Group OTLP data points under same metric (#4564)
Browse files Browse the repository at this point in the history
Group OTLP data points that share the same metric to increase efficiency in publishing. Before this change, the same metric would be duplicated for each unique Meter in Micrometer that had the same name, description, and base unit.

See gh-4053
  • Loading branch information
lenin-jaganathan committed Mar 4, 2024
1 parent 789610f commit 175a0c9
Show file tree
Hide file tree
Showing 4 changed files with 607 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.ipc.http.HttpSender;
import io.micrometer.core.ipc.http.HttpUrlConnectionSender;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
Expand All @@ -50,13 +49,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;

/**
* Publishes meters in OTLP (OpenTelemetry Protocol) format. HTTP with Protobuf encoding
Expand Down Expand Up @@ -90,12 +84,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry {

private final Resource resource;

private final io.opentelemetry.proto.metrics.v1.AggregationTemporality otlpAggregationTemporality;
private final AggregationTemporality aggregationTemporality;

private final TimeUnit baseTimeUnit;

private long deltaAggregationTimeUnixNano = 0L;

// Time when the last scheduled rollOver has started. Applicable only for delta
// flavour.
private volatile long lastMeterRolloverStartTime = -1;
Expand All @@ -119,9 +111,7 @@ private OtlpMeterRegistry(OtlpConfig config, Clock clock, HttpSender httpSender)
this.baseTimeUnit = config.baseTimeUnit();
this.httpSender = httpSender;
this.resource = Resource.newBuilder().addAllAttributes(getResourceAttributes()).build();
this.otlpAggregationTemporality = AggregationTemporality
.toOtlpAggregationTemporality(config.aggregationTemporality());
setDeltaAggregationTimeUnixNano();
this.aggregationTemporality = config.aggregationTemporality();
config().namingConvention(NamingConvention.dot);
start(DEFAULT_THREAD_FACTORY);
}
Expand All @@ -147,15 +137,10 @@ public void stop() {

@Override
protected void publish() {
if (isDelta()) {
setDeltaAggregationTimeUnixNano();
}
for (List<Meter> batch : MeterPartition.partition(this, config.batchSize())) {
List<Metric> metrics = batch.stream()
.map(meter -> meter.match(this::writeGauge, this::writeCounter, this::writeHistogramSupport,
this::writeHistogramSupport, this::writeHistogramSupport, this::writeGauge,
this::writeFunctionCounter, this::writeFunctionTimer, this::writeMeter))
.collect(Collectors.toList());
OtlpMetricConverter otlpMetricConverter = new OtlpMetricConverter(clock, config.step(), getBaseTimeUnit(),
config.aggregationTemporality(), config().namingConvention());
otlpMetricConverter.addMeters(batch);

try {
ExportMetricsServiceRequest request = ExportMetricsServiceRequest.newBuilder()
Expand All @@ -165,7 +150,7 @@ protected void publish() {
// we don't have instrumentation library/version
// attached to meters; leave unknown for now
// .setScope(InstrumentationScope.newBuilder().setName("").setVersion("").build())
.addAllMetrics(metrics)
.addAllMetrics(otlpMetricConverter.getAllMetrics())
.build())
.build())
.build();
Expand Down Expand Up @@ -307,50 +292,6 @@ else if (meter instanceof OtlpStepDistributionSummary) {
}
}

// VisibleForTesting
Metric writeMeter(Meter meter) {
// TODO support writing custom meters
// one gauge per measurement
return getMetricBuilder(meter.getId()).build();
}

// VisibleForTesting
Metric writeGauge(Gauge gauge) {
return getMetricBuilder(gauge.getId())
.setGauge(io.opentelemetry.proto.metrics.v1.Gauge.newBuilder()
.addDataPoints(NumberDataPoint.newBuilder()
.setTimeUnixNano(getTimeUnixNano())
.setAsDouble(gauge.value())
.addAllAttributes(getTagsForId(gauge.getId()))
.build()))
.build();
}

// VisibleForTesting
Metric writeCounter(Counter counter) {
return writeSum(counter, counter::count);
}

// VisibleForTesting
Metric writeFunctionCounter(FunctionCounter functionCounter) {
return writeSum(functionCounter, functionCounter::count);
}

private Metric writeSum(Meter meter, DoubleSupplier count) {
return getMetricBuilder(meter.getId())
.setSum(Sum.newBuilder()
.addDataPoints(NumberDataPoint.newBuilder()
.setStartTimeUnixNano(getStartTimeNanos(meter))
.setTimeUnixNano(getTimeUnixNano())
.setAsDouble(count.getAsDouble())
.addAllAttributes(getTagsForId(meter.getId()))
.build())
.setIsMonotonic(true)
.setAggregationTemporality(otlpAggregationTemporality)
.build())
.build();
}

/**
* This will poll the values from meters, which will cause a roll over for Step-meters
* if past the step boundary. This gives some control over when roll over happens
Expand All @@ -373,122 +314,12 @@ private long getInitialDelay() {
return stepMillis - (clock.wallTime() % stepMillis) + 1;
}

// VisibleForTesting
Metric writeHistogramSupport(HistogramSupport histogramSupport) {
Metric.Builder metricBuilder = getMetricBuilder(histogramSupport.getId());
boolean isTimeBased = histogramSupport instanceof Timer || histogramSupport instanceof LongTaskTimer;
HistogramSnapshot histogramSnapshot = histogramSupport.takeSnapshot();

Iterable<? extends KeyValue> tags = getTagsForId(histogramSupport.getId());
long startTimeNanos = getStartTimeNanos(histogramSupport);
double total = isTimeBased ? histogramSnapshot.total(getBaseTimeUnit()) : histogramSnapshot.total();
long count = histogramSnapshot.count();

// if percentiles configured, use summary
if (histogramSnapshot.percentileValues().length != 0) {
SummaryDataPoint.Builder summaryData = SummaryDataPoint.newBuilder()
.addAllAttributes(tags)
.setStartTimeUnixNano(startTimeNanos)
.setTimeUnixNano(getTimeUnixNano())
.setSum(total)
.setCount(count);
for (ValueAtPercentile percentile : histogramSnapshot.percentileValues()) {
summaryData.addQuantileValues(SummaryDataPoint.ValueAtQuantile.newBuilder()
.setQuantile(percentile.percentile())
.setValue(TimeUtils.convert(percentile.value(), TimeUnit.NANOSECONDS, getBaseTimeUnit())));
}
metricBuilder.setSummary(Summary.newBuilder().addDataPoints(summaryData));
return metricBuilder.build();
}

HistogramDataPoint.Builder histogramDataPoint = HistogramDataPoint.newBuilder()
.addAllAttributes(tags)
.setStartTimeUnixNano(startTimeNanos)
.setTimeUnixNano(getTimeUnixNano())
.setSum(total)
.setCount(count);

if (isDelta()) {
histogramDataPoint.setMax(isTimeBased ? histogramSnapshot.max(getBaseTimeUnit()) : histogramSnapshot.max());
}
// if histogram enabled, add histogram buckets
if (histogramSnapshot.histogramCounts().length != 0) {
for (CountAtBucket countAtBucket : histogramSnapshot.histogramCounts()) {
if (countAtBucket.bucket() != Double.POSITIVE_INFINITY) {
// OTLP expects explicit bounds to not contain POSITIVE_INFINITY but
// there should be a
// bucket count representing values between last bucket and
// POSITIVE_INFINITY.
histogramDataPoint.addExplicitBounds(
isTimeBased ? countAtBucket.bucket(getBaseTimeUnit()) : countAtBucket.bucket());
}
histogramDataPoint.addBucketCounts((long) countAtBucket.count());
}
metricBuilder.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.setAggregationTemporality(otlpAggregationTemporality)
.addDataPoints(histogramDataPoint));
return metricBuilder.build();
}

return metricBuilder
.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.setAggregationTemporality(otlpAggregationTemporality)
.addDataPoints(histogramDataPoint))
.build();
}

// VisibleForTesting
Metric writeFunctionTimer(FunctionTimer functionTimer) {
return getMetricBuilder(functionTimer.getId())
.setHistogram(io.opentelemetry.proto.metrics.v1.Histogram.newBuilder()
.addDataPoints(HistogramDataPoint.newBuilder()
.addAllAttributes(getTagsForId(functionTimer.getId()))
.setStartTimeUnixNano(getStartTimeNanos((functionTimer)))
.setTimeUnixNano(getTimeUnixNano())
.setSum(functionTimer.totalTime(getBaseTimeUnit()))
.setCount((long) functionTimer.count()))
.setAggregationTemporality(otlpAggregationTemporality))
.build();
}

private boolean isCumulative() {
return this.otlpAggregationTemporality == AGGREGATION_TEMPORALITY_CUMULATIVE;
return this.aggregationTemporality == AggregationTemporality.CUMULATIVE;
}

private boolean isDelta() {
return this.otlpAggregationTemporality == AGGREGATION_TEMPORALITY_DELTA;
}

// VisibleForTesting
void setDeltaAggregationTimeUnixNano() {
this.deltaAggregationTimeUnixNano = (clock.wallTime() / config.step().toMillis()) * config.step().toNanos();
}

private long getTimeUnixNano() {
return isCumulative() ? TimeUnit.MILLISECONDS.toNanos(this.clock.wallTime()) : deltaAggregationTimeUnixNano;
}

private long getStartTimeNanos(Meter meter) {
return isCumulative() ? ((StartTimeAwareMeter) meter).getStartTimeNanos()
: deltaAggregationTimeUnixNano - config.step().toNanos();
}

private Metric.Builder getMetricBuilder(Meter.Id id) {
Metric.Builder builder = Metric.newBuilder().setName(getConventionName(id));
if (id.getBaseUnit() != null) {
builder.setUnit(id.getBaseUnit());
}
if (id.getDescription() != null) {
builder.setDescription(id.getDescription());
}
return builder;
}

private Iterable<? extends KeyValue> getTagsForId(Meter.Id id) {
return id.getTags()
.stream()
.map(tag -> createKeyValue(tag.getKey(), tag.getValue()))
.collect(Collectors.toList());
return this.aggregationTemporality == AggregationTemporality.DELTA;
}

// VisibleForTesting
Expand Down

0 comments on commit 175a0c9

Please sign in to comment.