Skip to content

Commit

Permalink
Add memory mode support to OTLP exporters (#6430)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed May 9, 2024
1 parent 715211e commit 0d2d67e
Show file tree
Hide file tree
Showing 34 changed files with 499 additions and 140 deletions.
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender;
import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void export(

private static ManagedChannel defaultGrpcChannel;

private static GrpcExporter<TraceRequestMarshaler> upstreamGrpcExporter;
private static GrpcExporter<Marshaler> upstreamGrpcExporter;
private static GrpcExporter<TraceRequestMarshaler> okhttpGrpcSender;
private static HttpExporter<TraceRequestMarshaler> httpExporter;

Expand Down
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpLogRecordExporter implements LogRecordExporter {

private final HttpExporterBuilder<LogsRequestMarshaler> builder;
private final HttpExporter<LogsRequestMarshaler> delegate;
private final Deque<LowAllocationLogsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpLogRecordExporter(
HttpExporterBuilder<LogsRequestMarshaler> builder,
HttpExporter<LogsRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@ public static OtlpHttpLogRecordExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpHttpLogRecordExporterBuilder toBuilder() {
return new OtlpHttpLogRecordExporterBuilder(builder.copy());
return new OtlpHttpLogRecordExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@ public OtlpHttpLogRecordExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
LogsRequestMarshaler exportRequest = LogsRequestMarshaler.create(logs);
return delegate.export(exportRequest, logs.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationLogsRequestMarshaler();
}
LowAllocationLogsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(logs);
return delegate
.export(exportMarshaler, logs.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return delegate.export(request, logs.size());
}

@Override
Expand All @@ -89,6 +115,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpHttpLogRecordExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpLogRecordExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpLogRecordExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<LogsRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<LogsRequestMarshaler> delegate) {
OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpLogRecordExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -206,12 +210,19 @@ public OtlpHttpLogRecordExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpLogRecordExporter build() {
return new OtlpHttpLogRecordExporter(delegate, delegate.build());
return new OtlpHttpLogRecordExporter(delegate, delegate.build(), memoryMode);
}
}
Expand Up @@ -7,6 +7,8 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
Expand All @@ -17,7 +19,9 @@
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -29,15 +33,16 @@
@ThreadSafe
public final class OtlpHttpMetricExporter implements MetricExporter {

private final HttpExporterBuilder<MetricsRequestMarshaler> builder;
private final HttpExporter<MetricsRequestMarshaler> delegate;
private final Deque<LowAllocationMetricsRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;
private final MemoryMode memoryMode;

OtlpHttpMetricExporter(
HttpExporterBuilder<MetricsRequestMarshaler> builder,
HttpExporter<MetricsRequestMarshaler> delegate,
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector,
MemoryMode memoryMode) {
Expand Down Expand Up @@ -103,8 +108,24 @@ public MemoryMode getMemoryMode() {
*/
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
MetricsRequestMarshaler exportRequest = MetricsRequestMarshaler.create(metrics);
return delegate.export(exportRequest, metrics.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationMetricsRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationMetricsRequestMarshaler();
}
LowAllocationMetricsRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(metrics);
return delegate
.export(exportMarshaler, metrics.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics);
return delegate.export(request, metrics.size());
}

/**
Expand Down
Expand Up @@ -13,7 +13,7 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
Expand Down Expand Up @@ -42,16 +42,15 @@ public final class OtlpHttpMetricExporterBuilder {
AggregationTemporalitySelector.alwaysCumulative();
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<MetricsRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private AggregationTemporalitySelector aggregationTemporalitySelector =
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;

private DefaultAggregationSelector defaultAggregationSelector =
DefaultAggregationSelector.getDefault();
private MemoryMode memoryMode;

OtlpHttpMetricExporterBuilder(
HttpExporterBuilder<MetricsRequestMarshaler> delegate, MemoryMode memoryMode) {
OtlpHttpMetricExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
delegate.setMeterProvider(MeterProvider::noop);
Expand Down
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpSpanExporter implements SpanExporter {

private final HttpExporterBuilder<TraceRequestMarshaler> builder;
private final HttpExporter<TraceRequestMarshaler> delegate;
private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpSpanExporter(
HttpExporterBuilder<TraceRequestMarshaler> builder,
HttpExporter<TraceRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@ public static OtlpHttpSpanExporterBuilder builder() {
* @since 1.29.0
*/
public OtlpHttpSpanExporterBuilder toBuilder() {
return new OtlpHttpSpanExporterBuilder(builder.copy());
return new OtlpHttpSpanExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@ public OtlpHttpSpanExporterBuilder toBuilder() {
*/
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
return delegate.export(exportRequest, spans.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll();
if (marshaler == null) {
marshaler = new LowAllocationTraceRequestMarshaler();
}
LowAllocationTraceRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(spans);
return delegate
.export(exportMarshaler, spans.size())
.whenComplete(
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);
return delegate.export(request, spans.size());
}

/**
Expand All @@ -94,6 +120,9 @@ public CompletableResultCode shutdown() {

@Override
public String toString() {
return "OtlpHttpSpanExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpSpanExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpSpanExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<TraceRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpSpanExporterBuilder(HttpExporterBuilder<TraceRequestMarshaler> delegate) {
OtlpHttpSpanExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpSpanExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -207,12 +211,19 @@ public OtlpHttpSpanExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpSpanExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpSpanExporter build() {
return new OtlpHttpSpanExporter(delegate, delegate.build());
return new OtlpHttpSpanExporter(delegate, delegate.build(), memoryMode);
}
}

0 comments on commit 0d2d67e

Please sign in to comment.