Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low allocation OTLP marshaler #6328

Closed
wants to merge 17 commits into from
Expand Up @@ -89,10 +89,15 @@ public static void longToBase16String(long value, char[] dest, int destOffset) {
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
public static byte[] bytesFromBase16(CharSequence value, int length) {
byte[] result = new byte[length / 2];
bytesFromBase16(value, length, result);
return result;
}

/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
for (int i = 0; i < length; i += 2) {
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
}
return result;
}

/** Fills {@code dest} with the hex encoding of {@code bytes}. */
Expand Down
1 change: 1 addition & 0 deletions exporters/common/build.gradle.kts
Expand Up @@ -14,6 +14,7 @@ dependencies {
api(project(":sdk-extensions:autoconfigure-spi"))

compileOnly(project(":sdk:common"))
compileOnly(project(":exporters:common:compile-stub"))

compileOnly("org.codehaus.mojo:animal-sniffer-annotations")

Expand Down
6 changes: 6 additions & 0 deletions exporters/common/compile-stub/build.gradle.kts
@@ -0,0 +1,6 @@
plugins {
id("otel.java-conventions")
}

description = "OpenTelemetry Exporter Compile Stub"
otelJava.moduleName.set("io.opentelemetry.exporter.internal.compile-stub")
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package sun.misc;

import java.lang.reflect.Field;

/**
* sun.misc.Unsafe from the JDK isn't found by the compiler, we provide out own trimmed down version
* that we can compile against.
*/
public class Unsafe {

public long objectFieldOffset(Field f) {
return -1;
}

public Object getObject(Object o, long offset) {
return null;
}

public byte getByte(Object o, long offset) {
return 0;
}

public int arrayBaseOffset(Class<?> arrayClass) {
return 0;
}

public long getLong(Object o, long offset) {
return 0;
}
}
Expand Up @@ -108,6 +108,14 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

@Override
public void writeString(
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
Expand Down Expand Up @@ -165,6 +173,44 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public <T> void serializeRepeatedMessage(
ProtoFieldInfo field,
List<? extends T> messages,
StatelessMarshaler<T> marshaler,
MarshalerContext context)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < messages.size(); i++) {
T message = messages.get(i);
generator.writeStartObject();
marshaler.writeTo(this, message, context);
generator.writeEndObject();
}
generator.writeEndArray();
}

@Override
protected void writeStartRepeated(ProtoFieldInfo field) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}

@Override
protected void writeEndRepeated() throws IOException {
generator.writeEndArray();
}

@Override
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
throws IOException {
generator.writeStartObject();
}

@Override
protected void writeEndRepeatedElement() throws IOException {
generator.writeEndObject();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
Expand Down
@@ -0,0 +1,232 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
* objects, that we call data. Both integers and objects can be read from the state in the order
* they were added (first in, first out). Additionally, this class provides various pools and caches
* for objects that can be reused between marshalling attempts.
*/
public final class MarshalerContext {
private final boolean marshalStringNoAllocation;
private final boolean marshalStringUnsafe;

private int[] sizes = new int[16];
private int sizeReadIndex;
private int sizeWriteIndex;
private Object[] data = new Object[16];
private int dataReadIndex;
private int dataWriteIndex;

@SuppressWarnings("BooleanParameter")
public MarshalerContext() {
this(true, true);
}

public MarshalerContext(boolean marshalStringNoAllocation, boolean marshalStringUnsafe) {
this.marshalStringNoAllocation = marshalStringNoAllocation;
this.marshalStringUnsafe = marshalStringUnsafe;
}

public boolean marshalStringNoAllocation() {
return marshalStringNoAllocation;
}

public boolean marshalStringUnsafe() {
return marshalStringUnsafe;
}

public void addSize(int size) {
growSizeIfNeeded();
sizes[sizeWriteIndex++] = size;
}

public int addSize() {
growSizeIfNeeded();
return sizeWriteIndex++;
}

private void growSizeIfNeeded() {
if (sizeWriteIndex == sizes.length) {
int[] newSizes = new int[sizes.length * 2];
System.arraycopy(sizes, 0, newSizes, 0, sizes.length);
sizes = newSizes;
laurit marked this conversation as resolved.
Show resolved Hide resolved
}
}

public void setSize(int index, int size) {
sizes[index] = size;
}

public int getSize() {
return sizes[sizeReadIndex++];
}

public void addData(@Nullable Object o) {
growDataIfNeeded();
data[dataWriteIndex++] = o;
}

private void growDataIfNeeded() {
if (dataWriteIndex == data.length) {
Object[] newData = new Object[data.length * 2];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;
}
}

public <T> T getData(Class<T> type) {
return type.cast(data[dataReadIndex++]);
}

private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);

/** Returns a buffer that can be used to hold a trace id. */
public byte[] getTraceIdBuffer() {
return traceIdPool.get();
}

private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);

/** Returns a buffer that can be used to hold a span id. */
public byte[] getSpanIdBuffer() {
return spanIdPool.get();
}

private static class IdPool {
private final List<byte[]> pool = new ArrayList<>();
int index;
final int idSize;

IdPool(int idSize) {
this.idSize = idSize;
}

byte[] get() {
if (index < pool.size()) {
return pool.get(index++);
}
byte[] result = new byte[idSize];
pool.add(result);
index++;

return result;
}

void reset() {
index = 0;
}
}

private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);

/** Returns a pooled identity map. */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getIdentityMap() {
return (Map<K, V>) mapPool.get();
}

private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);

/** Returns a pooled list. */
@SuppressWarnings("unchecked")
public <T> List<T> getList() {
return (List<T>) listPool.get();
}

private static class Pool<T> {
private final List<T> pool = new ArrayList<>();
private int index;
private final Supplier<T> factory;
private final Consumer<T> clean;

Pool(Supplier<T> factory, Consumer<T> clean) {
this.factory = factory;
this.clean = clean;
}

T get() {
if (index < pool.size()) {
return pool.get(index++);
}
T result = factory.get();
pool.add(result);
index++;

return result;
}

void reset() {
for (int i = 0; i < index; i++) {
clean.accept(pool.get(i));
}
index = 0;
}
}

/** Reset context so that serialization could be re-run. */
public void resetReadIndex() {
sizeReadIndex = 0;
dataReadIndex = 0;
}

/** Reset context so that it could be reused. */
public void reset() {
sizeReadIndex = 0;
sizeWriteIndex = 0;
for (int i = 0; i < dataWriteIndex; i++) {
data[i] = null;
}
dataReadIndex = 0;
dataWriteIndex = 0;

traceIdPool.reset();
spanIdPool.reset();

mapPool.reset();
listPool.reset();
}

private static final AtomicInteger KEY_INDEX = new AtomicInteger();

public static class Key {
final int index = KEY_INDEX.getAndIncrement();
}

public static Key key() {
return new Key();
}

private Object[] instances = new Object[16];

@SuppressWarnings("unchecked")
public <T> T getInstance(Key key, Supplier<T> supplier) {
if (key.index >= instances.length) {
Object[] newData = new Object[instances.length * 2];
System.arraycopy(instances, 0, newData, 0, instances.length);
instances = newData;
}

T result = (T) instances[key.index];
if (result == null) {
result = supplier.get();
instances[key.index] = result;
}
return result;
}
}