Skip to content

Commit

Permalink
Unify compression configuration for exporters (#4775)
Browse files Browse the repository at this point in the history
* Fix handling of compressionMethod `none` in GrpcExporterBuilder

* Fix handling of compressionMethod `none` in OkHttpExporterBuilder

* Add compression configuration assertions to AbstractGrpcTelemetryExporterTest

* Add compression configuration to JaegerGrpcSpanExporterBuilder

* Add compression configuration to ZipkinSpanExporterBuilder

* Specify that zipkin default compression is gzip

Co-authored-by: Jack Berg <jberg@newrelic.com>
  • Loading branch information
Donnerbart and jack-berg committed Nov 1, 2022
1 parent b6c58c5 commit fa46f19
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 9 deletions.
@@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder setCompression(java.lang.String)
@@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder setCompression(java.lang.String)
Expand Up @@ -98,7 +98,7 @@ public GrpcExporterBuilder<T> setEndpoint(String endpoint) {
}

public GrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
this.compressionEnabled = compressionMethod.equals("gzip");
return this;
}

Expand Down
Expand Up @@ -72,9 +72,7 @@ public OkHttpExporterBuilder<T> setEndpoint(String endpoint) {
}

public OkHttpExporterBuilder<T> setCompression(String compressionMethod) {
if (compressionMethod.equals("gzip")) {
this.compressionEnabled = true;
}
this.compressionEnabled = compressionMethod.equals("gzip");
return this;
}

Expand Down
@@ -0,0 +1,183 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.grpc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.net.URI;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class GrpcExporterBuilderTest {

private final ManagedChannel channel = mock(ManagedChannel.class);

private GrpcExporterBuilder<Marshaler> builder;

@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
Supplier<BiFunction<Channel, String, MarshalerServiceStub<Marshaler, ?, ?>>> grpcStubFactory =
mock(Supplier.class);
when(grpcStubFactory.get())
.thenReturn((c, s) -> new TestMarshalerServiceStub(c, CallOptions.DEFAULT));

builder =
GrpcExporter.builder(
"otlp", "span", 0, URI.create("http://localhost:4317"), grpcStubFactory, "/test");
}

@Test
void compressionDefault() {
GrpcExporter<Marshaler> exporter = builder.build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}

@Test
void compressionNone() {
GrpcExporter<Marshaler> exporter = builder.setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}

@Test
void compressionGzip() {
GrpcExporter<Marshaler> exporter = builder.setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true));
} finally {
exporter.shutdown();
}
}

@Test
void compressionEnabledAndDisabled() {
GrpcExporter<Marshaler> exporter =
builder.setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}

@Test
void compressionDefaultWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}

@Test
void compressionNoneWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}

@Test
void compressionGzipWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(new Codec.Gzip().getMessageEncoding()));
} finally {
exporter.shutdown();
}
}

@Test
void compressionEnabledAndDisabledWithChannel() {
GrpcExporter<Marshaler> exporter =
builder.setChannel(channel).setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}

private final class TestMarshalerServiceStub
extends MarshalerServiceStub<Marshaler, Void, TestMarshalerServiceStub> {

private TestMarshalerServiceStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}

@Override
protected TestMarshalerServiceStub build(Channel channel, CallOptions callOptions) {
return new TestMarshalerServiceStub(channel, callOptions);
}

@Override
public ListenableFuture<Void> export(Marshaler request) {
return Futures.immediateVoidFuture();
}
}
}
@@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.okhttp;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import org.junit.jupiter.api.Test;

class OkHttpExporterBuilderTest {

private final OkHttpExporterBuilder<Marshaler> builder =
new OkHttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces");

@Test
void compressionDefault() {
OkHttpExporter<Marshaler> exporter = builder.build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}

@Test
void compressionNone() {
OkHttpExporter<Marshaler> exporter = builder.setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}

@Test
void compressionGzip() {
OkHttpExporter<Marshaler> exporter = builder.setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true));
} finally {
exporter.shutdown();
}
}

@Test
void compressionEnabledAndDisabled() {
OkHttpExporter<Marshaler> exporter =
builder.setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
}
Expand Up @@ -66,6 +66,19 @@ public JaegerGrpcSpanExporterBuilder setEndpoint(String endpoint) {
return this;
}

/**
* Sets the method used to compress payloads. If unset, compression is disabled. Currently
* supported compression methods include "gzip" and "none".
*/
public JaegerGrpcSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
return this;
}

/**
* Sets the maximum time to wait for the collector to process an exported batch of spans. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
Expand Down
Expand Up @@ -357,6 +357,57 @@ void invalidConfig() {
assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");

assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Unsupported compression method. Supported compression methods include: gzip, none.");
}

@Test
void compressionDefault() {
JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}

@Test
void compressionNone() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("none").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}

@Test
void compressionGzip() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("gzip").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(true);
} finally {
exporter.shutdown();
}
}

@Test
void compressionEnabledAndDisabled() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}

@Test
Expand Down

0 comments on commit fa46f19

Please sign in to comment.