forked from grpc/grpc-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
OrcaPerRequestUtil.java
280 lines (259 loc) · 10.8 KB
/
OrcaPerRequestUtil.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalCallMetricRecorder;
import java.util.ArrayList;
import java.util.List;
/**
* Utility class that provides method for {@link LoadBalancer} to install listeners to receive
* per-request backend cost metrics in the format of Open Request Cost Aggregation (ORCA).
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9128")
public abstract class OrcaPerRequestUtil {
private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER = new ClientStreamTracer() {};
private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return NOOP_CLIENT_STREAM_TRACER;
}
};
private static final OrcaPerRequestUtil DEFAULT_INSTANCE =
new OrcaPerRequestUtil() {
@Override
public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
OrcaPerRequestReportListener listener) {
return newOrcaClientStreamTracerFactory(NOOP_CLIENT_STREAM_TRACER_FACTORY, listener);
}
@Override
public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
return new OrcaReportingTracerFactory(delegate, listener);
}
};
/**
* Gets an {@code OrcaPerRequestUtil} instance that provides actual implementation of
* {@link #newOrcaClientStreamTracerFactory}.
*/
public static OrcaPerRequestUtil getInstance() {
return DEFAULT_INSTANCE;
}
/**
* Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link
* OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is
* received.
*
* <p>Example usages for leaf level policy (e.g., WRR policy)
*
* <pre>
* {@code
* class WrrPicker extends SubchannelPicker {
*
* public PickResult pickSubchannel(PickSubchannelArgs args) {
* Subchannel subchannel = ... // WRR picking logic
* return PickResult.withSubchannel(
* subchannel,
* OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory(listener));
* }
* }
* }
* </pre>
*
* @param listener contains the callback to be invoked when a per-request ORCA report is received.
*/
public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
OrcaPerRequestReportListener listener);
/**
* Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link
* OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is
* received.
*
* <p>Example usages:
*
* <ul>
* <li> Delegating policy (e.g., xDS)
* <pre>
* {@code
* class XdsPicker extends SubchannelPicker {
*
* public PickResult pickSubchannel(PickSubchannelArgs args) {
* SubchannelPicker perLocalityPicker = ... // locality picking logic
* Result result = perLocalityPicker.pickSubchannel(args);
* return PickResult.withSubchannel(
* result.getSubchannel(),
* OrcaPerRequestReportUtil.getInstance().newOrcaClientTracerFactory(
* result.getStreamTracerFactory(), listener));
*
* }
* }
* }
* </pre>
* </li>
* <li> Delegating policy with additional tracing logic
* <pre>
* {@code
* class WrappingPicker extends SubchannelPicker {
*
* public PickResult pickSubchannel(PickSubchannelArgs args) {
* Result result = delegate.pickSubchannel(args);
* return PickResult.withSubchannel(
* result.getSubchannel(),
* new ClientStreamTracer.Factory() {
* public ClientStreamTracer newClientStreamTracer(
* StreamInfo info, Metadata metadata) {
* ClientStreamTracer.Factory orcaTracerFactory =
* OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory(
* result.getStreamTracerFactory(), listener);
*
* // Wrap the tracer from the delegate factory if you need to trace the
* // stream for your own.
* final ClientStreamTracer orcaTracer =
* orcaTracerFactory.newClientStreamTracer(info, metadata);
*
* return ForwardingClientStreamTracer() {
* protected ClientStreamTracer delegate() {
* return orcaTracer;
* }
*
* public void inboundMessage(int seqNo) {
* // Handle this event.
* ...
* }
* };
* }
* });
* }
* }
* }
* </pre>
* </li>
* </ul>
*
* @param delegate the delegate factory to produce other client stream tracing.
* @param listener contains the callback to be invoked when a per-request ORCA report is received.
*/
public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener);
/**
* The listener interface for receiving per-request ORCA reports from backends. The class that is
* interested in processing backend cost metrics implements this interface, and the object created
* with that class is registered with a component, using methods in {@link OrcaPerRequestUtil}.
* When an ORCA report is received, that object's {@code onLoadReport} method is invoked.
*/
public interface OrcaPerRequestReportListener {
/**
* Invoked when a per-request ORCA report is received.
*
* <p>Note this callback will be invoked from the network thread as the RPC finishes,
* implementations should not block.
*
* @param report load report in the format of grpc {@link CallMetricRecorder.CallMetricReport}.
*/
void onLoadReport(CallMetricRecorder.CallMetricReport report);
}
/**
* An {@link OrcaReportingTracerFactory} wraps a delegated {@link ClientStreamTracer.Factory} with
* additional functionality to produce {@link ClientStreamTracer} instances that extract
* per-request ORCA reports and push to registered listeners for calls they trace.
*/
@VisibleForTesting
static final class OrcaReportingTracerFactory extends
ClientStreamTracer.Factory {
@VisibleForTesting
static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
Metadata.Key.of(
"endpoint-load-metrics-bin",
ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
private static final CallOptions.Key<OrcaReportBroker> ORCA_REPORT_BROKER_KEY =
CallOptions.Key.create("internal-orca-report-broker");
private final ClientStreamTracer.Factory delegate;
private final OrcaPerRequestReportListener listener;
OrcaReportingTracerFactory(
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
this.delegate = checkNotNull(delegate, "delegate");
this.listener = checkNotNull(listener, "listener");
}
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
OrcaReportBroker broker = info.getCallOptions().getOption(ORCA_REPORT_BROKER_KEY);
boolean augmented = false;
if (broker == null) {
broker = new OrcaReportBroker();
info =
info.toBuilder()
.setCallOptions(info.getCallOptions().withOption(ORCA_REPORT_BROKER_KEY, broker))
.build();
augmented = true;
}
broker.addListener(listener);
ClientStreamTracer tracer = delegate.newClientStreamTracer(info, headers);
if (augmented) {
final ClientStreamTracer currTracer = tracer;
final OrcaReportBroker currBroker = broker;
// The actual tracer that performs ORCA report deserialization.
tracer =
new ForwardingClientStreamTracer() {
@Override
protected ClientStreamTracer delegate() {
return currTracer;
}
@Override
public void inboundTrailers(Metadata trailers) {
OrcaLoadReport report = trailers.get(ORCA_ENDPOINT_LOAD_METRICS_KEY);
if (report != null) {
currBroker.onReport(report);
}
delegate().inboundTrailers(trailers);
}
};
}
return tracer;
}
}
static CallMetricRecorder.CallMetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(),
loadReport.getMemUtilization(), loadReport.getRequestCostMap(),
loadReport.getUtilizationMap());
}
/**
* A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all of
* them when an {@link OrcaLoadReport} is received.
*/
private static final class OrcaReportBroker {
private final List<OrcaPerRequestReportListener> listeners = new ArrayList<>();
void addListener(OrcaPerRequestReportListener listener) {
listeners.add(listener);
}
void onReport(OrcaLoadReport report) {
CallMetricRecorder.CallMetricReport metricReport = fromOrcaLoadReport(report);
for (OrcaPerRequestReportListener listener : listeners) {
listener.onLoadReport(metricReport);
}
}
}
}