Skip to content

Commit

Permalink
Polish "Add Observation instrumentation for gRPC client and server" (m…
Browse files Browse the repository at this point in the history
  • Loading branch information
izeye committed Oct 17, 2022
1 parent 3e16a68 commit 320b38f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ObservationGrpcClientCallListener<RespT> extends SimpleForwardingClientCal

private final Scope scope;

public ObservationGrpcClientCallListener(ClientCall.Listener<RespT> delegate, Scope scope) {
ObservationGrpcClientCallListener(ClientCall.Listener<RespT> delegate, Scope scope) {
super(delegate);
this.scope = scope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ObservationGrpcServerCallListener<RespT> extends SimpleForwardingServerCal

private final Scope scope;

public ObservationGrpcServerCallListener(Listener<RespT> delegate, Scope scope) {
ObservationGrpcServerCallListener(Listener<RespT> delegate, Scope scope) {
super(delegate);
this.scope = scope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* </p>
* <pre>
* Server server = ServerBuilder.forPort(8080)
* .intercept(new ObservationGrpcServerInterceptor(meterRegistry))
* .intercept(new ObservationGrpcServerInterceptor(observationRegistry))
* .build();
* server.start()
* </pre> The instrumentation is based on the behavior of Spring Cloud Sleuth and Brave.
Expand Down Expand Up @@ -98,17 +98,15 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Scope scope = observation.start().openScope();
ObservationGrpcServerCall<ReqT, RespT> serverCall = new ObservationGrpcServerCall<>(call, scope);

Listener<ReqT> result;
try {
result = next.startCall(serverCall, headers);
Listener<ReqT> result = next.startCall(serverCall, headers);
return new ObservationGrpcServerCallListener<>(result, scope);
}
catch (Exception ex) {
scope.close();
observation.error(ex).stop();
throw ex;
}

return new ObservationGrpcServerCallListener<>(result, scope);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.micrometer.observation.Observation.Event;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.ObservationRegistry.ObservationConfig;
import io.micrometer.observation.ObservationTextPublisher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -83,11 +82,9 @@ void setUp() {

MeterRegistry meterRegistry = new SimpleMeterRegistry();
ObservationRegistry observationRegistry = ObservationRegistry.create();
ObservationConfig observationConfig = observationRegistry.observationConfig();
observationConfig.observationHandler(new ObservationTextPublisher());
observationConfig.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
observationConfig.observationHandler(serverHandler);
observationConfig.observationHandler(clientHandler);
observationRegistry.observationConfig().observationHandler(new ObservationTextPublisher())
.observationHandler(new DefaultMeterObservationHandler(meterRegistry)).observationHandler(serverHandler)
.observationHandler(clientHandler);

this.serverInterceptor = new ObservationGrpcServerInterceptor(observationRegistry);
this.clientInterceptor = new ObservationGrpcClientInterceptor(observationRegistry);
Expand Down Expand Up @@ -178,7 +175,7 @@ void clientStreamingRpc() {

@Test
void serverStreamingRpc() {
// Use async stub since blocking stu cannot detect the server side completion
// Use async stub since blocking stub cannot detect the server side completion
SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel);

List<String> messages = new ArrayList<>();
Expand Down Expand Up @@ -220,7 +217,7 @@ void bidiStreamingRpc() {
StreamObserver<SimpleRequest> requestObserver = asyncStub.bidiStreamingRpc(responseObserver);

requestObserver.onNext(request1);
await().until(() -> messages.size() >= 2);
await().until(() -> messages.size() == 2);
assertThat(messages).containsExactly("Hello-1-A", "Hello-1-B");
messages.clear();

Expand All @@ -237,9 +234,8 @@ void bidiStreamingRpc() {
GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_RECEIVED);

requestObserver.onNext(request2);
await().until(() -> messages.size() >= 2);
await().until(() -> messages.size() == 2);
assertThat(messages).containsExactly("Hello-2-A", "Hello-2-B");
messages.clear();

assertThat(serverHandler.getContext().getStatusCode()).isNull();
assertThat(clientHandler.getContext().getStatusCode()).isNull();
Expand Down Expand Up @@ -389,7 +385,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
throw new RuntimeException("Should not successfully completed");
throw new RuntimeException("Should not be successfully completed");
}
};
}
Expand Down Expand Up @@ -421,7 +417,7 @@ void verifyClientContext(String serviceName, String methodName, String contextua
// GRPC service extending SimpleService and provides echo implementation.
static class EchoService extends SimpleServiceImplBase {

// echo the response message
// echo the request message
@Override
public void unaryRpc(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
SimpleResponse response = SimpleResponse.newBuilder().setResponseMessage(request.getRequestMessage())
Expand Down Expand Up @@ -464,7 +460,7 @@ public void serverStreamingRpc(SimpleRequest request, StreamObserver<SimpleRespo
responseObserver.onCompleted();
}

// returns two message per received message
// returns two messages per received message
@Override
public StreamObserver<SimpleRequest> bidiStreamingRpc(StreamObserver<SimpleResponse> responseObserver) {
return new StreamObserver<>() {
Expand Down Expand Up @@ -505,7 +501,7 @@ static class ContextAndEventHoldingObservationHandler<T extends Observation.Cont

private final Class<T> contextClass;

public ContextAndEventHoldingObservationHandler(Class<T> contextClass) {
ContextAndEventHoldingObservationHandler(Class<T> contextClass) {
this.contextClass = contextClass;
}

Expand All @@ -524,11 +520,11 @@ public void onEvent(Event event, T context) {
}

@Nullable
public T getContext() {
T getContext() {
return this.contextHolder.get();
}

public List<Event> getEvents() {
List<Event> getEvents() {
return this.events;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.grpc.ObservationGrpcClientInterceptor;
import io.micrometer.core.instrument.binder.grpc.ObservationGrpcServerInterceptor;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
Expand All @@ -43,12 +41,12 @@
*/
public class GrpcObservationSample {

public static void main(final String... args) throws IOException {
MeterRegistry meterRegistry = new SimpleMeterRegistry();
public static void main(String[] args) throws IOException {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();

ObservationRegistry observationRegistry = ObservationRegistry.create();
observationRegistry.observationConfig().observationHandler(new ObservationTextPublisher());
observationRegistry.observationConfig().observationHandler(new DefaultMeterObservationHandler(meterRegistry));
observationRegistry.observationConfig().observationHandler(new ObservationTextPublisher())
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));

HealthStatusManager service = new HealthStatusManager();

Expand All @@ -65,9 +63,7 @@ public static void main(final String... args) throws IOException {
HealthCheckResponse response = healthClient.check(request);

System.out.println("Check Status: " + response.getStatus());
for (Meter meter : meterRegistry.getMeters()) {
System.out.println(meter.getClass().getSimpleName() + "->" + meter.getId() + ":" + meter.measure());
}
System.out.println(meterRegistry.getMetersAsString());

channel.shutdownNow();
server.shutdownNow();
Expand Down

0 comments on commit 320b38f

Please sign in to comment.