Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low allocation OTLP trace marshaler #6410

Merged
merged 4 commits into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -89,10 +89,15 @@ public static void longToBase16String(long value, char[] dest, int destOffset) {
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
public static byte[] bytesFromBase16(CharSequence value, int length) {
byte[] result = new byte[length / 2];
bytesFromBase16(value, length, result);
return result;
}

/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
for (int i = 0; i < length; i += 2) {
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
}
return result;
}

/** Fills {@code dest} with the hex encoding of {@code bytes}. */
Expand Down
Expand Up @@ -108,6 +108,14 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

@Override
public void writeString(
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
Expand Down Expand Up @@ -165,6 +173,44 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public <T> void serializeRepeatedMessage(
ProtoFieldInfo field,
List<? extends T> messages,
StatelessMarshaler<T> marshaler,
MarshalerContext context)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < messages.size(); i++) {
T message = messages.get(i);
generator.writeStartObject();
marshaler.writeTo(this, message, context);
generator.writeEndObject();
}
generator.writeEndArray();
}

@Override
protected void writeStartRepeated(ProtoFieldInfo field) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}

@Override
protected void writeEndRepeated() throws IOException {
generator.writeEndArray();
}

@Override
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
throws IOException {
generator.writeStartObject();
}

@Override
protected void writeEndRepeatedElement() throws IOException {
generator.writeEndObject();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
Expand Down
@@ -0,0 +1,226 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
* objects, that we call data. Both integers and objects can be read from the state in the order
* they were added (first in, first out). Additionally, this class provides various pools and caches
* for objects that can be reused between marshalling attempts.
*/
public final class MarshalerContext {
private final boolean marshalStringNoAllocation;

private int[] sizes = new int[16];
private int sizeReadIndex;
private int sizeWriteIndex;
private Object[] data = new Object[16];
private int dataReadIndex;
private int dataWriteIndex;

@SuppressWarnings("BooleanParameter")
public MarshalerContext() {
this(true);
}

public MarshalerContext(boolean marshalStringNoAllocation) {
this.marshalStringNoAllocation = marshalStringNoAllocation;
}

public boolean marshalStringNoAllocation() {
return marshalStringNoAllocation;
}

public void addSize(int size) {
growSizeIfNeeded();
sizes[sizeWriteIndex++] = size;
}

public int addSize() {
growSizeIfNeeded();
return sizeWriteIndex++;
}

private void growSizeIfNeeded() {
if (sizeWriteIndex == sizes.length) {
int[] newSizes = new int[sizes.length * 2];
System.arraycopy(sizes, 0, newSizes, 0, sizes.length);
sizes = newSizes;
}
}

public void setSize(int index, int size) {
sizes[index] = size;
}

public int getSize() {
return sizes[sizeReadIndex++];
}

public void addData(@Nullable Object o) {
growDataIfNeeded();
data[dataWriteIndex++] = o;
}

private void growDataIfNeeded() {
if (dataWriteIndex == data.length) {
Object[] newData = new Object[data.length * 2];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;
}
}

public <T> T getData(Class<T> type) {
return type.cast(data[dataReadIndex++]);
}

private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);

/** Returns a buffer that can be used to hold a trace id. */
public byte[] getTraceIdBuffer() {
return traceIdPool.get();
}

private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);

/** Returns a buffer that can be used to hold a span id. */
public byte[] getSpanIdBuffer() {
return spanIdPool.get();
}

private static class IdPool {
private final List<byte[]> pool = new ArrayList<>();
int index;
final int idSize;

IdPool(int idSize) {
this.idSize = idSize;
}

byte[] get() {
if (index < pool.size()) {
return pool.get(index++);
}
byte[] result = new byte[idSize];
pool.add(result);
index++;

return result;
}

void reset() {
index = 0;
}
}

private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);

/** Returns a pooled identity map. */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getIdentityMap() {
return (Map<K, V>) mapPool.get();
}

private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);

/** Returns a pooled list. */
@SuppressWarnings("unchecked")
public <T> List<T> getList() {
return (List<T>) listPool.get();
}

private static class Pool<T> {
private final List<T> pool = new ArrayList<>();
private int index;
private final Supplier<T> factory;
private final Consumer<T> clean;

Pool(Supplier<T> factory, Consumer<T> clean) {
this.factory = factory;
this.clean = clean;
}

T get() {
if (index < pool.size()) {
return pool.get(index++);
}
T result = factory.get();
pool.add(result);
index++;

return result;
}

void reset() {
for (int i = 0; i < index; i++) {
clean.accept(pool.get(i));
}
index = 0;
}
}

/** Reset context so that serialization could be re-run. */
public void resetReadIndex() {
sizeReadIndex = 0;
dataReadIndex = 0;
}

/** Reset context so that it could be reused. */
public void reset() {
sizeReadIndex = 0;
sizeWriteIndex = 0;
for (int i = 0; i < dataWriteIndex; i++) {
data[i] = null;
}
dataReadIndex = 0;
dataWriteIndex = 0;

traceIdPool.reset();
spanIdPool.reset();

mapPool.reset();
listPool.reset();
}

private static final AtomicInteger KEY_INDEX = new AtomicInteger();

public static class Key {
final int index = KEY_INDEX.getAndIncrement();
}

public static Key key() {
return new Key();
}

private Object[] instances = new Object[16];

@SuppressWarnings("unchecked")
public <T> T getInstance(Key key, Supplier<T> supplier) {
if (key.index >= instances.length) {
Object[] newData = new Object[instances.length * 2];
System.arraycopy(instances, 0, newData, 0, instances.length);
instances = newData;
}

T result = (T) instances[key.index];
if (result == null) {
result = supplier.get();
instances[key.index] = result;
}
return result;
}
}