Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low allocation OTLP marshaler #6328

Closed
wants to merge 17 commits into from
Expand Up @@ -89,10 +89,15 @@ public static void longToBase16String(long value, char[] dest, int destOffset) {
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
public static byte[] bytesFromBase16(CharSequence value, int length) {
byte[] result = new byte[length / 2];
bytesFromBase16(value, length, result);
return result;
}

/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
for (int i = 0; i < length; i += 2) {
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
}
return result;
}

/** Fills {@code dest} with the hex encoding of {@code bytes}. */
Expand Down
Expand Up @@ -213,7 +213,7 @@ static int computeInt32SizeNoTag(final int value) {
}

/** Compute the number of bytes that would be needed to encode a {@code uint32} field. */
static int computeUInt32SizeNoTag(final int value) {
public static int computeUInt32SizeNoTag(final int value) {
if ((value & (~0 << 7)) == 0) {
return 1;
}
Expand Down Expand Up @@ -329,7 +329,7 @@ public static int computeByteArraySizeNoTag(final byte[] value) {
return computeLengthDelimitedFieldSize(value.length);
}

static int computeLengthDelimitedFieldSize(int fieldLength) {
public static int computeLengthDelimitedFieldSize(int fieldLength) {
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
}

Expand Down
Expand Up @@ -108,18 +108,24 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

@Override
public void writeString(ProtoFieldInfo field, String string, int utf8Length) throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
}

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
public void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
generator.writeObjectFieldStart(field.getJsonName());
}

@Override
protected void writeEndMessage() throws IOException {
public void writeEndMessage() throws IOException {
generator.writeEndObject();
}

Expand Down Expand Up @@ -165,6 +171,44 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public <T> void serializeRepeatedMessage(
ProtoFieldInfo field,
List<T> messages,
MarshalerContext context,
MessageConsumer<Serializer, T, MarshalerContext> consumer)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < messages.size(); i++) {
T message = messages.get(i);
generator.writeStartObject();
consumer.accept(this, message, context);
generator.writeEndObject();
}
generator.writeEndArray();
}

@Override
public void writeStartRepeated(ProtoFieldInfo field) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}

@Override
public void writeEndRepeated() throws IOException {
generator.writeEndArray();
}

@Override
public void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
throws IOException {
generator.writeStartObject();
}

@Override
public void writeEndRepeatedElement() throws IOException {
generator.writeEndObject();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
Expand Down
@@ -0,0 +1,218 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
* objects, that we call data. Both integers and objects can be read from the state in the order
* they were added (first in, first out). Additionally, this class provides various pools and caches
* for objects that can be reused between marshalling attempts.
*/
public final class MarshalerContext {
public static final boolean MARSHAL_STRING_NO_ALLOCATION = true;

private int[] sizes = new int[1024];
private int sizeReadIndex;
private int sizeWriteIndex;
private Object[] data = new Object[1024];
private int dataReadIndex;
private int dataWriteIndex;

public boolean marshalStringNoAllocation() {
return MARSHAL_STRING_NO_ALLOCATION;
}

public void addSize(int size) {
growSizeIfNeeded();
sizes[sizeWriteIndex++] = size;
}

public int addSize() {
growSizeIfNeeded();
return sizeWriteIndex++;
}

private void growSizeIfNeeded() {
if (sizeWriteIndex == sizes.length) {
int[] newSizes = new int[sizes.length * 2];
sizes = newSizes;

Check warning on line 52 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L51-L52

Added lines #L51 - L52 were not covered by tests
laurit marked this conversation as resolved.
Show resolved Hide resolved
}
}

public void setSize(int index, int size) {
sizes[index] = size;
}

public int getSize() {
return sizes[sizeReadIndex++];
}

public int addData() {
growDataIfNeeded();
return dataWriteIndex++;

Check warning on line 66 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L65-L66

Added lines #L65 - L66 were not covered by tests
}

public void addData(@Nullable Object o) {
growDataIfNeeded();
data[dataWriteIndex++] = o;
}

private void growDataIfNeeded() {
if (dataWriteIndex == data.length) {
Object[] newData = new Object[data.length * 2];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;

Check warning on line 78 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L76-L78

Added lines #L76 - L78 were not covered by tests
}
}

public byte[] getByteArray() {
return (byte[]) data[dataReadIndex++];
}

public String getString() {
return (String) data[dataReadIndex++];

Check warning on line 87 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L87

Added line #L87 was not covered by tests
}

public <T> T getObject(Class<T> type) {
return type.cast(data[dataReadIndex++]);
}

public void setData(int index, Object value) {
data[index] = value;
}

Check warning on line 96 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerContext.java#L95-L96

Added lines #L95 - L96 were not covered by tests

private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);

/** Returns a buffer that can be used to hold a trace id. */
public byte[] getTraceIdBuffer() {
return traceIdPool.get();
}

private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);

/** Returns a buffer that can be used to hold a span id. */
public byte[] getSpanIdBuffer() {
return spanIdPool.get();
}

private static class IdPool {
private final List<byte[]> pool = new ArrayList<>();
int index;
final int idSize;

IdPool(int idSize) {
this.idSize = idSize;
}

byte[] get() {
if (index < pool.size()) {
return pool.get(index++);
}
byte[] result = new byte[idSize];
pool.add(result);
index++;

return result;
}

void reset() {
index = 0;
}
}

private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);

/** Returns a pooled identity map. */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getIdentityMap() {
return (Map<K, V>) mapPool.get();
}

private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);

/** Returns a pooled list. */
@SuppressWarnings("unchecked")
public <T> List<T> getList() {
return (List<T>) listPool.get();
}

private static class Pool<T> {
private final List<T> pool = new ArrayList<>();
private int index;
private final Supplier<T> factory;
private final Consumer<T> clean;

Pool(Supplier<T> factory, Consumer<T> clean) {
this.factory = factory;
this.clean = clean;
}

T get() {
if (index < pool.size()) {
return pool.get(index++);
}
T result = factory.get();
pool.add(result);
index++;

return result;
}

void reset() {
for (int i = 0; i < index; i++) {
clean.accept(pool.get(i));
}
index = 0;
}
}

/** Reset context so that serialization could be re-run. */
public void resetReadIndex() {
sizeReadIndex = 0;
dataReadIndex = 0;
}

/** Reset context so that it could be reused. */
public void reset() {
sizeReadIndex = 0;
sizeWriteIndex = 0;
for (int i = 0; i < dataWriteIndex; i++) {
data[i] = null;
}
dataReadIndex = 0;
dataWriteIndex = 0;

traceIdPool.reset();
spanIdPool.reset();

mapPool.reset();
listPool.reset();
}

private final Map<Class<?>, Object> instanceMap = new HashMap<>();

/** Returns cached instance produced by the given supplier. */
@SuppressWarnings("unchecked")
public <T> T getInstance(Class<T> key, Supplier<T> supplier) {
T result = (T) instanceMap.get(key);
if (result == null) {
result = supplier.get();
instanceMap.put(key, result);
}
return result;
}
}