diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java b/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java index 372403ed47f..7c88c7b1d73 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java @@ -19,11 +19,11 @@ import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY; import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY; -import com.github.xds.data.orca.v3.OrcaLoadReport; import io.grpc.ConnectivityState; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.services.CallMetricRecorder; import io.grpc.testing.integration.Messages.TestOrcaReport; import io.grpc.util.ForwardingLoadBalancer; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -87,8 +87,8 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { Subchannel subchannel = super.createSubchannel(args); OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() { @Override - public void onLoadReport(OrcaLoadReport orcaLoadReport) { - latestOobReport = fromOrcaLoadReport(orcaLoadReport); + public void onLoadReport(CallMetricRecorder.CallMetricReport orcaLoadReport) { + latestOobReport = fromCallMetricReport(orcaLoadReport); } }, OrcaOobUtil.OrcaReportingConfig.newBuilder() @@ -133,22 +133,23 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory( new OrcaPerRequestUtil.OrcaPerRequestReportListener() { @Override - public void onLoadReport(OrcaLoadReport orcaLoadReport) { + public void onLoadReport(CallMetricRecorder.CallMetricReport callMetricReport) { AtomicReference reportRef = args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY); - reportRef.set(fromOrcaLoadReport(orcaLoadReport)); + reportRef.set(fromCallMetricReport(callMetricReport)); } })); } } } - private static TestOrcaReport fromOrcaLoadReport(OrcaLoadReport orcaLoadReport) { + private static TestOrcaReport fromCallMetricReport( + CallMetricRecorder.CallMetricReport callMetricReport) { return TestOrcaReport.newBuilder() - .setCpuUtilization(orcaLoadReport.getCpuUtilization()) - .setMemoryUtilization(orcaLoadReport.getMemUtilization()) - .putAllRequestCost(orcaLoadReport.getRequestCostMap()) - .putAllUtilization(orcaLoadReport.getUtilizationMap()) + .setCpuUtilization(callMetricReport.getCpuUtilization()) + .setMemoryUtilization(callMetricReport.getMemoryUtilization()) + .putAllRequestCost(callMetricReport.getRequestCostMetrics()) + .putAllUtilization(callMetricReport.getUtilizationMetrics()) .build(); } } diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index da4283eba81..e87512e3002 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -53,7 +53,8 @@ public static final class CallMetricReport { private Map utilizationMetrics; /** - * Create a report for all backend metrics. + * A gRPC object of orca load report. LB policies listening at per-rpc or oob orca load reports + * will be notified of the metrics data in this data format. */ CallMetricReport(double cpuUtilization, double memoryUtilization, Map requestCostMetrics, diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 5c6c79e71f2..01f6cc6417c 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -44,4 +44,12 @@ public static Map finalizeAndDump(CallMetricRecorder recorder) { public static CallMetricRecorder.CallMetricReport finalizeAndDump2(CallMetricRecorder recorder) { return recorder.finalizeAndDump2(); } + + public static CallMetricRecorder.CallMetricReport createMetricReport( + double cpuUtilization, double memoryUtilization, + Map requestCostMetrics, + Map utilizationMetrics) { + return new CallMetricRecorder.CallMetricReport(cpuUtilization, memoryUtilization, + requestCostMetrics, utilizationMetrics); + } } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java index 0e88c02cd15..2e6a2891569 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java @@ -51,6 +51,7 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; +import io.grpc.services.CallMetricRecorder; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; import java.util.HashMap; @@ -168,9 +169,9 @@ public interface OrcaOobReportListener { *

Note this callback will be invoked from the {@link SynchronizationContext} of the * delegated helper, implementations should not block. * - * @param report load report in the format of ORCA protocol. + * @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}. */ - void onLoadReport(OrcaLoadReport report); + void onLoadReport(CallMetricRecorder.CallMetricReport report); } static final Attributes.Key ORCA_REPORTING_STATE_KEY = @@ -450,8 +451,10 @@ void handleResponse(OrcaLoadReport response) { callHasResponded = true; backoffPolicy = null; subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response); + CallMetricRecorder.CallMetricReport metricReport = + OrcaPerRequestUtil.fromOrcaLoadReport(response); for (OrcaOobReportListener listener : configs.keySet()) { - listener.onLoadReport(response); + listener.onLoadReport(metricReport); } call.request(1); } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 52f9f9f5f1b..4006e989545 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -28,6 +28,8 @@ import io.grpc.Metadata; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.protobuf.ProtoUtils; +import io.grpc.services.CallMetricRecorder; +import io.grpc.services.InternalCallMetricRecorder; import java.util.ArrayList; import java.util.List; @@ -175,14 +177,14 @@ public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory( public interface OrcaPerRequestReportListener { /** - * Invoked when an per-request ORCA report is received. + * Invoked when a per-request ORCA report is received. * *

Note this callback will be invoked from the network thread as the RPC finishes, * implementations should not block. * - * @param report load report in the format of ORCA format. + * @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}. */ - void onLoadReport(OrcaLoadReport report); + void onLoadReport(CallMetricRecorder.CallMetricReport report); } /** @@ -250,6 +252,12 @@ public void inboundTrailers(Metadata trailers) { } } + static CallMetricRecorder.CallMetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { + return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), + loadReport.getMemUtilization(), loadReport.getRequestCostMap(), + loadReport.getUtilizationMap()); + } + /** * A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all of * them when an {@link OrcaLoadReport} is received. @@ -263,8 +271,9 @@ void addListener(OrcaPerRequestReportListener listener) { } void onReport(OrcaLoadReport report) { + CallMetricRecorder.CallMetricReport metricReport = fromOrcaLoadReport(report); for (OrcaPerRequestReportListener listener : listeners) { - listener.onLoadReport(report); + listener.onLoadReport(metricReport); } } } diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java index 770cb783b56..5a981123ae0 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java @@ -25,6 +25,7 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; @@ -61,6 +62,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; +import io.grpc.services.CallMetricRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -285,7 +287,9 @@ public void singlePolicyTypicalWorkflow() { OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); serverCall.responseObserver.onNext(report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); - verify(mockOrcaListener0, times(i + 1)).onLoadReport(eq(report)); + verify(mockOrcaListener0, times(i + 1)).onLoadReport( + argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( + OrcaPerRequestUtil.fromOrcaLoadReport(report)))); } for (int i = 0; i < NUM_SUBCHANNELS; i++) { @@ -369,7 +373,9 @@ public void twoLevelPoliciesTypicalWorkflow() { OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); serverCall.responseObserver.onNext(report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); - verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report)); + verify(mockOrcaListener1, times(i + 1)).onLoadReport( + argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( + OrcaPerRequestUtil.fromOrcaLoadReport(report)))); } for (int i = 0; i < NUM_SUBCHANNELS; i++) { @@ -425,8 +431,9 @@ public void orcReportingDisabledWhenServiceNotImplemented() { OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); serverCall.responseObserver.onNext(report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); - verify(mockOrcaListener0).onLoadReport(eq(report)); - + verify(mockOrcaListener0).onLoadReport( + argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( + OrcaPerRequestUtil.fromOrcaLoadReport(report)))); verifyNoInteractions(backoffPolicyProvider); } @@ -471,8 +478,9 @@ public void orcaReportingStreamClosedAndRetried() { OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); orcaServiceImp.calls.peek().responseObserver.onNext(report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); - inOrder.verify(mockOrcaListener0).onLoadReport(eq(report)); - + inOrder.verify(mockOrcaListener0).onLoadReport( + argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( + OrcaPerRequestUtil.fromOrcaLoadReport(report)))); // Server closes the ORCA reporting RPC after a response, will restart immediately. orcaServiceImp.calls.poll().responseObserver.onCompleted(); assertThat(subchannel.logs).containsExactly( @@ -659,9 +667,11 @@ public void policiesReceiveSameReportIndependently() { orcaServiceImps[0].calls.peek().responseObserver.onNext(report); assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); // Only parent helper's listener receives the report. - ArgumentCaptor parentReportCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor parentReportCaptor = + ArgumentCaptor.forClass(null); verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture()); - assertThat(parentReportCaptor.getValue()).isEqualTo(report); + assertThat(OrcaPerRequestUtilTest.reportEqual(parentReportCaptor.getValue(), + OrcaPerRequestUtil.fromOrcaLoadReport(report))).isTrue(); verifyNoMoreInteractions(mockOrcaListener2); // Now child helper also wants to receive reports. @@ -669,7 +679,8 @@ public void policiesReceiveSameReportIndependently() { orcaServiceImps[0].calls.peek().responseObserver.onNext(report); assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); // Both helper receives the same report instance. - ArgumentCaptor childReportCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor childReportCaptor = + ArgumentCaptor.forClass(null); verify(mockOrcaListener1, times(2)) .onLoadReport(parentReportCaptor.capture()); verify(mockOrcaListener2) diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java index 76fbdd3e027..2b3de09b573 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java @@ -19,7 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -28,8 +28,10 @@ import static org.mockito.Mockito.when; import com.github.xds.data.orca.v3.OrcaLoadReport; +import com.google.common.base.Objects; import io.grpc.ClientStreamTracer; import io.grpc.Metadata; +import io.grpc.services.CallMetricRecorder; import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaReportingTracerFactory; import org.junit.Before; @@ -37,6 +39,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -96,9 +99,33 @@ public void singlePolicyTypicalWorkflow() { OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, OrcaLoadReport.getDefaultInstance()); tracer.inboundTrailers(trailer); - ArgumentCaptor reportCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor reportCaptor = + ArgumentCaptor.forClass(null); verify(orcaListener1).onLoadReport(reportCaptor.capture()); - assertThat(reportCaptor.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance()); + assertThat(reportEqual(reportCaptor.getValue(), + OrcaPerRequestUtil.fromOrcaLoadReport(OrcaLoadReport.getDefaultInstance()))).isTrue(); + } + + static final class MetricsReportMatcher implements + ArgumentMatcher { + private CallMetricRecorder.CallMetricReport original; + + public MetricsReportMatcher(CallMetricRecorder.CallMetricReport report) { + this.original = report; + } + + @Override + public boolean matches(CallMetricRecorder.CallMetricReport argument) { + return reportEqual(original, argument); + } + } + + static boolean reportEqual(CallMetricRecorder.CallMetricReport a, + CallMetricRecorder.CallMetricReport b) { + return a.getCpuUtilization() == b.getCpuUtilization() + && a.getMemoryUtilization() == b.getMemoryUtilization() + && Objects.equal(a.getRequestCostMetrics(), b.getRequestCostMetrics()) + && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()); } /** @@ -136,11 +163,14 @@ public void twoLevelPoliciesTypicalWorkflow() { OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, OrcaLoadReport.getDefaultInstance()); childTracer.inboundTrailers(trailer); - ArgumentCaptor parentReportCap = ArgumentCaptor.forClass(null); - ArgumentCaptor childReportCap = ArgumentCaptor.forClass(null); + ArgumentCaptor parentReportCap = + ArgumentCaptor.forClass(null); + ArgumentCaptor childReportCap = + ArgumentCaptor.forClass(null); verify(orcaListener1).onLoadReport(parentReportCap.capture()); verify(orcaListener2).onLoadReport(childReportCap.capture()); - assertThat(parentReportCap.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance()); + assertThat(reportEqual(parentReportCap.getValue(), + OrcaPerRequestUtil.fromOrcaLoadReport(OrcaLoadReport.getDefaultInstance()))).isTrue(); assertThat(childReportCap.getValue()).isSameInstanceAs(parentReportCap.getValue()); } @@ -159,11 +189,12 @@ public void onlyParentPolicyReceivesReportsIfCreatesOwnTracer() { ClientStreamTracer parentTracer = parentFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); Metadata trailer = new Metadata(); + OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); trailer.put( - OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, - OrcaLoadReport.getDefaultInstance()); + OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, report); parentTracer.inboundTrailers(trailer); - verify(orcaListener1).onLoadReport(eq(OrcaLoadReport.getDefaultInstance())); + verify(orcaListener1).onLoadReport( + argThat(new MetricsReportMatcher(OrcaPerRequestUtil.fromOrcaLoadReport(report)))); verifyNoInteractions(childFactory); verifyNoInteractions(orcaListener2); }