Skip to content

Commit

Permalink
Low allocation OTLP trace marshaler (#6410)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed May 1, 2024
1 parent e1f707e commit 2e59f54
Show file tree
Hide file tree
Showing 31 changed files with 2,575 additions and 62 deletions.
Expand Up @@ -89,10 +89,15 @@ public static void longToBase16String(long value, char[] dest, int destOffset) {
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
public static byte[] bytesFromBase16(CharSequence value, int length) {
byte[] result = new byte[length / 2];
bytesFromBase16(value, length, result);
return result;
}

/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
for (int i = 0; i < length; i += 2) {
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
}
return result;
}

/** Fills {@code dest} with the hex encoding of {@code bytes}. */
Expand Down
Expand Up @@ -108,6 +108,14 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

@Override
public void writeString(
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
Expand Down Expand Up @@ -165,6 +173,44 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public <T> void serializeRepeatedMessageWithContext(
ProtoFieldInfo field,
List<? extends T> messages,
StatelessMarshaler<T> marshaler,
MarshalerContext context)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < messages.size(); i++) {
T message = messages.get(i);
generator.writeStartObject();
marshaler.writeTo(this, message, context);
generator.writeEndObject();
}
generator.writeEndArray();
}

@Override
protected void writeStartRepeated(ProtoFieldInfo field) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}

@Override
protected void writeEndRepeated() throws IOException {
generator.writeEndArray();
}

@Override
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
throws IOException {
generator.writeStartObject();
}

@Override
protected void writeEndRepeatedElement() throws IOException {
generator.writeEndObject();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
Expand Down
@@ -0,0 +1,226 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
* objects, that we call data. Both integers and objects can be read from the state in the order
* they were added (first in, first out). Additionally, this class provides various pools and caches
* for objects that can be reused between marshalling attempts.
*/
public final class MarshalerContext {
private final boolean marshalStringNoAllocation;

private int[] sizes = new int[16];
private int sizeReadIndex;
private int sizeWriteIndex;
private Object[] data = new Object[16];
private int dataReadIndex;
private int dataWriteIndex;

@SuppressWarnings("BooleanParameter")
public MarshalerContext() {
this(true);
}

public MarshalerContext(boolean marshalStringNoAllocation) {
this.marshalStringNoAllocation = marshalStringNoAllocation;
}

public boolean marshalStringNoAllocation() {
return marshalStringNoAllocation;
}

public void addSize(int size) {
growSizeIfNeeded();
sizes[sizeWriteIndex++] = size;
}

public int addSize() {
growSizeIfNeeded();
return sizeWriteIndex++;
}

private void growSizeIfNeeded() {
if (sizeWriteIndex == sizes.length) {
int[] newSizes = new int[sizes.length * 2];
System.arraycopy(sizes, 0, newSizes, 0, sizes.length);
sizes = newSizes;
}
}

public void setSize(int index, int size) {
sizes[index] = size;
}

public int getSize() {
return sizes[sizeReadIndex++];
}

public void addData(@Nullable Object o) {
growDataIfNeeded();
data[dataWriteIndex++] = o;
}

private void growDataIfNeeded() {
if (dataWriteIndex == data.length) {
Object[] newData = new Object[data.length * 2];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;
}
}

public <T> T getData(Class<T> type) {
return type.cast(data[dataReadIndex++]);
}

private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);

/** Returns a buffer that can be used to hold a trace id. */
public byte[] getTraceIdBuffer() {
return traceIdPool.get();
}

private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);

/** Returns a buffer that can be used to hold a span id. */
public byte[] getSpanIdBuffer() {
return spanIdPool.get();
}

private static class IdPool {
private final List<byte[]> pool = new ArrayList<>();
int index;
final int idSize;

IdPool(int idSize) {
this.idSize = idSize;
}

byte[] get() {
if (index < pool.size()) {
return pool.get(index++);
}
byte[] result = new byte[idSize];
pool.add(result);
index++;

return result;
}

void reset() {
index = 0;
}
}

private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);

/** Returns a pooled identity map. */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getIdentityMap() {
return (Map<K, V>) mapPool.get();
}

private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);

/** Returns a pooled list. */
@SuppressWarnings("unchecked")
public <T> List<T> getList() {
return (List<T>) listPool.get();
}

private static class Pool<T> {
private final List<T> pool = new ArrayList<>();
private int index;
private final Supplier<T> factory;
private final Consumer<T> clean;

Pool(Supplier<T> factory, Consumer<T> clean) {
this.factory = factory;
this.clean = clean;
}

T get() {
if (index < pool.size()) {
return pool.get(index++);
}
T result = factory.get();
pool.add(result);
index++;

return result;
}

void reset() {
for (int i = 0; i < index; i++) {
clean.accept(pool.get(i));
}
index = 0;
}
}

/** Reset context so that serialization could be re-run. */
public void resetReadIndex() {
sizeReadIndex = 0;
dataReadIndex = 0;
}

/** Reset context so that it could be reused. */
public void reset() {
sizeReadIndex = 0;
sizeWriteIndex = 0;
for (int i = 0; i < dataWriteIndex; i++) {
data[i] = null;
}
dataReadIndex = 0;
dataWriteIndex = 0;

traceIdPool.reset();
spanIdPool.reset();

mapPool.reset();
listPool.reset();
}

private static final AtomicInteger KEY_INDEX = new AtomicInteger();

public static class Key {
final int index = KEY_INDEX.getAndIncrement();
}

public static Key key() {
return new Key();
}

private Object[] instances = new Object[16];

@SuppressWarnings("unchecked")
public <T> T getInstance(Key key, Supplier<T> supplier) {
if (key.index >= instances.length) {
Object[] newData = new Object[instances.length * 2];
System.arraycopy(instances, 0, newData, 0, instances.length);
instances = newData;
}

T result = (T) instances[key.index];
if (result == null) {
result = supplier.get();
instances[key.index] = result;
}
return result;
}
}

0 comments on commit 2e59f54

Please sign in to comment.