Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: do not expose orca proto in ORCA api #9366

Merged
merged 2 commits into from Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,11 +19,11 @@
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY;

import com.github.xds.data.orca.v3.OrcaLoadReport;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.services.CallMetricRecorder;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.ForwardingLoadBalancerHelper;
Expand Down Expand Up @@ -87,8 +87,8 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
Subchannel subchannel = super.createSubchannel(args);
OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() {
@Override
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
latestOobReport = fromOrcaLoadReport(orcaLoadReport);
public void onLoadReport(CallMetricRecorder.CallMetricReport orcaLoadReport) {
latestOobReport = fromCallMetricReport(orcaLoadReport);
}
},
OrcaOobUtil.OrcaReportingConfig.newBuilder()
Expand Down Expand Up @@ -133,22 +133,23 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
new OrcaPerRequestUtil.OrcaPerRequestReportListener() {
@Override
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
public void onLoadReport(CallMetricRecorder.CallMetricReport callMetricReport) {
AtomicReference<TestOrcaReport> reportRef =
args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY);
reportRef.set(fromOrcaLoadReport(orcaLoadReport));
reportRef.set(fromCallMetricReport(callMetricReport));
}
}));
}
}
}

private static TestOrcaReport fromOrcaLoadReport(OrcaLoadReport orcaLoadReport) {
private static TestOrcaReport fromCallMetricReport(
CallMetricRecorder.CallMetricReport callMetricReport) {
return TestOrcaReport.newBuilder()
.setCpuUtilization(orcaLoadReport.getCpuUtilization())
.setMemoryUtilization(orcaLoadReport.getMemUtilization())
.putAllRequestCost(orcaLoadReport.getRequestCostMap())
.putAllUtilization(orcaLoadReport.getUtilizationMap())
.setCpuUtilization(callMetricReport.getCpuUtilization())
.setMemoryUtilization(callMetricReport.getMemoryUtilization())
.putAllRequestCost(callMetricReport.getRequestCostMetrics())
.putAllUtilization(callMetricReport.getUtilizationMetrics())
.build();
}
}
24 changes: 22 additions & 2 deletions services/src/main/java/io/grpc/services/CallMetricRecorder.java
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import io.grpc.Context;
import io.grpc.ExperimentalApi;
import java.util.Collections;
Expand Down Expand Up @@ -53,9 +54,10 @@ public static final class CallMetricReport {
private Map<String, Double> utilizationMetrics;

/**
* Create a report for all backend metrics.
* A gRPC object of orca load report. LB policies listening at per-rpc or oob orca load reports
* will be notified of the metrics data in this data format.
*/
CallMetricReport(double cpuUtilization, double memoryUtilization,
public CallMetricReport(double cpuUtilization, double memoryUtilization,
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
this.cpuUtilization = cpuUtilization;
Expand All @@ -79,6 +81,24 @@ public Map<String, Double> getRequestCostMetrics() {
public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}

@Override
public int hashCode() {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return Objects.hashCode(cpuUtilization, memoryUtilization, requestCostMetrics,
utilizationMetrics);
}

@Override
public boolean equals(Object o) {
if (o instanceof CallMetricReport) {
CallMetricReport that = (CallMetricReport) o;
return cpuUtilization == that.cpuUtilization
&& memoryUtilization == that.memoryUtilization
&& Objects.equal(requestCostMetrics, that.requestCostMetrics)
&& Objects.equal(utilizationMetrics, that.utilizationMetrics);
}
return false;
}
}

/**
Expand Down
9 changes: 6 additions & 3 deletions xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java
Expand Up @@ -51,6 +51,7 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.services.CallMetricRecorder;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import java.util.HashMap;
Expand Down Expand Up @@ -168,9 +169,9 @@ public interface OrcaOobReportListener {
* <p>Note this callback will be invoked from the {@link SynchronizationContext} of the
* delegated helper, implementations should not block.
*
* @param report load report in the format of ORCA protocol.
* @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}.
*/
void onLoadReport(OrcaLoadReport report);
void onLoadReport(CallMetricRecorder.CallMetricReport report);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it as a follow-up, but it probably is appropriate to move CallMetricReport to a top-level class in services?

}

static final Attributes.Key<SubchannelImpl> ORCA_REPORTING_STATE_KEY =
Expand Down Expand Up @@ -450,8 +451,10 @@ void handleResponse(OrcaLoadReport response) {
callHasResponded = true;
backoffPolicy = null;
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
CallMetricRecorder.CallMetricReport metricReport =
OrcaPerRequestUtil.fromOrcaLoadReport(response);
for (OrcaOobReportListener listener : configs.keySet()) {
listener.onLoadReport(response);
listener.onLoadReport(metricReport);
}
call.request(1);
}
Expand Down
16 changes: 12 additions & 4 deletions xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java
Expand Up @@ -28,6 +28,7 @@
import io.grpc.Metadata;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.services.CallMetricRecorder;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -175,14 +176,14 @@ public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
public interface OrcaPerRequestReportListener {

/**
* Invoked when an per-request ORCA report is received.
* Invoked when a per-request ORCA report is received.
*
* <p>Note this callback will be invoked from the network thread as the RPC finishes,
* implementations should not block.
*
* @param report load report in the format of ORCA format.
* @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}.
*/
void onLoadReport(OrcaLoadReport report);
void onLoadReport(CallMetricRecorder.CallMetricReport report);
}

/**
Expand Down Expand Up @@ -250,6 +251,12 @@ public void inboundTrailers(Metadata trailers) {
}
}

static CallMetricRecorder.CallMetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
return new CallMetricRecorder.CallMetricReport(loadReport.getCpuUtilization(),
loadReport.getMemUtilization(), loadReport.getRequestCostMap(),
loadReport.getUtilizationMap());
}

/**
* A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all of
* them when an {@link OrcaLoadReport} is received.
Expand All @@ -263,8 +270,9 @@ void addListener(OrcaPerRequestReportListener listener) {
}

void onReport(OrcaLoadReport report) {
CallMetricRecorder.CallMetricReport metricReport = fromOrcaLoadReport(report);
for (OrcaPerRequestReportListener listener : listeners) {
listener.onLoadReport(report);
listener.onLoadReport(metricReport);
}
}
}
Expand Down
22 changes: 15 additions & 7 deletions xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java
Expand Up @@ -61,6 +61,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.services.CallMetricRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.util.ForwardingLoadBalancerHelper;
Expand Down Expand Up @@ -285,7 +286,8 @@ public void singlePolicyTypicalWorkflow() {
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
serverCall.responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
verify(mockOrcaListener0, times(i + 1)).onLoadReport(eq(report));
verify(mockOrcaListener0, times(i + 1)).onLoadReport(eq(
OrcaPerRequestUtil.fromOrcaLoadReport(report)));
}

for (int i = 0; i < NUM_SUBCHANNELS; i++) {
Expand Down Expand Up @@ -369,7 +371,8 @@ public void twoLevelPoliciesTypicalWorkflow() {
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
serverCall.responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report));
verify(mockOrcaListener1, times(i + 1)).onLoadReport(
eq(OrcaPerRequestUtil.fromOrcaLoadReport(report)));
}

for (int i = 0; i < NUM_SUBCHANNELS; i++) {
Expand Down Expand Up @@ -425,7 +428,8 @@ public void orcReportingDisabledWhenServiceNotImplemented() {
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
serverCall.responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
verify(mockOrcaListener0).onLoadReport(eq(report));
verify(mockOrcaListener0).onLoadReport(
eq(OrcaPerRequestUtil.fromOrcaLoadReport(report)));

verifyNoInteractions(backoffPolicyProvider);
}
Expand Down Expand Up @@ -471,7 +475,8 @@ public void orcaReportingStreamClosedAndRetried() {
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
orcaServiceImp.calls.peek().responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
inOrder.verify(mockOrcaListener0).onLoadReport(eq(report));
inOrder.verify(mockOrcaListener0).onLoadReport(
eq(OrcaPerRequestUtil.fromOrcaLoadReport(report)));

// Server closes the ORCA reporting RPC after a response, will restart immediately.
orcaServiceImp.calls.poll().responseObserver.onCompleted();
Expand Down Expand Up @@ -659,17 +664,20 @@ public void policiesReceiveSameReportIndependently() {
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Only parent helper's listener receives the report.
ArgumentCaptor<OrcaLoadReport> parentReportCaptor = ArgumentCaptor.forClass(null);
ArgumentCaptor<CallMetricRecorder.CallMetricReport> parentReportCaptor =
ArgumentCaptor.forClass(null);
verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture());
assertThat(parentReportCaptor.getValue()).isEqualTo(report);
assertThat(parentReportCaptor.getValue()).isEqualTo(
OrcaPerRequestUtil.fromOrcaLoadReport(report));
verifyNoMoreInteractions(mockOrcaListener2);

// Now child helper also wants to receive reports.
OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG);
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Both helper receives the same report instance.
ArgumentCaptor<OrcaLoadReport> childReportCaptor = ArgumentCaptor.forClass(null);
ArgumentCaptor<CallMetricRecorder.CallMetricReport> childReportCaptor =
ArgumentCaptor.forClass(null);
verify(mockOrcaListener1, times(2))
.onLoadReport(parentReportCaptor.capture());
verify(mockOrcaListener2)
Expand Down
19 changes: 13 additions & 6 deletions xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java
Expand Up @@ -30,6 +30,7 @@
import com.github.xds.data.orca.v3.OrcaLoadReport;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.services.CallMetricRecorder;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaReportingTracerFactory;
import org.junit.Before;
Expand Down Expand Up @@ -96,9 +97,11 @@ public void singlePolicyTypicalWorkflow() {
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
tracer.inboundTrailers(trailer);
ArgumentCaptor<OrcaLoadReport> reportCaptor = ArgumentCaptor.forClass(null);
ArgumentCaptor<CallMetricRecorder.CallMetricReport> reportCaptor =
ArgumentCaptor.forClass(null);
verify(orcaListener1).onLoadReport(reportCaptor.capture());
assertThat(reportCaptor.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance());
assertThat(reportCaptor.getValue()).isEqualTo(
OrcaPerRequestUtil.fromOrcaLoadReport(OrcaLoadReport.getDefaultInstance()));
}

/**
Expand Down Expand Up @@ -136,11 +139,14 @@ public void twoLevelPoliciesTypicalWorkflow() {
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
childTracer.inboundTrailers(trailer);
ArgumentCaptor<OrcaLoadReport> parentReportCap = ArgumentCaptor.forClass(null);
ArgumentCaptor<OrcaLoadReport> childReportCap = ArgumentCaptor.forClass(null);
ArgumentCaptor<CallMetricRecorder.CallMetricReport> parentReportCap =
ArgumentCaptor.forClass(null);
ArgumentCaptor<CallMetricRecorder.CallMetricReport> childReportCap =
ArgumentCaptor.forClass(null);
verify(orcaListener1).onLoadReport(parentReportCap.capture());
verify(orcaListener2).onLoadReport(childReportCap.capture());
assertThat(parentReportCap.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance());
assertThat(parentReportCap.getValue()).isEqualTo(
OrcaPerRequestUtil.fromOrcaLoadReport(OrcaLoadReport.getDefaultInstance()));
assertThat(childReportCap.getValue()).isSameInstanceAs(parentReportCap.getValue());
}

Expand All @@ -163,7 +169,8 @@ public void onlyParentPolicyReceivesReportsIfCreatesOwnTracer() {
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
parentTracer.inboundTrailers(trailer);
verify(orcaListener1).onLoadReport(eq(OrcaLoadReport.getDefaultInstance()));
verify(orcaListener1).onLoadReport(eq(
OrcaPerRequestUtil.fromOrcaLoadReport(OrcaLoadReport.getDefaultInstance())));
verifyNoInteractions(childFactory);
verifyNoInteractions(orcaListener2);
}
Expand Down