From 036b40e429e4115865f9bf1f77731f9c790c4d2a Mon Sep 17 00:00:00 2001 From: Tadaya Tsuyukubo Date: Mon, 3 Oct 2022 13:38:05 -0700 Subject: [PATCH] Observation instrumentation for gRPC Add Observation instrumentation for gRPC client and server. Closes #3427 --- dependencies.gradle | 1 + micrometer-core/build.gradle | 4 + ...efaultGrpcClientObservationConvention.java | 56 ++ ...efaultGrpcServerObservationConvention.java | 56 ++ .../grpc/GrpcClientObservationContext.java | 91 +++ .../grpc/GrpcClientObservationConvention.java | 34 ++ .../grpc/GrpcObservationDocumentation.java | 128 +++++ .../grpc/GrpcServerObservationContext.java | 91 +++ .../grpc/GrpcServerObservationConvention.java | 34 ++ .../grpc/ObservationGrpcClientCall.java | 77 +++ .../ObservationGrpcClientCallListener.java | 62 ++ .../ObservationGrpcClientInterceptor.java | 105 ++++ .../grpc/ObservationGrpcServerCall.java | 64 +++ .../ObservationGrpcServerCallListener.java | 61 ++ .../ObservationGrpcServerInterceptor.java | 122 ++++ .../instrument/binder/grpc/package-info.java | 3 + .../binder/grpc/GrpcObservationTest.java | 537 ++++++++++++++++++ .../core/samples/GrpcObservationSample.java | 76 +++ 18 files changed, 1602 insertions(+) create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcClientObservationConvention.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcServerObservationConvention.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationContext.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationConvention.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationDocumentation.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationContext.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationConvention.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCall.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCallListener.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientInterceptor.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCall.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCallListener.java create mode 100644 micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerInterceptor.java create mode 100644 micrometer-core/src/test/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationTest.java create mode 100644 samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/GrpcObservationSample.java diff --git a/dependencies.gradle b/dependencies.gradle index dfae8111c0..0c94a3044f 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -36,6 +36,7 @@ def VERSIONS = [ 'io.grpc:grpc-services:latest.release', 'io.grpc:grpc-stubs:latest.release', 'io.grpc:grpc-alts:latest.release', + 'io.grpc:grpc-testing-proto:latest.release', 'info.ganglia.gmetric4j:gmetric4j:latest.release', 'io.prometheus:simpleclient_common:latest.release', 'io.prometheus:simpleclient_pushgateway:latest.release', diff --git a/micrometer-core/build.gradle b/micrometer-core/build.gradle index fcf9749afb..e6eab6bd0a 100644 --- a/micrometer-core/build.gradle +++ b/micrometer-core/build.gradle @@ -157,6 +157,10 @@ dependencies { } testImplementation("org.apache.maven.resolver:maven-resolver-connector-basic:latest.release") testImplementation("org.springframework:spring-core:latest.release") + + // gRPC + testImplementation("io.grpc:grpc-core") + testImplementation("io.grpc:grpc-testing-proto") } task shenandoahTest(type: Test) { diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcClientObservationConvention.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcClientObservationConvention.java new file mode 100644 index 0000000000..99e15375b8 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcClientObservationConvention.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.LowCardinalityKeyNames; + +import java.util.ArrayList; +import java.util.List; + +/** + * Default convention for gRPC client. This class defines how to extract values from + * {@link GrpcClientObservationContext}. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class DefaultGrpcClientObservationConvention implements GrpcClientObservationConvention { + + @Override + public String getName() { + return "grpc.client"; + } + + @Override + public String getContextualName(GrpcClientObservationContext context) { + return context.getFullMethodName(); + } + + @Override + public KeyValues getLowCardinalityKeyValues(GrpcClientObservationContext context) { + List keyValues = new ArrayList<>(); + keyValues.add(LowCardinalityKeyNames.METHOD.withValue(context.getMethodName())); + keyValues.add(LowCardinalityKeyNames.SERVICE.withValue(context.getServiceName())); + keyValues.add(LowCardinalityKeyNames.METHOD_TYPE.withValue(context.getMethodType().name())); + if (context.getStatusCode() != null) { + keyValues.add(LowCardinalityKeyNames.STATUS_CODE.withValue(context.getStatusCode().name())); + } + return KeyValues.of(keyValues); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcServerObservationConvention.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcServerObservationConvention.java new file mode 100644 index 0000000000..ba8d9bee3a --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/DefaultGrpcServerObservationConvention.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.LowCardinalityKeyNames; + +import java.util.ArrayList; +import java.util.List; + +/** + * Default convention for gRPC server. This class defines how to extract values from + * {@link GrpcServerObservationContext}. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class DefaultGrpcServerObservationConvention implements GrpcServerObservationConvention { + + @Override + public String getName() { + return "grpc.server"; + } + + @Override + public String getContextualName(GrpcServerObservationContext context) { + return context.getFullMethodName(); + } + + @Override + public KeyValues getLowCardinalityKeyValues(GrpcServerObservationContext context) { + List keyValues = new ArrayList<>(); + keyValues.add(LowCardinalityKeyNames.METHOD.withValue(context.getMethodName())); + keyValues.add(LowCardinalityKeyNames.SERVICE.withValue(context.getServiceName())); + keyValues.add(LowCardinalityKeyNames.METHOD_TYPE.withValue(context.getMethodType().name())); + if (context.getStatusCode() != null) { + keyValues.add(LowCardinalityKeyNames.STATUS_CODE.withValue(context.getStatusCode().name())); + } + return KeyValues.of(keyValues); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationContext.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationContext.java new file mode 100644 index 0000000000..2f59fce2cf --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationContext.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.Metadata; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status.Code; +import io.micrometer.common.lang.Nullable; +import io.micrometer.observation.Observation; +import io.micrometer.observation.transport.Propagator.Setter; +import io.micrometer.observation.transport.RequestReplySenderContext; + +/** + * {@link Observation.Context} for gRPC client. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class GrpcClientObservationContext extends RequestReplySenderContext { + + private String serviceName; + + private String methodName; + + private String fullMethodName; + + private MethodType methodType; + + @Nullable + private Code statusCode; + + public GrpcClientObservationContext(Setter setter) { + super(setter); + } + + public String getServiceName() { + return this.serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getMethodName() { + return this.methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } + + public String getFullMethodName() { + return this.fullMethodName; + } + + public void setFullMethodName(String fullMethodName) { + this.fullMethodName = fullMethodName; + } + + public MethodType getMethodType() { + return this.methodType; + } + + public void setMethodType(MethodType methodType) { + this.methodType = methodType; + } + + @Nullable + public Code getStatusCode() { + return this.statusCode; + } + + public void setStatusCode(Code statusCode) { + this.statusCode = statusCode; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationConvention.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationConvention.java new file mode 100644 index 0000000000..350b0ec12c --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcClientObservationConvention.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for gRPC client. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public interface GrpcClientObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof GrpcClientObservationContext; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationDocumentation.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationDocumentation.java new file mode 100644 index 0000000000..8080d40ec8 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationDocumentation.java @@ -0,0 +1,128 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.Observation.Event; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +/** + * {@link ObservationDocumentation} for gRPC. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public enum GrpcObservationDocumentation implements ObservationDocumentation { + + CLIENT { + @Override + public Class> getDefaultConvention() { + return GrpcClientObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityKeyNames.values(); + } + }, + SERVER { + @Override + public Class> getDefaultConvention() { + return GrpcServerObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityKeyNames.values(); + } + }; + + public enum LowCardinalityKeyNames implements KeyName { + + METHOD { + @Override + public String asString() { + return "rpc.method"; + } + }, + METHOD_TYPE { + @Override + public String asString() { + return "rpc.type"; + } + }, + SERVICE { + @Override + public String asString() { + return "rpc.service"; + } + }, + ERROR_CODE { + @Override + public String asString() { + return "rpc.error_code"; + } + }, + STATUS_CODE { + @Override + public String asString() { + return "grpc.status_code"; + } + } + + } + + public enum GrpcClientEvents implements Event { + + MESSAGE_SENT { + @Override + public String getName() { + return "sent"; + } + + }, + MESSAGE_RECEIVED { + @Override + public String getName() { + return "received"; + } + + } + + } + + public enum GrpcServerEvents implements Event { + + MESSAGE_RECEIVED { + @Override + public String getName() { + return "received"; + } + + }, + MESSAGE_SENT { + @Override + public String getName() { + return "sent"; + } + + } + + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationContext.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationContext.java new file mode 100644 index 0000000000..a6a2ce3e9b --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationContext.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.Metadata; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status.Code; +import io.micrometer.common.lang.Nullable; +import io.micrometer.observation.Observation; +import io.micrometer.observation.transport.Propagator.Getter; +import io.micrometer.observation.transport.RequestReplyReceiverContext; + +/** + * {@link Observation.Context} for gRPC server. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class GrpcServerObservationContext extends RequestReplyReceiverContext { + + private String serviceName; + + private String methodName; + + private String fullMethodName; + + private MethodType methodType; + + @Nullable + private Code statusCode; + + public GrpcServerObservationContext(Getter getter) { + super(getter); + } + + public String getServiceName() { + return this.serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getMethodName() { + return this.methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } + + public String getFullMethodName() { + return this.fullMethodName; + } + + public void setFullMethodName(String fullMethodName) { + this.fullMethodName = fullMethodName; + } + + public MethodType getMethodType() { + return this.methodType; + } + + public void setMethodType(MethodType methodType) { + this.methodType = methodType; + } + + @Nullable + public Code getStatusCode() { + return this.statusCode; + } + + public void setStatusCode(Code statusCode) { + this.statusCode = statusCode; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationConvention.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationConvention.java new file mode 100644 index 0000000000..87c3e70b86 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/GrpcServerObservationConvention.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for gRPC server. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public interface GrpcServerObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof GrpcServerObservationContext; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCall.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCall.java new file mode 100644 index 0000000000..e4278ef731 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCall.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.Metadata; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcClientEvents; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Scope; + +/** + * A simple forwarding client call for {@link Observation}. + * + * @param type of message sent one or more times to the server. + * @param type of message received one or more times from the server. + */ +class ObservationGrpcClientCall extends SimpleForwardingClientCall { + + private final Observation observation; + + private Scope scope; + + ObservationGrpcClientCall(ClientCall delegate, Observation observation) { + super(delegate); + this.observation = observation; + } + + @Override + public void start(Listener responseListener, Metadata metadata) { + ((GrpcClientObservationContext) this.observation.getContext()).setCarrier(metadata); + this.scope = this.observation.start().openScope(); + try { + super.start(new ObservationGrpcClientCallListener<>(responseListener, this.scope), metadata); + } + catch (Throwable ex) { + handleFailure(ex); + throw ex; + } + } + + @Override + public void halfClose() { + try { + super.halfClose(); + } + catch (Throwable ex) { + handleFailure(ex); + throw ex; + } + } + + @Override + public void sendMessage(ReqT message) { + this.observation.event(GrpcClientEvents.MESSAGE_SENT); + super.sendMessage(message); + } + + private void handleFailure(Throwable ex) { + this.scope.close(); + this.observation.error(ex).stop(); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCallListener.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCallListener.java new file mode 100644 index 0000000000..c53e420789 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientCallListener.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.Status; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcClientEvents; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Scope; + +/** + * A simple forwarding client call listener for {@link Observation}. + * + * @param type of message received one or more times from the server. + */ +class ObservationGrpcClientCallListener extends SimpleForwardingClientCallListener { + + private final Scope scope; + + public ObservationGrpcClientCallListener(ClientCall.Listener delegate, Scope scope) { + super(delegate); + this.scope = scope; + } + + @Override + public void onClose(Status status, Metadata metadata) { + Observation observation = this.scope.getCurrentObservation(); + GrpcClientObservationContext context = (GrpcClientObservationContext) observation.getContext(); + context.setStatusCode(status.getCode()); + if (status.getCause() != null) { + observation.error(status.getCause()); + } + + this.scope.close(); + observation.stop(); + + // We do not catch exception from the delegate. (following Brave design) + super.onClose(status, metadata); + } + + @Override + public void onMessage(RespT message) { + this.scope.getCurrentObservation().event(GrpcClientEvents.MESSAGE_RECEIVED); + super.onMessage(message); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientInterceptor.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientInterceptor.java new file mode 100644 index 0000000000..a6158c75a4 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcClientInterceptor.java @@ -0,0 +1,105 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.*; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor.MethodType; +import io.micrometer.common.lang.Nullable; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * A gRPC client interceptor that works with {@link Observation}. + *

