Skip to content

Commit

Permalink
service: make the orca MetricReport a top level experimental class (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Jul 28, 2022
1 parent 25183ed commit 014c022
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 84 deletions.
Expand Up @@ -23,7 +23,7 @@
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.MetricReport;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.ForwardingLoadBalancerHelper;
Expand Down Expand Up @@ -87,7 +87,7 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
Subchannel subchannel = super.createSubchannel(args);
OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() {
@Override
public void onLoadReport(CallMetricRecorder.CallMetricReport orcaLoadReport) {
public void onLoadReport(MetricReport orcaLoadReport) {
latestOobReport = fromCallMetricReport(orcaLoadReport);
}
},
Expand Down Expand Up @@ -133,7 +133,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
new OrcaPerRequestUtil.OrcaPerRequestReportListener() {
@Override
public void onLoadReport(CallMetricRecorder.CallMetricReport callMetricReport) {
public void onLoadReport(MetricReport callMetricReport) {
AtomicReference<TestOrcaReport> reportRef =
args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY);
reportRef.set(fromCallMetricReport(callMetricReport));
Expand All @@ -143,8 +143,7 @@ public void onLoadReport(CallMetricRecorder.CallMetricReport callMetricReport) {
}
}

private static TestOrcaReport fromCallMetricReport(
CallMetricRecorder.CallMetricReport callMetricReport) {
private static TestOrcaReport fromCallMetricReport(MetricReport callMetricReport) {
return TestOrcaReport.newBuilder()
.setCpuUtilization(callMetricReport.getCpuUtilization())
.setMemoryUtilization(callMetricReport.getMemoryUtilization())
Expand Down
1 change: 1 addition & 0 deletions services/BUILD.bazel
Expand Up @@ -37,6 +37,7 @@ java_library(
srcs = [
"src/main/java/io/grpc/services/CallMetricRecorder.java",
"src/main/java/io/grpc/services/MetricRecorder.java",
"src/main/java/io/grpc/services/MetricReport.java",
],
deps = [
"//api",
Expand Down
42 changes: 2 additions & 40 deletions services/src/main/java/io/grpc/services/CallMetricRecorder.java
Expand Up @@ -16,8 +16,6 @@

package io.grpc.services;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
import io.grpc.ExperimentalApi;
Expand Down Expand Up @@ -46,42 +44,6 @@ public final class CallMetricRecorder {
private double memoryUtilizationMetric = 0;
private volatile boolean disabled;

public static final class CallMetricReport {
private double cpuUtilization;
private double memoryUtilization;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;

/**
* A gRPC object of orca load report. LB policies listening at per-rpc or oob orca load reports
* will be notified of the metrics data in this data format.
*/
CallMetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
this.cpuUtilization = cpuUtilization;
this.memoryUtilization = memoryUtilization;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
}

public double getCpuUtilization() {
return cpuUtilization;
}

public double getMemoryUtilization() {
return memoryUtilization;
}

public Map<String, Double> getRequestCostMetrics() {
return requestCostMetrics;
}

public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}
}

/**
* Returns the call metric recorder attached to the current {@link Context}. If there is none,
* returns a no-op recorder.
Expand Down Expand Up @@ -201,13 +163,13 @@ Map<String, Double> finalizeAndDump() {
*
* @return a per-request ORCA reports containing all saved metrics.
*/
CallMetricReport finalizeAndDump2() {
MetricReport finalizeAndDump2() {
Map<String, Double> savedRequestCostMetrics = finalizeAndDump();
Map<String, Double> savedUtilizationMetrics = utilizationMetrics.get();
if (savedUtilizationMetrics == null) {
savedUtilizationMetrics = Collections.emptyMap();
}
return new CallMetricReport(cpuUtilizationMetric,
return new MetricReport(cpuUtilizationMetric,
memoryUtilizationMetric, Collections.unmodifiableMap(savedRequestCostMetrics),
Collections.unmodifiableMap(savedUtilizationMetrics)
);
Expand Down
Expand Up @@ -41,15 +41,13 @@ public static Map<String, Double> finalizeAndDump(CallMetricRecorder recorder) {
return recorder.finalizeAndDump();
}

public static CallMetricRecorder.CallMetricReport finalizeAndDump2(CallMetricRecorder recorder) {
public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) {
return recorder.finalizeAndDump2();
}

public static CallMetricRecorder.CallMetricReport createMetricReport(
double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
return new CallMetricRecorder.CallMetricReport(cpuUtilization, memoryUtilization,
public static MetricReport createMetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) {
return new MetricReport(cpuUtilization, memoryUtilization,
requestCostMetrics, utilizationMetrics);
}
}
Expand Up @@ -29,7 +29,7 @@ public final class InternalMetricRecorder {
private InternalMetricRecorder() {
}

public static CallMetricRecorder.CallMetricReport getMetricReport(MetricRecorder recorder) {
public static MetricReport getMetricReport(MetricRecorder recorder) {
return recorder.getMetricReport();
}
}
4 changes: 2 additions & 2 deletions services/src/main/java/io/grpc/services/MetricRecorder.java
Expand Up @@ -86,8 +86,8 @@ public void clearMemoryUtilizationMetric() {
memoryUtilization = 0;
}

CallMetricRecorder.CallMetricReport getMetricReport() {
return new CallMetricRecorder.CallMetricReport(cpuUtilization, memoryUtilization,
MetricReport getMetricReport() {
return new MetricReport(cpuUtilization, memoryUtilization,
Collections.emptyMap(), Collections.unmodifiableMap(metricsData));
}
}
59 changes: 59 additions & 0 deletions services/src/main/java/io/grpc/services/MetricReport.java
@@ -0,0 +1,59 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.services;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.ExperimentalApi;
import java.util.Map;

/**
* A gRPC object of orca load report. LB policies listening at per-rpc or oob orca load reports
* will be notified of the metrics data in this data format.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9381")
public final class MetricReport {
private double cpuUtilization;
private double memoryUtilization;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;

MetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
this.cpuUtilization = cpuUtilization;
this.memoryUtilization = memoryUtilization;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
}

public double getCpuUtilization() {
return cpuUtilization;
}

public double getMemoryUtilization() {
return memoryUtilization;
}

public Map<String, Double> getRequestCostMetrics() {
return requestCostMetrics;
}

public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}
}
Expand Up @@ -47,7 +47,7 @@ public void dumpDumpsAllSavedMetricValues() {
recorder.recordCpuUtilizationMetric(0.1928);
recorder.recordMemoryUtilizationMetric(47.4);

CallMetricRecorder.CallMetricReport dump = recorder.finalizeAndDump2();
MetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 154353.423, "util2", 0.1367, "util3", 1437.34);
Truth.assertThat(dump.getRequestCostMetrics())
Expand Down Expand Up @@ -76,7 +76,7 @@ public void lastValueWinForMetricsWithSameName() {
recorder.recordMemoryUtilizationMetric(9384.0);
recorder.recordUtilizationMetric("util1", 84323.3);

CallMetricRecorder.CallMetricReport dump = recorder.finalizeAndDump2();
MetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getRequestCostMetrics())
.containsExactly("cost1", 4654.67, "cost2", 75.83);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(9384.0);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import io.grpc.protobuf.ProtoUtils;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalCallMetricRecorder;
import io.grpc.services.MetricReport;

/**
* A {@link ServerInterceptor} that intercepts a {@link ServerCall} by running server-side RPC
Expand Down Expand Up @@ -89,8 +90,7 @@ public void close(Status status, Metadata trailers) {
next);
}

private static OrcaLoadReport fromInternalReport(
CallMetricRecorder.CallMetricReport internalReport) {
private static OrcaLoadReport fromInternalReport(MetricReport internalReport) {
return OrcaLoadReport.newBuilder()
.setCpuUtilization(internalReport.getCpuUtilization())
.setMemUtilization(internalReport.getMemoryUtilization())
Expand Down
9 changes: 4 additions & 5 deletions xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java
Expand Up @@ -51,7 +51,7 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.MetricReport;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import java.util.HashMap;
Expand Down Expand Up @@ -169,9 +169,9 @@ public interface OrcaOobReportListener {
* <p>Note this callback will be invoked from the {@link SynchronizationContext} of the
* delegated helper, implementations should not block.
*
* @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}.
* @param report load report in the format of grpc {@link MetricReport}.
*/
void onLoadReport(CallMetricRecorder.CallMetricReport report);
void onLoadReport(MetricReport report);
}

static final Attributes.Key<SubchannelImpl> ORCA_REPORTING_STATE_KEY =
Expand Down Expand Up @@ -451,8 +451,7 @@ void handleResponse(OrcaLoadReport response) {
callHasResponded = true;
backoffPolicy = null;
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
CallMetricRecorder.CallMetricReport metricReport =
OrcaPerRequestUtil.fromOrcaLoadReport(response);
MetricReport metricReport = OrcaPerRequestUtil.fromOrcaLoadReport(response);
for (OrcaOobReportListener listener : configs.keySet()) {
listener.onLoadReport(metricReport);
}
Expand Down
10 changes: 5 additions & 5 deletions xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java
Expand Up @@ -28,8 +28,8 @@
import io.grpc.Metadata;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalCallMetricRecorder;
import io.grpc.services.MetricReport;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -182,9 +182,9 @@ public interface OrcaPerRequestReportListener {
* <p>Note this callback will be invoked from the network thread as the RPC finishes,
* implementations should not block.
*
* @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}.
* @param report load report in the format of grpc {@link MetricReport}.
*/
void onLoadReport(CallMetricRecorder.CallMetricReport report);
void onLoadReport(MetricReport report);
}

/**
Expand Down Expand Up @@ -252,7 +252,7 @@ public void inboundTrailers(Metadata trailers) {
}
}

static CallMetricRecorder.CallMetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(),
loadReport.getMemUtilization(), loadReport.getRequestCostMap(),
loadReport.getUtilizationMap());
Expand All @@ -271,7 +271,7 @@ void addListener(OrcaPerRequestReportListener listener) {
}

void onReport(OrcaLoadReport report) {
CallMetricRecorder.CallMetricReport metricReport = fromOrcaLoadReport(report);
MetricReport metricReport = fromOrcaLoadReport(report);
for (OrcaPerRequestReportListener listener : listeners) {
listener.onLoadReport(metricReport);
}
Expand Down
4 changes: 2 additions & 2 deletions xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java
Expand Up @@ -26,9 +26,9 @@
import io.grpc.BindableService;
import io.grpc.ServerServiceDefinition;
import io.grpc.SynchronizationContext;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalMetricRecorder;
import io.grpc.services.MetricRecorder;
import io.grpc.services.MetricReport;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -146,7 +146,7 @@ public void run() {
}

private OrcaLoadReport generateMetricsReport() {
CallMetricRecorder.CallMetricReport internalReport =
MetricReport internalReport =
InternalMetricRecorder.getMetricReport(metricRecorder);
return OrcaLoadReport.newBuilder().setCpuUtilization(internalReport.getCpuUtilization())
.setMemUtilization(internalReport.getMemoryUtilization())
Expand Down
6 changes: 3 additions & 3 deletions xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java
Expand Up @@ -62,7 +62,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.MetricReport;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.util.ForwardingLoadBalancerHelper;
Expand Down Expand Up @@ -667,7 +667,7 @@ public void policiesReceiveSameReportIndependently() {
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Only parent helper's listener receives the report.
ArgumentCaptor<CallMetricRecorder.CallMetricReport> parentReportCaptor =
ArgumentCaptor<MetricReport> parentReportCaptor =
ArgumentCaptor.forClass(null);
verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture());
assertThat(OrcaPerRequestUtilTest.reportEqual(parentReportCaptor.getValue(),
Expand All @@ -679,7 +679,7 @@ public void policiesReceiveSameReportIndependently() {
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Both helper receives the same report instance.
ArgumentCaptor<CallMetricRecorder.CallMetricReport> childReportCaptor =
ArgumentCaptor<MetricReport> childReportCaptor =
ArgumentCaptor.forClass(null);
verify(mockOrcaListener1, times(2))
.onLoadReport(parentReportCaptor.capture());
Expand Down

0 comments on commit 014c022

Please sign in to comment.