Skip to content

Commit

Permalink
opentelemetry: Add optional grpc.lb.locality to per-call metrics
Browse files Browse the repository at this point in the history
The optional label API was added in 4c78a97 and xds_cluster_impl was
plumbed in 077dcbf.

From gRFC A78:

> ### Optional xDS Locality Label
>
> When xDS is used, it is desirable for some metrics to include an optional
> label indicating which xDS locality the metrics are associated with.
> We want to provide this optional label for the metrics in both the
> existing per-call metrics defined in [A66] and in the new metrics for
> the WRR LB policy, described below.
>
> If locality information is available, the value of this label will be of
> the form `{region="${REGION}", zone="${ZONE}", sub_zone="${SUB_ZONE}"}`,
> where `${REGION}`, `${ZONE}`, and `${SUB_ZONE}` are replaced with the
> actual values.  If no locality information is available, the label will
> be set to the empty string.
>
> #### Per-Call Metrics
>
> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer.  We will then use this mechanism in the `xds_cluster_impl`
> policy's picker to set the locality label. ...
>
> This label will be available on the following per-call metrics:
> - `grpc.client.attempt.duration`
> - `grpc.client.attempt.sent_total_compressed_message_size`
> - `grpc.client.attempt.rcvd_total_compressed_message_size`
  • Loading branch information
ejona86 committed May 9, 2024
1 parent b6f7b69 commit a639175
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.opentelemetry;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
Expand All @@ -40,6 +41,8 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import io.opentelemetry.api.common.AttributesBuilder;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -63,6 +66,7 @@
*/
final class OpenTelemetryMetricsModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
ImmutableSet.of(
"grpc.client.attempt.started",
Expand All @@ -81,11 +85,13 @@ final class OpenTelemetryMetricsModule {

private final OpenTelemetryMetricsResource resource;
private final Supplier<Stopwatch> stopwatchSupplier;
private final boolean localityEnabled;

/**
 * Creates the metrics module.
 *
 * @param stopwatchSupplier supplier of stopwatches; must not be null
 * @param resource holder of the metric instruments that are recorded to; must not be null
 * @param optionalLabels names of the optional labels that are enabled. The locality attribute is
 *     only added in {@code recordFinishedAttempt()} when this collection contains
 *     {@code "grpc.lb.locality"}.
 */
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource) {
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
// NOTE(review): unlike the two arguments above, optionalLabels is dereferenced without a
// checkNotNull, so a null collection fails here with an unnamed NullPointerException.
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
}

/**
Expand Down Expand Up @@ -140,6 +146,7 @@ private static final class ClientTracer extends ClientStreamTracer {
final String fullMethodName;
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile String locality;
long attemptNanos;
Code statusCode;

Expand Down Expand Up @@ -173,6 +180,13 @@ public void inboundWireSize(long bytes) {
}
}

/**
 * Remembers the locality label reported for this attempt.
 *
 * <p>Only the {@code grpc.lb.locality} key is retained; every other key is ignored. When the
 * locality key is reported more than once, the most recent value replaces the earlier one.
 *
 * @param key name of the optional label being reported
 * @param value value of the optional label
 */
@Override
public void addOptionalLabel(String key, String value) {
  // Constant-first equals keeps this safe for a null key.
  if (!LOCALITY_LABEL_NAME.equals(key)) {
    return;
  }
  locality = value;
}

@Override
public void streamClosed(Status status) {
stopwatch.stop();
Expand All @@ -192,10 +206,18 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
TARGET_KEY, target,
STATUS_KEY, statusCode.toString());
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
.put(STATUS_KEY, statusCode.toString());
if (module.localityEnabled) {
String savedLocality = locality;
if (savedLocality == null) {
savedLocality = "unknown";
}
builder.put(LOCALITY_KEY, savedLocality);
}
io.opentelemetry.api.common.Attributes attribute = builder.build();

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
Expand Down Expand Up @@ -315,7 +337,8 @@ void callEnded(Status status) {

void recordFinishedCall() {
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = new ClientTracer(this, module, null, target, fullMethodName);
ClientTracer tracer =
new ClientTracer(this, module, null, target, fullMethodName);
tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedAttempt();
Expand Down Expand Up @@ -478,8 +501,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
OpenTelemetryMetricsModule.this, target, recordMethodName(method.getFullMethodName(),
method.isSampledToLocalTracing()));
OpenTelemetryMetricsModule.this, target,
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()));
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ private OpenTelemetryModule(Builder builder) {
this.enableMetrics = ImmutableMap.copyOf(builder.enableMetrics);
this.disableDefault = builder.disableAll;
this.resource = createMetricInstruments(meter, enableMetrics, disableDefault);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource);
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule =
new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource, optionalLabels);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public final class OpenTelemetryConstants {

public static final AttributeKey<String> TARGET_KEY = AttributeKey.stringKey("grpc.target");

public static final AttributeKey<String> LOCALITY_KEY =
AttributeKey.stringKey("grpc.lb.locality");

private OpenTelemetryConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.opentelemetry;

import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
Expand Down Expand Up @@ -52,6 +53,7 @@
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -158,8 +160,7 @@ public void setUp() throws Exception {
public void testClientInterceptors() {
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
grpcServerRule.getServiceRegistry().addService(
ServerServiceDefinition.builder("package1.service2").addMethod(
method, new ServerCallHandler<String, String>() {
Expand Down Expand Up @@ -215,8 +216,7 @@ public void clientBasicMetrics() {
String target = "target:///";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());
Metadata headers = new Metadata();
Expand All @@ -243,6 +243,8 @@ public void clientBasicMetrics() {
.hasAttributes(attributes)
.hasValue(1))));

tracer.addOptionalLabel("grpc.lb.locality", "should-be-ignored");

fakeClock.forwardTime(30, TimeUnit.MILLISECONDS);
tracer.outboundHeaders();

Expand Down Expand Up @@ -353,8 +355,7 @@ public void recordAttemptMetrics() {
String target = "dns:///example.com";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
method.getFullMethodName());
Expand Down Expand Up @@ -779,8 +780,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
String target = "dns:///foo.example.com";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module =
new OpenTelemetryMetricsModule(fakeClock.getStopwatchSupplier(), resource);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target,
method.getFullMethodName());
Expand Down Expand Up @@ -880,11 +880,142 @@ public void clientStreamNeverCreatedStillRecordMetrics() {
}

@Test
public void serverBasicMetrics() {
public void clientLocalityMetrics_present() {
// Verifies that, when "grpc.lb.locality" is one of the module's optional labels, the locality
// reported through addOptionalLabel() is attached to the attempt duration and the sent/received
// compressed message size metrics, but not to the attempt count or call duration metrics.
String target = "target:///";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
// The locality label is enabled by listing it in the optional labels given to the module.
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());

ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
// Unrelated keys are interleaved to check that only grpc.lb.locality is picked up, and the
// locality is set twice to check that the last value ("the-moon") is the one recorded.
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
tracer.addOptionalLabel("grpc.lb.locality", "should-be-overwritten");
tracer.addOptionalLabel("grpc.lb.locality", "the-moon");
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);

// Expected on the attempt count metric: target and method only (no status, no locality).
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

// Expected on the call duration metric: target, method and status, but no locality.
io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName(),
STATUS_KEY, Status.Code.OK.toString());

// Expected on the per-attempt duration and message size metrics: the above plus the locality.
io.opentelemetry.api.common.Attributes clientAttributesWithLocality
= clientAttributes.toBuilder()
.put(LOCALITY_KEY, "the-moon")
.build();

// Exactly these five metrics must have been produced, in any order.
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasLongSumSatisfying(
longSum -> longSum.hasPointsSatisfying(
point -> point.hasAttributes(attributes))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_CALL_DURATION)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributes))));
}

