Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Mode: Adding support for OTLP HTTP Exporter - Second draft #6273

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,9 +41,12 @@
import static io.opentelemetry.exporter.internal.marshal.WireFormat.MAX_VARINT32_SIZE;
import static io.opentelemetry.exporter.internal.marshal.WireFormat.MAX_VARINT_SIZE;

import com.google.common.base.Utf8;
import io.opentelemetry.api.internal.ConfigUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* Protobuf wire encoder.
Expand Down Expand Up @@ -181,6 +184,8 @@ final void writeEnumNoTag(final int value) throws IOException {
writeInt32NoTag(value);
}

abstract void writeByteArrayNoTag(final ByteBuffer value) throws IOException;

/** Write a {@code bytes} field to the stream. */
final void writeByteArrayNoTag(final byte[] value) throws IOException {
writeByteArrayNoTag(value, 0, value.length);
Expand Down Expand Up @@ -324,11 +329,29 @@ static int computeEnumSizeNoTag(final int value) {
return computeInt32SizeNoTag(value);
}

/** Compute the number of bytes that would be needed to encode a {@code string} field. */
public static int computeStringSizeNoTag(final String value) {
int length;
try {
length = Utf8.encodedLength(value);
} catch (IllegalArgumentException e) {
final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
length = bytes.length;
}

return computeLengthDelimitedFieldSize(length);
}

/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteArraySizeNoTag(final byte[] value) {
return computeLengthDelimitedFieldSize(value.length);
}

/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteBufferSizeNoTag(final ByteBuffer value) {
return computeLengthDelimitedFieldSize(value.remaining());
}

static int computeLengthDelimitedFieldSize(int fieldLength) {
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
}
Expand Down Expand Up @@ -481,6 +504,15 @@ void reset(OutputStream out) {
totalBytesWritten = 0;
}

@Override
void writeByteArrayNoTag(final ByteBuffer value) throws IOException {
// what happens if the value is an empty string?
writeUInt32NoTag(value.remaining());
while (value.hasRemaining()) {
write(value.get());
}
}

@Override
void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOException {
writeUInt32NoTag(length);
Expand Down
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.sdk.internal.DynamicList;
import java.util.List;

public final class DefaultMessageSize implements MessageSize {

private int encodedSize = 0;
private DynamicList<MessageSize> messageFieldSizes = DynamicList.empty();

DefaultMessageSize() {}

public void set(int encodedSize) {
this.encodedSize = encodedSize;
this.messageFieldSizes = DynamicList.empty();
}

public void set(int encodedSize, DynamicList<MessageSize> messageFieldSizes) {
this.encodedSize = encodedSize;
this.messageFieldSizes = messageFieldSizes;
}

@Override
public int getEncodedSize() {
return encodedSize;
}

@Override
public List<MessageSize> getMessageTypedFieldSizes() {
return messageFieldSizes;
}

@Override
public MessageSize getMessageTypeFieldSize(int messageFieldPosition) {
return messageFieldSizes.get(messageFieldPosition);
}
}
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

import java.nio.Buffer;

/**
* Wrappers around {@link Buffer} methods that are covariantly overridden in Java 9+. See
* https://github.com/protocolbuffers/protobuf/issues/11393
*
* <p>TODO remove when Java 8 support is no longer needed.
*/
// Copied from
// https://github.com/protocolbuffers/protobuf/blob/master/java/core/src/main/java/com/google/protobuf/Java8Compatability.java
final class Java8Compatibility {
static void clear(Buffer b) {
b.clear();
}

static void flip(Buffer b) {
b.flip();
}

static void limit(Buffer b, int limit) {
b.limit(limit);
}

static void mark(Buffer b) {
b.mark();
}

static void position(Buffer b, int position) {
b.position(position);
}

static void reset(Buffer b) {
b.reset();
}

private Java8Compatibility() {}
}
Expand Up @@ -108,6 +108,13 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}

/** Writes a protobuf {@code string} field, even if it matches the default value. */
@Override
public void writeString(ProtoFieldInfo field, String string) throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeString(string);
}

@Override
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
Expand Down Expand Up @@ -165,13 +172,41 @@ public void serializeRepeatedMessage(
generator.writeEndArray();
}

@Override
public void serializeRepeatedMessage(
ProtoFieldInfo field,
List<?> repeatedMessage,
MessageSerializer repeatedMessageSerializer,
List<MessageSize> repeatedMessageSize,
MarshallingObjectsPool pool)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (int i = 0; i < repeatedMessage.size(); i++) {
Object message = repeatedMessage.get(i);
MessageSize messageSize = repeatedMessageSize.get(i);
writeMessageValue(message, repeatedMessageSerializer, messageSize, pool);
}
generator.writeEndArray();
}

// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
message.writeTo(this);
generator.writeEndObject();
}

void writeMessageValue(
Object message,
MessageSerializer messageSerializer,
MessageSize messageSize,
MarshallingObjectsPool pool)
throws IOException {
generator.writeStartObject();
messageSerializer.serialize(this, message, messageSize, pool);
generator.writeEndObject();
}

@Override
public void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
throws IOException {
Expand Down
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.internal.DynamicList;
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -160,6 +161,19 @@ public static int sizeRepeatedDouble(ProtoFieldInfo field, List<Double> values)
return sizeRepeatedFixed64(field, values.size());
}

/** Returns the size of a repeated message field. */
@SuppressWarnings("ForLoopReplaceableByForEach")
public static int sizeRepeatedMessage(
ProtoFieldInfo field, DynamicList<MessageSize> repeatedMessage) {
int size = 0;
int fieldTagSize = field.getTagSize();
for (int i = 0; i < repeatedMessage.size(); i++) {
int fieldSize = repeatedMessage.get(i).getEncodedSize();
size += fieldTagSize + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
}
return size;
}

/** Returns the size of a repeated message field. */
@SuppressWarnings("AvoidObjectArrays")
public static <T extends Marshaler> int sizeRepeatedMessage(
Expand Down Expand Up @@ -191,6 +205,11 @@ public static int sizeMessage(ProtoFieldInfo field, Marshaler message) {
return field.getTagSize() + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
}

public static int sizeMessage(ProtoFieldInfo field, MessageSize messageSize) {
int fieldSize = messageSize.getEncodedSize();
return field.getTagSize() + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
}

/** Returns the size of a bool field. */
public static int sizeBool(ProtoFieldInfo field, boolean value) {
if (!value) {
Expand Down Expand Up @@ -270,6 +289,14 @@ public static int sizeFixed32(ProtoFieldInfo field, int message) {
return field.getTagSize() + CodedOutputStream.computeFixed32SizeNoTag(message);
}

/** Returns the size of a string field, encoded using UTF-8 encoding. */
public static int sizeStringUtf8(ProtoFieldInfo field, String message) {
if (message.isEmpty()) {
return 0;
}
return field.getTagSize() + CodedOutputStream.computeStringSizeNoTag(message);
}

/** Returns the size of a bytes field. */
public static int sizeBytes(ProtoFieldInfo field, byte[] message) {
if (message.length == 0) {
Expand Down
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import io.opentelemetry.sdk.internal.DynamicList;
import io.opentelemetry.sdk.internal.ObjectPool;

public class MarshallingObjectsPool {
private final ObjectPool<DynamicList<MessageSize>> messageSizeDynamicListPool =
new ObjectPool<>(DynamicList::new);
private final ObjectPool<DefaultMessageSize> defaultMessageSizePool =
new ObjectPool<>(DefaultMessageSize::new);

// Accepts listSize to enable choosing the right size reusable list from the pool
// in the future
public DynamicList<MessageSize> borrowDynamicList(int listSize) {
DynamicList<MessageSize> dynamicList = messageSizeDynamicListPool.borrowObject();
dynamicList.resizeAndClear(listSize);
return dynamicList;
}

public void returnDynamicList(DynamicList<MessageSize> dynamicList) {
messageSizeDynamicListPool.returnObject(dynamicList);
}

public ObjectPool<DefaultMessageSize> getDefaultMessageSizePool() {
return defaultMessageSizePool;
}
}
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.marshal;

import java.util.List;

public interface MessageSize {

/** Returns the size of the protobuf-encoded message in bytes. */
int getEncodedSize();

/** Returns the size of the message-type fields in the message. */
List<MessageSize> getMessageTypedFieldSizes();

/**
* Returns the size of the field at the given position.
*
* <p>A message is composed of message-type fields or primitive fields. For example:
*
* <pre>
* message MyMessage {
* string field1 = 1;
* Address field2 = 2;
* int32 field3 = 3;
* FullName field4 = 4;
* }
* </pre>
*
* <p>The field sizes that are returned are the sizes of the embedded message types only. In the
* above example, {@code getMessageTypeFieldSize(0)} will return the size of {@code field2} and
* {@code getMessageTypeFieldSize(1)} will return the size of {@code field4}.
*
* @param messageFieldPosition The embedded message sequence number, starting from 0.
*/
MessageSize getMessageTypeFieldSize(int messageFieldPosition);
}