+ * Usage: + *

+ *
+ * ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
+ *     .intercept(new ObservationGrpcClientInterceptor(observationRegistry))
+ *     .build();
+ * channel.newCall(method, options);
+ * 
The instrumentation is based on the behavior of Spring Cloud Sleuth and Brave. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class ObservationGrpcClientInterceptor implements ClientInterceptor { + + private static final GrpcClientObservationConvention DEFAULT_CONVENTION = new DefaultGrpcClientObservationConvention(); + + private static final Map> KEY_CACHE = new ConcurrentHashMap<>(); + + private final ObservationRegistry registry; + + @Nullable + private GrpcClientObservationConvention customConvention; + + public ObservationGrpcClientInterceptor(ObservationRegistry registry) { + this.registry = registry; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { + Supplier contextSupplier = () -> { + GrpcClientObservationContext context = new GrpcClientObservationContext((carrier, keyName, value) -> { + Key key = KEY_CACHE.computeIfAbsent(keyName, + (k) -> Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER)); + carrier.removeAll(key); + carrier.put(key, value); + }); + + String serviceName = method.getServiceName(); + String methodName = method.getBareMethodName(); + String fullMethodName = method.getFullMethodName(); + MethodType methodType = method.getType(); + if (serviceName != null) { + context.setServiceName(serviceName); + } + if (methodName != null) { + context.setMethodName(methodName); + } + context.setFullMethodName(fullMethodName); + context.setMethodType(methodType); + + return context; + }; + + Observation observation = GrpcObservationDocumentation.CLIENT.observation(this.customConvention, + DEFAULT_CONVENTION, contextSupplier, this.registry); + + if (observation.isNoop()) { + // do not instrument anymore + return next.newCall(method, callOptions); + } + return new ObservationGrpcClientCall<>(next.newCall(method, callOptions), observation); + } + + /** + * Set a custom {@link GrpcClientObservationConvention}. + * @param customConvention a custom convention + */ + public void setCustomConvention(@Nullable GrpcClientObservationConvention customConvention) { + this.customConvention = customConvention; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCall.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCall.java new file mode 100644 index 0000000000..8a75717875 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCall.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.Status; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcServerEvents; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Scope; + +/** + * A simple forwarding server call for {@link Observation}. + * + * @param type of message sent one or more times to the server. + * @param type of message received one or more times from the server. + */ +class ObservationGrpcServerCall extends SimpleForwardingServerCall { + + private final Scope scope; + + ObservationGrpcServerCall(ServerCall delegate, Scope scope) { + super(delegate); + this.scope = scope; + } + + @Override + public void sendMessage(RespT message) { + this.scope.getCurrentObservation().event(GrpcServerEvents.MESSAGE_SENT); + super.sendMessage(message); + } + + @Override + public void close(Status status, Metadata trailers) { + Observation observation = this.scope.getCurrentObservation(); + + if (status.getCause() != null) { + observation.error(status.getCause()); + } + + GrpcServerObservationContext context = (GrpcServerObservationContext) observation.getContext(); + context.setStatusCode(status.getCode()); + + this.scope.close(); + observation.stop(); + + super.close(status, trailers); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCallListener.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCallListener.java new file mode 100644 index 0000000000..86745fd9a0 --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerCallListener.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import io.grpc.ServerCall.Listener; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcServerEvents; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Scope; + +/** + * A simple forwarding client call listener for {@link Observation}. + * + * @param type of message received one or more times from the server. + */ +class ObservationGrpcServerCallListener extends SimpleForwardingServerCallListener { + + private final Scope scope; + + public ObservationGrpcServerCallListener(Listener delegate, Scope scope) { + super(delegate); + this.scope = scope; + } + + @Override + public void onMessage(RespT message) { + this.scope.getCurrentObservation().event(GrpcServerEvents.MESSAGE_RECEIVED); + super.onMessage(message); + } + + @Override + public void onHalfClose() { + try { + super.onHalfClose(); + } + catch (Throwable ex) { + handleFailure(ex); + throw ex; + } + } + + private void handleFailure(Throwable ex) { + Observation observation = this.scope.getCurrentObservation(); + this.scope.close(); + observation.error(ex).stop(); + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerInterceptor.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerInterceptor.java new file mode 100644 index 0000000000..475d2beaba --- /dev/null +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/ObservationGrpcServerInterceptor.java @@ -0,0 +1,122 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.*; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.ServerCall.Listener; +import io.micrometer.common.lang.Nullable; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Scope; +import io.micrometer.observation.ObservationRegistry; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * A gRPC server interceptor that works with {@link Observation}. + *

+ * Usage: + *

+ *
+ * Server server = ServerBuilder.forPort(8080)
+ *         .intercept(new ObservationGrpcServerInterceptor(meterRegistry))
+ *         .build();
+ * server.start()
+ * 
The instrumentation is based on the behavior of Spring Cloud Sleuth and Brave. + * + * @author Tadaya Tsuyukubo + * @since 1.10.0 + */ +public class ObservationGrpcServerInterceptor implements ServerInterceptor { + + private static final GrpcServerObservationConvention DEFAULT_CONVENTION = new DefaultGrpcServerObservationConvention(); + + private static final Map> KEY_CACHE = new ConcurrentHashMap<>(); + + private final ObservationRegistry registry; + + @Nullable + private GrpcServerObservationConvention customConvention; + + public ObservationGrpcServerInterceptor(ObservationRegistry registry) { + this.registry = registry; + } + + @Override + public Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + Supplier contextSupplier = () -> { + GrpcServerObservationContext context = new GrpcServerObservationContext((carrier, keyName) -> { + Key key = KEY_CACHE.computeIfAbsent(keyName, + (k) -> Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER)); + return carrier.get(key); + }); + context.setCarrier(headers); + + MethodDescriptor methodDescriptor = call.getMethodDescriptor(); + String serviceName = methodDescriptor.getServiceName(); + String methodName = methodDescriptor.getBareMethodName(); + String fullMethodName = methodDescriptor.getFullMethodName(); + MethodType methodType = methodDescriptor.getType(); + if (serviceName != null) { + context.setServiceName(serviceName); + } + if (methodName != null) { + context.setMethodName(methodName); + } + context.setFullMethodName(fullMethodName); + context.setMethodType(methodType); + + return context; + }; + + Observation observation = GrpcObservationDocumentation.SERVER.observation(this.customConvention, + DEFAULT_CONVENTION, contextSupplier, this.registry); + + if (observation.isNoop()) { + // do not instrument anymore + return next.startCall(call, headers); + } + + Scope scope = observation.start().openScope(); + ObservationGrpcServerCall serverCall = new ObservationGrpcServerCall<>(call, scope); + + Listener result; + try { + result = next.startCall(serverCall, headers); + } + catch (Exception ex) { + scope.close(); + observation.error(ex).stop(); + throw ex; + } + + return new ObservationGrpcServerCallListener<>(result, scope); + } + + /** + * Set a custom {@link GrpcServerObservationConvention}. + * @param customConvention a custom convention + */ + public void setCustomConvention(@Nullable GrpcServerObservationConvention customConvention) { + this.customConvention = customConvention; + } + +} diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/package-info.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/package-info.java index 98622e322f..02bfd2784c 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/package-info.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/grpc/package-info.java @@ -21,4 +21,7 @@ * {@link io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor} for * usage examples. */ +@NonNullApi package io.micrometer.core.instrument.binder.grpc; + +import io.micrometer.common.lang.NonNullApi; diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationTest.java new file mode 100644 index 0000000000..6dd5febba7 --- /dev/null +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/grpc/GrpcObservationTest.java @@ -0,0 +1,537 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.instrument.binder.grpc; + +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Server; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceBlockingStub; +import io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceImplBase; +import io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceStub; +import io.micrometer.common.lang.Nullable; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcClientEvents; +import io.micrometer.core.instrument.binder.grpc.GrpcObservationDocumentation.GrpcServerEvents; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.Observation.Event; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.ObservationRegistry.ObservationConfig; +import io.micrometer.observation.ObservationTextPublisher; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.awaitility.Awaitility.await; + +/** + * Test for {@link ObservationGrpcServerInterceptor} and + * {@link ObservationGrpcClientInterceptor}. + * + * @author Tadaya Tsuyukubo + */ +class GrpcObservationTest { + + Server server; + + ManagedChannel channel; + + ContextAndEventHoldingObservationHandler serverHandler; + + ContextAndEventHoldingObservationHandler clientHandler; + + ObservationGrpcServerInterceptor serverInterceptor; + + ObservationGrpcClientInterceptor clientInterceptor; + + @BeforeEach + void setUp() { + serverHandler = new ContextAndEventHoldingObservationHandler<>(GrpcServerObservationContext.class); + clientHandler = new ContextAndEventHoldingObservationHandler<>(GrpcClientObservationContext.class); + + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + ObservationRegistry observationRegistry = ObservationRegistry.create(); + ObservationConfig observationConfig = observationRegistry.observationConfig(); + observationConfig.observationHandler(new ObservationTextPublisher()); + observationConfig.observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + observationConfig.observationHandler(serverHandler); + observationConfig.observationHandler(clientHandler); + + this.serverInterceptor = new ObservationGrpcServerInterceptor(observationRegistry); + this.clientInterceptor = new ObservationGrpcClientInterceptor(observationRegistry); + } + + @AfterEach + void cleanUp() { + if (this.channel != null) { + this.channel.shutdownNow(); + } + if (this.server != null) { + this.server.shutdownNow(); + } + } + + @Nested + class WithEchoService { + + @BeforeEach + void setUpEchoService() throws Exception { + EchoService echoService = new EchoService(); + server = InProcessServerBuilder.forName("sample").addService(echoService).intercept(serverInterceptor) + .build(); + server.start(); + + channel = InProcessChannelBuilder.forName("sample").intercept(clientInterceptor).build(); + } + + @Test + void unaryRpc() { + SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(channel); + + SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build(); + SimpleResponse response = stub.unaryRpc(request); + assertThat(response.getResponseMessage()).isEqualTo("Hello"); + + verifyServerContext("grpc.testing.SimpleService", "UnaryRpc", "grpc.testing.SimpleService/UnaryRpc", + MethodType.UNARY); + verifyClientContext("grpc.testing.SimpleService", "UnaryRpc", "grpc.testing.SimpleService/UnaryRpc", + MethodType.UNARY); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_SENT); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_RECEIVED); + } + + @Test + void clientStreamingRpc() { + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + List messages = new ArrayList<>(); + AtomicBoolean completed = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(messages, completed); + + SimpleRequest request1 = SimpleRequest.newBuilder().setRequestMessage("Hello-1").build(); + SimpleRequest request2 = SimpleRequest.newBuilder().setRequestMessage("Hello-2").build(); + StreamObserver requestObserver = asyncStub.clientStreamingRpc(responseObserver); + + assertThat(serverHandler.getContext()).isNull(); + verifyClientContext("grpc.testing.SimpleService", "ClientStreamingRpc", + "grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING); + + requestObserver.onNext(request1); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT); + assertThat(clientHandler.getContext().getStatusCode()).isNull(); + + requestObserver.onNext(request2); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_SENT); + assertThat(clientHandler.getContext().getStatusCode()).isNull(); + + requestObserver.onCompleted(); + await().untilTrue(completed); + + assertThat(messages).containsExactly("Hello-1,Hello-2"); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_SENT, GrpcClientEvents.MESSAGE_RECEIVED); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_RECEIVED, GrpcServerEvents.MESSAGE_SENT); + + verifyServerContext("grpc.testing.SimpleService", "ClientStreamingRpc", + "grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + } + + @Test + void serverStreamingRpc() { + // Use async stub since blocking stu cannot detect the server side completion + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + List messages = new ArrayList<>(); + AtomicBoolean completed = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(messages, completed); + + SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build(); + asyncStub.serverStreamingRpc(request, responseObserver); + + await().untilTrue(completed); + + assertThat(messages).containsExactly("Hello-1", "Hello-2"); + + // server side has finished all processing + verifyServerContext("grpc.testing.SimpleService", "ServerStreamingRpc", + "grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_SENT, GrpcServerEvents.MESSAGE_SENT); + + // verify client side before retrieving the result + verifyClientContext("grpc.testing.SimpleService", "ServerStreamingRpc", + "grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_RECEIVED); + } + + @Test + void bidiStreamingRpc() { + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + List messages = new ArrayList<>(); + AtomicBoolean completed = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(messages, completed); + + SimpleRequest request1 = SimpleRequest.newBuilder().setRequestMessage("Hello-1").build(); + SimpleRequest request2 = SimpleRequest.newBuilder().setRequestMessage("Hello-2").build(); + StreamObserver requestObserver = asyncStub.bidiStreamingRpc(responseObserver); + + requestObserver.onNext(request1); + await().until(() -> messages.size() >= 2); + assertThat(messages).containsExactly("Hello-1-A", "Hello-1-B"); + messages.clear(); + + verifyClientContext("grpc.testing.SimpleService", "BidiStreamingRpc", + "grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING); + verifyServerContext("grpc.testing.SimpleService", "BidiStreamingRpc", + "grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING); + + assertThat(serverHandler.getContext().getStatusCode()).isNull(); + assertThat(clientHandler.getContext().getStatusCode()).isNull(); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_SENT, GrpcServerEvents.MESSAGE_SENT); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_RECEIVED); + + requestObserver.onNext(request2); + await().until(() -> messages.size() >= 2); + assertThat(messages).containsExactly("Hello-2-A", "Hello-2-B"); + messages.clear(); + + assertThat(serverHandler.getContext().getStatusCode()).isNull(); + assertThat(clientHandler.getContext().getStatusCode()).isNull(); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_SENT, GrpcServerEvents.MESSAGE_SENT, GrpcServerEvents.MESSAGE_RECEIVED, + GrpcServerEvents.MESSAGE_SENT, GrpcServerEvents.MESSAGE_SENT); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_SENT, + GrpcClientEvents.MESSAGE_RECEIVED, GrpcClientEvents.MESSAGE_RECEIVED); + + requestObserver.onCompleted(); + await().untilTrue(completed); + + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.OK); + } + + private StreamObserver createResponseObserver(List messages, AtomicBoolean completed) { + return new StreamObserver<>() { + + @Override + public void onNext(SimpleResponse value) { + messages.add(value.getResponseMessage()); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException("Encountered error", t); + } + + @Override + public void onCompleted() { + completed.set(true); + } + }; + } + + } + + @Nested + class WithExceptionService { + + @BeforeEach + void setUpExceptionService() throws Exception { + ExceptionService exceptionService = new ExceptionService(); + server = InProcessServerBuilder.forName("exception").addService(exceptionService) + .intercept(serverInterceptor).build(); + server.start(); + + channel = InProcessChannelBuilder.forName("exception").intercept(clientInterceptor).build(); + } + + @Test + void unaryRpcFailure() { + SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(channel); + + SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build(); + assertThatExceptionOfType(StatusRuntimeException.class).isThrownBy(() -> stub.unaryRpc(request)); + + verifyServerContext("grpc.testing.SimpleService", "UnaryRpc", "grpc.testing.SimpleService/UnaryRpc", + MethodType.UNARY); + verifyClientContext("grpc.testing.SimpleService", "UnaryRpc", "grpc.testing.SimpleService/UnaryRpc", + MethodType.UNARY); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT); + } + + @Test + void clientStreamingRpcFailure() { + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + AtomicBoolean errored = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(errored); + + asyncStub.clientStreamingRpc(responseObserver); + + await().untilTrue(errored); + + verifyClientContext("grpc.testing.SimpleService", "ClientStreamingRpc", + "grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING); + verifyServerContext("grpc.testing.SimpleService", "ClientStreamingRpc", + "grpc.testing.SimpleService/ClientStreamingRpc", MethodType.CLIENT_STREAMING); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(clientHandler.getEvents()).isEmpty(); + assertThat(serverHandler.getEvents()).isEmpty(); + } + + @Test + void serverStreamingRpcFailure() { + // With blocking stub, it cannot detect server complete. So, use async stub. + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + AtomicBoolean errored = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(errored); + + SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build(); + asyncStub.serverStreamingRpc(request, responseObserver); + + await().untilTrue(errored); + + verifyClientContext("grpc.testing.SimpleService", "ServerStreamingRpc", + "grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING); + verifyServerContext("grpc.testing.SimpleService", "ServerStreamingRpc", + "grpc.testing.SimpleService/ServerStreamingRpc", MethodType.SERVER_STREAMING); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(clientHandler.getEvents()).containsExactly(GrpcClientEvents.MESSAGE_SENT); + assertThat(serverHandler.getEvents()).containsExactly(GrpcServerEvents.MESSAGE_RECEIVED); + } + + @Test + void bidiStreamingRpcFailure() { + SimpleServiceStub asyncStub = SimpleServiceGrpc.newStub(channel); + + AtomicBoolean errored = new AtomicBoolean(); + StreamObserver responseObserver = createResponseObserver(errored); + + // the call to the service fails, so don't need to send message from client + asyncStub.bidiStreamingRpc(responseObserver); + + await().untilTrue(errored); + + verifyClientContext("grpc.testing.SimpleService", "BidiStreamingRpc", + "grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING); + verifyServerContext("grpc.testing.SimpleService", "BidiStreamingRpc", + "grpc.testing.SimpleService/BidiStreamingRpc", MethodType.BIDI_STREAMING); + assertThat(clientHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(serverHandler.getContext().getStatusCode()).isEqualTo(Code.UNIMPLEMENTED); + assertThat(clientHandler.getEvents()).isEmpty(); + assertThat(serverHandler.getEvents()).isEmpty(); + } + + private StreamObserver createResponseObserver(AtomicBoolean errored) { + return new StreamObserver<>() { + @Override + public void onNext(SimpleResponse value) { + throw new RuntimeException("Should not receive any message"); + } + + @Override + public void onError(Throwable t) { + errored.set(true); + } + + @Override + public void onCompleted() { + throw new RuntimeException("Should not successfully completed"); + } + }; + } + + } + + // perform server context verification on basic information + void verifyServerContext(String serviceName, String methodName, String contextualName, MethodType methodType) { + assertThat(serverHandler.getContext()).isNotNull().satisfies((serverContext) -> { + assertThat(serverContext).isNotNull(); + assertThat(serverContext.getServiceName()).isEqualTo(serviceName); + assertThat(serverContext.getMethodName()).isEqualTo(methodName); + assertThat(serverContext.getFullMethodName()).isEqualTo(contextualName); + assertThat(serverContext.getMethodType()).isEqualTo(methodType); + }); + } + + // perform client context verification on basic information + void verifyClientContext(String serviceName, String methodName, String contextualName, MethodType methodType) { + assertThat(clientHandler.getContext()).isNotNull().satisfies((clientContext) -> { + assertThat(clientContext).isNotNull(); + assertThat(clientContext.getServiceName()).isEqualTo(serviceName); + assertThat(clientContext.getMethodName()).isEqualTo(methodName); + assertThat(clientContext.getFullMethodName()).isEqualTo(contextualName); + assertThat(clientContext.getMethodType()).isEqualTo(methodType); + }); + } + + // GRPC service extending SimpleService and provides echo implementation. + static class EchoService extends SimpleServiceImplBase { + + // echo the response message + @Override + public void unaryRpc(SimpleRequest request, StreamObserver responseObserver) { + SimpleResponse response = SimpleResponse.newBuilder().setResponseMessage(request.getRequestMessage()) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + // returns concatenated message + @Override + public StreamObserver clientStreamingRpc(StreamObserver responseObserver) { + return new StreamObserver<>() { + final List messages = new ArrayList<>(); + + @Override + public void onNext(SimpleRequest value) { + this.messages.add(value.getRequestMessage()); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException("Encountered error", t); + } + + @Override + public void onCompleted() { + String message = String.join(",", this.messages); + responseObserver.onNext(SimpleResponse.newBuilder().setResponseMessage(message).build()); + responseObserver.onCompleted(); + } + }; + } + + // returns two messages + @Override + public void serverStreamingRpc(SimpleRequest request, StreamObserver responseObserver) { + String message = request.getRequestMessage(); + responseObserver.onNext(SimpleResponse.newBuilder().setResponseMessage(message + "-1").build()); + responseObserver.onNext(SimpleResponse.newBuilder().setResponseMessage(message + "-2").build()); + responseObserver.onCompleted(); + } + + // returns two message per received message + @Override + public StreamObserver bidiStreamingRpc(StreamObserver responseObserver) { + return new StreamObserver<>() { + + @Override + public void onNext(SimpleRequest value) { + String message = value.getRequestMessage(); + responseObserver.onNext(SimpleResponse.newBuilder().setResponseMessage(message + "-A").build()); + responseObserver.onNext(SimpleResponse.newBuilder().setResponseMessage(message + "-B").build()); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException("Encountered error", t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + + } + + // Default implementation in the parent class throws UNIMPLEMENTED error + static class ExceptionService extends SimpleServiceImplBase { + + } + + // Hold reference to the Context and Events happened in ObservationHandler + static class ContextAndEventHoldingObservationHandler + implements ObservationHandler { + + private final AtomicReference contextHolder = new AtomicReference<>(); + + private final List events = new ArrayList<>(); + + private final Class contextClass; + + public ContextAndEventHoldingObservationHandler(Class contextClass) { + this.contextClass = contextClass; + } + + @Override + public boolean supportsContext(Context context) { + if (this.contextClass.isInstance(context)) { + this.contextHolder.set(this.contextClass.cast(context)); + return true; + } + return false; + } + + @Override + public void onEvent(Event event, T context) { + this.events.add(event); + } + + @Nullable + public T getContext() { + return this.contextHolder.get(); + } + + public List getEvents() { + return this.events; + } + + } + +} diff --git a/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/GrpcObservationSample.java b/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/GrpcObservationSample.java new file mode 100644 index 0000000000..428007fd81 --- /dev/null +++ b/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/GrpcObservationSample.java @@ -0,0 +1,76 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.core.samples; + +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.health.v1.HealthGrpc.HealthBlockingStub; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.protobuf.services.HealthStatusManager; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.grpc.ObservationGrpcClientInterceptor; +import io.micrometer.core.instrument.binder.grpc.ObservationGrpcServerInterceptor; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.ObservationTextPublisher; + +import java.io.IOException; + +/** + * Demonstrates how to use observation gRPC interceptors. To see more details, check the + * {@code GrpcObservationTest} in micrometer-core test. + * + * @author Tadaya Tsuyukubo + */ +public class GrpcObservationSample { + + public static void main(final String... args) throws IOException { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler(new ObservationTextPublisher()); + observationRegistry.observationConfig().observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + + HealthStatusManager service = new HealthStatusManager(); + + Server server = InProcessServerBuilder.forName("sample").addService(service.getHealthService()) + .intercept(new ObservationGrpcServerInterceptor(observationRegistry)).build(); + server.start(); + + ManagedChannel channel = InProcessChannelBuilder.forName("sample") + .intercept(new ObservationGrpcClientInterceptor(observationRegistry)).build(); + + HealthBlockingStub healthClient = HealthGrpc.newBlockingStub(channel); + + HealthCheckRequest request = HealthCheckRequest.getDefaultInstance(); + HealthCheckResponse response = healthClient.check(request); + + System.out.println("Check Status: " + response.getStatus()); + for (Meter meter : meterRegistry.getMeters()) { + System.out.println(meter.getClass().getSimpleName() + "->" + meter.getId() + ":" + meter.measure()); + } + + channel.shutdownNow(); + server.shutdownNow(); + } + +}