// Verifies that, when "grpc.lb.locality" is enabled but no locality is ever reported for the
// attempt, the attempt duration and sent/received compressed message size metrics carry the
// fallback locality value "unknown".
// NOTE(review): the gRFC A78 text quoted in the commit message says the label should be the
// empty string when no locality information is available; this test (and the production code)
// use "unknown" instead -- confirm which value is intended.
@Test
public void clientLocalityMetrics_missing() {
String target = "target:///";
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource);
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"));
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName());

// No addOptionalLabel() call is made before the stream is closed.
ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);

// Expected on the attempt count metric: target and method only (no status, no locality).
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

// Expected on the call duration metric: target, method and status, but no locality.
io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName(),
STATUS_KEY, Status.Code.OK.toString());

// Expected on the per-attempt duration and message size metrics: the above plus the fallback
// locality value.
io.opentelemetry.api.common.Attributes clientAttributesWithLocality
= clientAttributes.toBuilder()
.put(LOCALITY_KEY, "unknown")
.build();

// Exactly these five metrics must have been produced, in any order.
assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasLongSumSatisfying(
longSum -> longSum.hasPointsSatisfying(
point -> point.hasAttributes(attributes))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithLocality))),
metric ->
assertThat(metric)
.hasName(CLIENT_CALL_DURATION)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributes))));
}

@Test
public void serverBasicMetrics() {
OpenTelemetryMetricsResource resource = OpenTelemetryModule.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);
ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory();
ServerStreamTracer tracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
Expand Down Expand Up @@ -994,6 +1125,12 @@ public void serverBasicMetrics() {

}

/**
 * Builds a metrics module for tests that do not enable any optional labels.
 *
 * @param resource the metric instruments the module should record to
 * @return a module using the fake clock's stopwatch supplier and an empty optional-label list
 */
private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule(
    OpenTelemetryMetricsResource resource) {
  // An empty label list means the locality label is not enabled for this module.
  return new OpenTelemetryMetricsModule(
      fakeClock.getStopwatchSupplier(),
      resource,
      Arrays.<String>asList());
}

static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> methodDescriptor;
private final Attributes attributes;
Expand Down

0 comments on commit a639175

Please sign in to comment.