Skip to content

Commit

Permalink
Low allocation OTLP metrics marshaler (#6422)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed May 7, 2024
1 parent 36bc703 commit 996c9c3
Show file tree
Hide file tree
Showing 24 changed files with 1,897 additions and 79 deletions.
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
Expand All @@ -38,6 +39,9 @@
public class MetricsRequestMarshalerBenchmark {

private static final Collection<MetricData> METRICS;
private static final LowAllocationMetricsRequestMarshaler MARSHALER =
new LowAllocationMetricsRequestMarshaler();
private static final TestOutputStream OUTPUT = new TestOutputStream();

static {
InMemoryMetricReader metricReader = InMemoryMetricReader.create();
Expand Down Expand Up @@ -116,10 +120,42 @@ public class MetricsRequestMarshalerBenchmark {
}

@Benchmark
public TestOutputStream marshaler() throws IOException {
public int marshalStateful() throws IOException {
MetricsRequestMarshaler marshaler = MetricsRequestMarshaler.create(METRICS);
TestOutputStream bos = new TestOutputStream();
marshaler.writeBinaryTo(bos);
return bos;
OUTPUT.reset();
marshaler.writeBinaryTo(OUTPUT);
return OUTPUT.getCount();
}

@Benchmark
public int marshalStatefulJson() throws IOException {
MetricsRequestMarshaler marshaler = MetricsRequestMarshaler.create(METRICS);
OUTPUT.reset();
marshaler.writeJsonTo(OUTPUT);
return OUTPUT.getCount();
}

@Benchmark
public int marshalStateless() throws IOException {
MARSHALER.initialize(METRICS);
try {
OUTPUT.reset();
MARSHALER.writeBinaryTo(OUTPUT);
return OUTPUT.getCount();
} finally {
MARSHALER.reset();
}
}

@Benchmark
public int marshalStatelessJson() throws IOException {
MARSHALER.initialize(METRICS);
try {
OUTPUT.reset();
MARSHALER.writeJsonTo(OUTPUT);
return OUTPUT.getCount();
} finally {
MARSHALER.reset();
}
}
}
Expand Up @@ -37,22 +37,15 @@ static ExemplarMarshaler[] createRepeated(List<? extends ExemplarData> exemplars
return marshalers;
}

private static ExemplarMarshaler create(ExemplarData exemplar) {
// Visible for testing
static ExemplarMarshaler create(ExemplarData exemplar) {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createForAttributes(exemplar.getFilteredAttributes());

ProtoFieldInfo valueField;
if (exemplar instanceof LongExemplarData) {
valueField = io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT;
} else {
assert exemplar instanceof DoubleExemplarData;
valueField = io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_DOUBLE;
}

return new ExemplarMarshaler(
exemplar.getEpochNanos(),
exemplar,
valueField,
toProtoExemplarValueType(exemplar),
exemplar.getSpanContext(),
attributeMarshalers);
}
Expand Down Expand Up @@ -121,4 +114,13 @@ private static int calculateSize(
filteredAttributeMarshalers);
return size;
}

static ProtoFieldInfo toProtoExemplarValueType(ExemplarData exemplar) {
if (exemplar instanceof LongExemplarData) {
return io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT;
} else {
assert exemplar instanceof DoubleExemplarData;
return io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_DOUBLE;
}
}
}
@@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.otlp.metrics;

import static io.opentelemetry.exporter.internal.otlp.metrics.ExemplarMarshaler.toProtoExemplarValueType;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import java.io.IOException;

/** See {@link ExemplarMarshaler}. */
final class ExemplarStatelessMarshaler implements StatelessMarshaler<ExemplarData> {
static final ExemplarStatelessMarshaler INSTANCE = new ExemplarStatelessMarshaler();

private ExemplarStatelessMarshaler() {}

@Override
public void writeTo(Serializer output, ExemplarData exemplar, MarshalerContext context)
throws IOException {
output.serializeFixed64(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TIME_UNIX_NANO,
exemplar.getEpochNanos());
ProtoFieldInfo valueField = toProtoExemplarValueType(exemplar);
if (valueField == io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT) {
output.serializeFixed64Optional(valueField, ((LongExemplarData) exemplar).getValue());
} else {
output.serializeDoubleOptional(valueField, ((DoubleExemplarData) exemplar).getValue());
}
SpanContext spanContext = exemplar.getSpanContext();
if (spanContext.isValid()) {
output.serializeSpanId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.SPAN_ID,
spanContext.getSpanId(),
context);
output.serializeTraceId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TRACE_ID,
spanContext.getTraceId(),
context);
}
output.serializeRepeatedMessageWithContext(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.FILTERED_ATTRIBUTES,
exemplar.getFilteredAttributes(),
KeyValueStatelessMarshaler.INSTANCE,
context);
}

@Override
public int getBinarySerializedSize(ExemplarData exemplar, MarshalerContext context) {
int size = 0;
size +=
MarshalerUtil.sizeFixed64(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TIME_UNIX_NANO,
exemplar.getEpochNanos());
ProtoFieldInfo valueField = toProtoExemplarValueType(exemplar);
if (valueField == io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT) {
size +=
MarshalerUtil.sizeFixed64Optional(valueField, ((LongExemplarData) exemplar).getValue());
} else {
size +=
MarshalerUtil.sizeDoubleOptional(valueField, ((DoubleExemplarData) exemplar).getValue());
}
SpanContext spanContext = exemplar.getSpanContext();
if (spanContext.isValid()) {
size +=
MarshalerUtil.sizeSpanId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.SPAN_ID, spanContext.getSpanId());
size +=
MarshalerUtil.sizeTraceId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TRACE_ID,
spanContext.getTraceId());
}
size +=
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.FILTERED_ATTRIBUTES,
exemplar.getFilteredAttributes(),
KeyValueStatelessMarshaler.INSTANCE,
context);

