Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Mode: Adding support for OTLP HTTP Exporter - First draft #6171

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,9 +41,12 @@
import static io.opentelemetry.exporter.internal.marshal.WireFormat.MAX_VARINT32_SIZE;
import static io.opentelemetry.exporter.internal.marshal.WireFormat.MAX_VARINT_SIZE;

import com.google.common.base.Utf8;
import io.opentelemetry.api.internal.ConfigUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* Protobuf wire encoder.
Expand Down Expand Up @@ -181,6 +184,8 @@ final void writeEnumNoTag(final int value) throws IOException {
writeInt32NoTag(value);
}

abstract void writeByteArrayNoTag(final ByteBuffer value) throws IOException;

/** Write a {@code bytes} field to the stream. */
final void writeByteArrayNoTag(final byte[] value) throws IOException {
writeByteArrayNoTag(value, 0, value.length);
Expand Down Expand Up @@ -324,6 +329,22 @@ static int computeEnumSizeNoTag(final int value) {
return computeInt32SizeNoTag(value);
}

/**
* Compute the number of bytes that would be needed to encode a {@code string} field
*/
public static int computeStringSizeNoTag(final String value) {
int length;
try {
length = Utf8.encodedLength(value);
} catch (IllegalArgumentException e) {
final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
length = bytes.length;
}

return computeLengthDelimitedFieldSize(length);
}


/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteArraySizeNoTag(final byte[] value) {
return computeLengthDelimitedFieldSize(value.length);
Expand Down Expand Up @@ -481,6 +502,15 @@ void reset(OutputStream out) {
totalBytesWritten = 0;
}

@Override
void writeByteArrayNoTag(final ByteBuffer value) throws IOException {
// what happens if the value is an empty string?
writeUInt32NoTag(value.remaining());
while (value.hasRemaining()) {
write(value.get());
}
}

@Override
void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOException {
writeUInt32NoTag(length);
Expand Down
@@ -0,0 +1,46 @@
package io.opentelemetry.exporter.internal.marshal;

// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

import java.nio.Buffer;

/**
* Wrappers around {@link Buffer} methods that are covariantly overridden in Java 9+. See
* https://github.com/protocolbuffers/protobuf/issues/11393
*
* <p>TODO remove when Java 8 support is no longer needed.
*/
// Copied from
// https://github.com/protocolbuffers/protobuf/blob/master/java/core/src/main/java/com/google/protobuf/Java8Compatability.java
final class Java8Compatibility {
static void clear(Buffer b) {
b.clear();
}

static void flip(Buffer b) {
b.flip();
}

static void limit(Buffer b, int limit) {
b.limit(limit);
}

static void mark(Buffer b) {
b.mark();
}

static void position(Buffer b, int position) {
b.position(position);
}

static void reset(Buffer b) {
b.reset();
}

private Java8Compatibility() {}
}
Expand Up @@ -96,6 +96,12 @@ protected void writeDoubleValue(double value) throws IOException {
generator.writeNumber(value);
}

/** Writes a protobuf {@code string} field, even if it matches the default value. */
public void writeString(ProtoFieldInfo field, String string) throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException {
generator.writeFieldName(field.getJsonName());
Expand Down
Expand Up @@ -265,6 +265,14 @@ public static int sizeFixed32(ProtoFieldInfo field, int message) {
return field.getTagSize() + CodedOutputStream.computeFixed32SizeNoTag(message);
}

/** Returns the size of a string field, encoded using UTF-8 encoding */
public static int sizeStringUtf8(ProtoFieldInfo field, String message) {
if (message == null || message.isEmpty()) {
return 0;
}
return field.getTagSize() + CodedOutputStream.computeStringSizeNoTag(message);
}

/** Returns the size of a bytes field. */
public static int sizeBytes(ProtoFieldInfo field, byte[] message) {
if (message.length == 0) {
Expand Down
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.trace.TraceId;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -23,7 +24,7 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
// reusing buffers for the thread is almost free. Even with multiple threads, it should still be
// worth it and is common practice in serialization libraries such as Jackson.
private static final ThreadLocal<Map<String, byte[]>> THREAD_LOCAL_ID_CACHE = new ThreadLocal<>();

private static final ThreadLocal<ByteBuffer> THREAD_LOCAL_STRING_BYTE_BUFFER = new ThreadLocal<>();
private final CodedOutputStream output;
private final Map<String, byte[]> idCache;

Expand Down Expand Up @@ -117,6 +118,25 @@ protected void writeDoubleValue(double value) throws IOException {
output.writeDoubleNoTag(value);
}

@Override
public void writeString(ProtoFieldInfo field, String string) throws IOException {
output.writeUInt32NoTag(field.getTag());
ByteBuffer byteBuffer = THREAD_LOCAL_STRING_BYTE_BUFFER.get();
if (byteBuffer == null) {
byteBuffer = ByteBuffer.allocateDirect(string.length() * 4);
THREAD_LOCAL_STRING_BYTE_BUFFER.set(byteBuffer);
} else {
byteBuffer.clear();
if (byteBuffer.capacity() < string.length() * 4) {
byteBuffer = ByteBuffer.allocateDirect(string.length() * 4);
THREAD_LOCAL_STRING_BYTE_BUFFER.set(byteBuffer);
}
}
Utf8.encodeUtf8(string, byteBuffer);
byteBuffer.flip(); // Make it readable for CodedOutputStream
output.writeByteArrayNoTag(byteBuffer);
}

@Override
public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException {
writeBytes(field, utf8Bytes);
Expand Down
Expand Up @@ -156,6 +156,17 @@ public void serializeDoubleOptional(ProtoFieldInfo field, double value) throws I

protected abstract void writeDoubleValue(double value) throws IOException;

/**
* Serializes a protobuf {@code string} field. {@code utf8Bytes} is the UTF8 encoded bytes of the
* string to serialize.
*/
public void serializeString(ProtoFieldInfo field, String string) throws IOException {
if (string.length() == 0) {
return;
}
writeString(field, string);
}

/**
* Serializes a protobuf {@code string} field. {@code utf8Bytes} is the UTF8 encoded bytes of the
* string to serialize.
Expand All @@ -167,6 +178,9 @@ public void serializeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExc
writeString(field, utf8Bytes);
}

/** Writes a protobuf {@code string} field, even if it matches the default value. */
public abstract void writeString(ProtoFieldInfo field, String string) throws IOException;

/** Writes a protobuf {@code string} field, even if it matches the default value. */
public abstract void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException;

Expand Down