return size;
}
}
Expand Up @@ -45,7 +45,7 @@ protected void writeTo(Serializer output) throws IOException {
}
}

private static int calculateSize(int offset, List<Long> counts) {
static int calculateSize(int offset, List<Long> counts) {
int size = 0;
size += MarshalerUtil.sizeSInt32(ExponentialHistogramDataPoint.Buckets.OFFSET, offset);
if (counts instanceof DynamicPrimitiveLongList) {
Expand Down
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.otlp.metrics;

import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
import io.opentelemetry.proto.metrics.v1.internal.ExponentialHistogramDataPoint;
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import java.io.IOException;
import java.util.List;

/** See {@link ExponentialHistogramBucketsMarshaler}. */
final class ExponentialHistogramBucketsStatelessMarshaler
implements StatelessMarshaler<ExponentialHistogramBuckets> {
static final ExponentialHistogramBucketsStatelessMarshaler INSTANCE =
new ExponentialHistogramBucketsStatelessMarshaler();

private ExponentialHistogramBucketsStatelessMarshaler() {}

@Override
public void writeTo(
Serializer output, ExponentialHistogramBuckets buckets, MarshalerContext context)
throws IOException {
output.serializeSInt32(ExponentialHistogramDataPoint.Buckets.OFFSET, buckets.getOffset());
List<Long> counts = buckets.getBucketCounts();
if (counts instanceof DynamicPrimitiveLongList) {
output.serializeRepeatedUInt64(
ExponentialHistogramDataPoint.Buckets.BUCKET_COUNTS, (DynamicPrimitiveLongList) counts);
} else {
output.serializeRepeatedUInt64(
ExponentialHistogramDataPoint.Buckets.BUCKET_COUNTS, PrimitiveLongList.toArray(counts));
}
}

@Override
public int getBinarySerializedSize(
ExponentialHistogramBuckets buckets, MarshalerContext context) {
return ExponentialHistogramBucketsMarshaler.calculateSize(
buckets.getOffset(), buckets.getBucketCounts());
}
}
@@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.otlp.metrics;

import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler;
import io.opentelemetry.proto.metrics.v1.internal.ExponentialHistogramDataPoint;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import java.io.IOException;

/** See {@link ExponentialHistogramDataPointMarshaler}. */
final class ExponentialHistogramDataPointStatelessMarshaler
implements StatelessMarshaler<ExponentialHistogramPointData> {
static final ExponentialHistogramDataPointStatelessMarshaler INSTANCE =
new ExponentialHistogramDataPointStatelessMarshaler();

private ExponentialHistogramDataPointStatelessMarshaler() {}

@Override
public void writeTo(
Serializer output, ExponentialHistogramPointData point, MarshalerContext context)
throws IOException {
output.serializeFixed64(
ExponentialHistogramDataPoint.START_TIME_UNIX_NANO, point.getStartEpochNanos());
output.serializeFixed64(ExponentialHistogramDataPoint.TIME_UNIX_NANO, point.getEpochNanos());
output.serializeFixed64(ExponentialHistogramDataPoint.COUNT, point.getCount());
output.serializeDouble(ExponentialHistogramDataPoint.SUM, point.getSum());
if (point.hasMin()) {
output.serializeDoubleOptional(ExponentialHistogramDataPoint.MIN, point.getMin());
}
if (point.hasMax()) {
output.serializeDoubleOptional(ExponentialHistogramDataPoint.MAX, point.getMax());
}
output.serializeSInt32(ExponentialHistogramDataPoint.SCALE, point.getScale());
output.serializeFixed64(ExponentialHistogramDataPoint.ZERO_COUNT, point.getZeroCount());
output.serializeMessageWithContext(
ExponentialHistogramDataPoint.POSITIVE,
point.getPositiveBuckets(),
ExponentialHistogramBucketsStatelessMarshaler.INSTANCE,
context);
output.serializeMessageWithContext(
ExponentialHistogramDataPoint.NEGATIVE,
point.getNegativeBuckets(),
ExponentialHistogramBucketsStatelessMarshaler.INSTANCE,
context);
output.serializeRepeatedMessageWithContext(
ExponentialHistogramDataPoint.EXEMPLARS,
point.getExemplars(),
ExemplarStatelessMarshaler.INSTANCE,
context);
output.serializeRepeatedMessageWithContext(
ExponentialHistogramDataPoint.ATTRIBUTES,
point.getAttributes(),
KeyValueStatelessMarshaler.INSTANCE,
context);
}

@Override
public int getBinarySerializedSize(
ExponentialHistogramPointData point, MarshalerContext context) {
int size = 0;
size +=
MarshalerUtil.sizeFixed64(
ExponentialHistogramDataPoint.START_TIME_UNIX_NANO, point.getStartEpochNanos());
size +=
MarshalerUtil.sizeFixed64(
ExponentialHistogramDataPoint.TIME_UNIX_NANO, point.getEpochNanos());
size += MarshalerUtil.sizeFixed64(ExponentialHistogramDataPoint.COUNT, point.getCount());
size += MarshalerUtil.sizeDouble(ExponentialHistogramDataPoint.SUM, point.getSum());
if (point.hasMin()) {
size += MarshalerUtil.sizeDoubleOptional(ExponentialHistogramDataPoint.MIN, point.getMin());
}
if (point.hasMax()) {
size += MarshalerUtil.sizeDoubleOptional(ExponentialHistogramDataPoint.MAX, point.getMax());
}
size += MarshalerUtil.sizeSInt32(ExponentialHistogramDataPoint.SCALE, point.getScale());
size +=
MarshalerUtil.sizeFixed64(ExponentialHistogramDataPoint.ZERO_COUNT, point.getZeroCount());
size +=
StatelessMarshalerUtil.sizeMessageWithContext(
ExponentialHistogramDataPoint.POSITIVE,
point.getPositiveBuckets(),
ExponentialHistogramBucketsStatelessMarshaler.INSTANCE,
context);
size +=
StatelessMarshalerUtil.sizeMessageWithContext(
ExponentialHistogramDataPoint.NEGATIVE,
point.getNegativeBuckets(),
ExponentialHistogramBucketsStatelessMarshaler.INSTANCE,
context);
size +=
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ExponentialHistogramDataPoint.EXEMPLARS,
point.getExemplars(),
ExemplarStatelessMarshaler.INSTANCE,
context);
size +=
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ExponentialHistogramDataPoint.ATTRIBUTES,
point.getAttributes(),
KeyValueStatelessMarshaler.INSTANCE,
context);

return size;
}
}

0 comments on commit 996c9c3

Please sign in to comment.