diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index d5a608d83b0..e077e12a2a6 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -34,6 +34,7 @@ import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; import io.grpc.internal.TimeProvider; +import io.opencensus.common.Duration; import io.opencensus.contrib.grpc.metrics.RpcViews; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter; @@ -55,6 +56,7 @@ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class GcpObservability implements AutoCloseable { private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); + private static final int METRICS_EXPORT_INTERVAL = 30; private static GcpObservability instance = null; private final Sink sink; private final ObservabilityConfig config; @@ -76,8 +78,6 @@ public static synchronized GcpObservability grpcInit() throws IOException { Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(), observabilityConfig.getFlushMessageCount()); - // TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider - // once ChannelBuilder and ServerBuilder are used LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, observabilityConfig, @@ -97,13 +97,8 @@ static GcpObservability grpcInit( InternalLoggingServerInterceptor.Factory serverInterceptorFactory) throws IOException { if (instance == null) { - instance = - new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory); - LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); - ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config); - instance.setProducer( - new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper), - new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper)); + instance = new GcpObservability(sink, config); + instance.setProducer(channelInterceptorFactory, serverInterceptorFactory); } return instance; } @@ -116,13 +111,13 @@ public void close() { throw new IllegalStateException("GcpObservability already closed!"); } unRegisterStackDriverExporter(); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); sink.close(); instance = null; } } + // TODO(dnvindhya): Remove InterceptorFactory and replace with respective + // interceptors private void setProducer( InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { @@ -145,7 +140,8 @@ private void setProducer( clientInterceptors, serverInterceptors, tracerFactories); } - private void registerStackDriverExporter(String projectId, Map customTags) + @VisibleForTesting + void registerStackDriverExporter(String projectId, Map customTags) throws IOException { if (config.isEnableCloudMonitoring()) { RpcViews.registerAllGrpcViews(); @@ -160,6 +156,7 @@ private void registerStackDriverExporter(String projectId, Map c e -> LabelValue.create(e.getValue()))); statsConfigurationBuilder.setConstantLabels(constantLabels); } + statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0)); StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); metricsEnabled = true; } @@ -208,13 +205,8 @@ private void unRegisterStackDriverExporter() { private GcpObservability( Sink sink, - ObservabilityConfig config, - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, - InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + ObservabilityConfig config) { this.sink = checkNotNull(sink); this.config = checkNotNull(config); - - LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); - LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java deleted file mode 100644 index 81c3501e1d4..00000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.grpc.ChannelCredentials; -import io.grpc.InternalManagedChannelProvider; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ManagedChannelProvider; -import io.grpc.ManagedChannelRegistry; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Collection; -import java.util.Collections; - -/** A channel provider that injects logging interceptor. */ -final class LoggingChannelProvider extends ManagedChannelProvider { - private final ManagedChannelProvider prevProvider; - private final InternalLoggingChannelInterceptor.Factory clientInterceptorFactory; - - private static LoggingChannelProvider instance; - - private LoggingChannelProvider(InternalLoggingChannelInterceptor.Factory factory) { - prevProvider = ManagedChannelProvider.provider(); - clientInterceptorFactory = factory; - } - - static synchronized void init(InternalLoggingChannelInterceptor.Factory factory) { - if (instance != null) { - throw new IllegalStateException("LoggingChannelProvider already initialized!"); - } - instance = new LoggingChannelProvider(factory); - ManagedChannelRegistry.getDefaultRegistry().register(instance); - } - - static synchronized void shutdown() { - if (instance == null) { - throw new IllegalStateException("LoggingChannelProvider not initialized!"); - } - ManagedChannelRegistry.getDefaultRegistry().deregister(instance); - instance = null; - } - - @Override - protected boolean isAvailable() { - return true; - } - - @Override - protected int priority() { - return 6; - } - - private ManagedChannelBuilder addInterceptor(ManagedChannelBuilder builder) { - return builder.intercept(clientInterceptorFactory.create()); - } - - @Override - protected ManagedChannelBuilder builderForAddress(String name, int port) { - return addInterceptor( - InternalManagedChannelProvider.builderForAddress(prevProvider, name, port)); - } - - @Override - protected ManagedChannelBuilder builderForTarget(String target) { - return addInterceptor(InternalManagedChannelProvider.builderForTarget(prevProvider, target)); - } - - @Override - protected NewChannelBuilderResult newChannelBuilder(String target, ChannelCredentials creds) { - NewChannelBuilderResult result = InternalManagedChannelProvider.newChannelBuilder(prevProvider, - target, creds); - ManagedChannelBuilder builder = result.getChannelBuilder(); - if (builder != null) { - return NewChannelBuilderResult.channelBuilder( - addInterceptor(builder)); - } - checkNotNull(result.getError(), "Expected error to be set!"); - return result; - } - - @Override - protected Collection> getSupportedSocketAddressTypes() { - return Collections.singleton(InetSocketAddress.class); - } -} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java deleted file mode 100644 index 6a4c710795e..00000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.grpc.InternalServerProvider; -import io.grpc.ServerBuilder; -import io.grpc.ServerCredentials; -import io.grpc.ServerProvider; -import io.grpc.ServerRegistry; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; - -/** A server provider that injects the logging interceptor. */ -final class LoggingServerProvider extends ServerProvider { - private final ServerProvider prevProvider; - private final InternalLoggingServerInterceptor.Factory serverInterceptorFactory; - - private static LoggingServerProvider instance; - - private LoggingServerProvider(InternalLoggingServerInterceptor.Factory factory) { - prevProvider = ServerProvider.provider(); - serverInterceptorFactory = factory; - } - - static synchronized void init(InternalLoggingServerInterceptor.Factory factory) { - if (instance != null) { - throw new IllegalStateException("LoggingServerProvider already initialized!"); - } - instance = new LoggingServerProvider(factory); - ServerRegistry.getDefaultRegistry().register(instance); - } - - static synchronized void shutdown() { - if (instance == null) { - throw new IllegalStateException("LoggingServerProvider not initialized!"); - } - ServerRegistry.getDefaultRegistry().deregister(instance); - instance = null; - } - - @Override - protected boolean isAvailable() { - return true; - } - - @Override - protected int priority() { - return 6; - } - - private ServerBuilder addInterceptor(ServerBuilder builder) { - return builder.intercept(serverInterceptorFactory.create()); - } - - @Override - protected ServerBuilder builderForPort(int port) { - return addInterceptor(InternalServerProvider.builderForPort(prevProvider, port)); - } - - @Override - protected NewServerBuilderResult newServerBuilderForPort(int port, ServerCredentials creds) { - ServerProvider.NewServerBuilderResult result = InternalServerProvider.newServerBuilderForPort( - prevProvider, port, - creds); - ServerBuilder builder = result.getServerBuilder(); - if (builder != null) { - return ServerProvider.NewServerBuilderResult.serverBuilder( - addInterceptor(builder)); - } - checkNotNull(result.getError(), "Expected error to be set!"); - return result; - } -} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java index 5c0355dd663..81e0a9819af 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -40,7 +40,7 @@ import java.util.logging.Logger; /** - * A logging interceptor for {@code LoggingChannelProvider}. + * A logging client interceptor for Observability. */ @Internal public final class InternalLoggingChannelInterceptor implements ClientInterceptor { @@ -51,6 +51,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto private final LogHelper helper; private final ConfigFilterHelper filterHelper; + // TODO(dnvindhya): Remove factory and use interceptors directly public interface Factory { ClientInterceptor create(); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java index e0c5b0bb6a0..112a1c067b1 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java @@ -40,7 +40,7 @@ import java.util.logging.Logger; /** - * A logging interceptor for {@code LoggingServerProvider}. + * A logging server interceptor for Observability. */ @Internal public final class InternalLoggingServerInterceptor implements ServerInterceptor { @@ -51,6 +51,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor private final LogHelper helper; private final ConfigFilterHelper filterHelper; + // TODO(dnvindhya): Remove factory and use interceptors directly public interface Factory { ServerInterceptor create(); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 011a333ed9a..9b198250afb 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -54,18 +54,20 @@ public class GcpLogSink implements Sink { = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", "pod_name", "container_name"); private static final long FALLBACK_FLUSH_LIMIT = 100L; + private final String projectId; private final Map customTags; - private final Logging gcpLoggingClient; private final MonitoredResource kubernetesResource; private final Long flushLimit; + /** Lazily initialize cloud logging client to avoid circular initialization. Because cloud + * logging APIs also uses gRPC. */ + private volatile Logging gcpLoggingClient; private long flushCounter; - private static Logging createLoggingClient(String projectId) { - LoggingOptions.Builder builder = LoggingOptions.newBuilder(); - if (!Strings.isNullOrEmpty(projectId)) { - builder.setProjectId(projectId); - } - return builder.build().getService(); + @VisibleForTesting + GcpLogSink(Logging loggingClient, String destinationProjectId, Map locationTags, + Map customTags, Long flushLimit) { + this(destinationProjectId, locationTags, customTags, flushLimit); + this.gcpLoggingClient = loggingClient; } /** @@ -75,15 +77,7 @@ private static Logging createLoggingClient(String projectId) { */ public GcpLogSink(String destinationProjectId, Map locationTags, Map customTags, Long flushLimit) { - this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags, - customTags, flushLimit); - - } - - @VisibleForTesting - GcpLogSink(Logging client, String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit) { - this.gcpLoggingClient = client; + this.projectId = destinationProjectId; this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.kubernetesResource = getResource(locationTags); this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; @@ -98,8 +92,11 @@ public GcpLogSink(String destinationProjectId, Map locationTags, @Override public void write(GrpcLogRecord logProto) { if (gcpLoggingClient == null) { - logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed."); - return; + synchronized (this) { + if (gcpLoggingClient == null) { + gcpLoggingClient = createLoggingClient(); + } + } } if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) { return; @@ -133,6 +130,14 @@ public void write(GrpcLogRecord logProto) { } } + Logging createLoggingClient() { + LoggingOptions.Builder builder = LoggingOptions.newBuilder(); + if (!Strings.isNullOrEmpty(projectId)) { + builder.setProjectId(projectId); + } + return builder.build().getService(); + } + @VisibleForTesting static Map getCustomTags(Map customTags, Map locationTags, String destinationProjectId) { diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index d494b3c14f1..97e9031631b 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -28,13 +28,11 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.InternalGlobalInterceptors; -import io.grpc.ManagedChannelProvider; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.ServerProvider; import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; @@ -84,9 +82,6 @@ public static final class StaticTestingClassInitFinish implements Runnable { @Override public void run() { - // TODO(dnvindhya) : Remove usage of Providers on cleaning up Logging*Provider - ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); - ServerProvider prevServerProvider = ServerProvider.provider(); Sink sink = mock(Sink.class); ObservabilityConfig config = mock(ObservabilityConfig.class); InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = @@ -98,16 +93,12 @@ public void run() { GcpObservability observability = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory); - assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); - assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); observability1 = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory); assertThat(observability1).isSameInstanceAs(observability); observability.close(); verify(sink).close(); - assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); - assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); try { observability1.close(); fail("should have failed for calling close() second time"); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java deleted file mode 100644 index c4337d5d4fd..00000000000 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.AdditionalAnswers.delegatesTo; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ManagedChannelProvider; -import io.grpc.MethodDescriptor; -import io.grpc.TlsChannelCredentials; -import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor.FactoryImpl; -import io.grpc.gcp.observability.interceptors.LogHelper; -import io.grpc.testing.TestMethodDescriptors; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatchers; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -@RunWith(JUnit4.class) -public class LoggingChannelProviderTest { - - @Rule - public final MockitoRule mocks = MockitoJUnit.rule(); - - private final MethodDescriptor method = TestMethodDescriptors.voidMethod(); - - @Test - public void initTwiceCausesException() { - ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); - assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - LoggingChannelProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); - try { - LoggingChannelProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - fail("should have failed for calling init() again"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); - } - LoggingChannelProvider.shutdown(); - assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevProvider); - } - - @Test - public void forTarget_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = ManagedChannelBuilder.forTarget("localhost"); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - @Test - public void forAddress_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress("localhost", 80); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - @Test - public void newChannelBuilder_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = Grpc.newChannelBuilder("localhost", - TlsChannelCredentials.create()); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - private static class NoopInterceptor implements ClientInterceptor { - @Override - public ClientCall interceptCall(MethodDescriptor method, - CallOptions callOptions, Channel next) { - return next.newCall(method, callOptions); - } - } -} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java deleted file mode 100644 index a5ec7966da7..00000000000 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.AdditionalAnswers.delegatesTo; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.grpc.Grpc; -import io.grpc.InsecureServerCredentials; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.ServerProvider; -import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl; -import io.grpc.gcp.observability.interceptors.LogHelper; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.testing.protobuf.SimpleRequest; -import io.grpc.testing.protobuf.SimpleResponse; -import io.grpc.testing.protobuf.SimpleServiceGrpc; -import java.io.IOException; -import java.util.function.Supplier; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatchers; - -@RunWith(JUnit4.class) -public class LoggingServerProviderTest { - @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - @Test - public void initTwiceCausesException() { - ServerProvider prevProvider = ServerProvider.provider(); - assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); - try { - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - fail("should have failed for calling init() again"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!"); - } - LoggingServerProvider.shutdown(); - assertThat(ServerProvider.provider()).isSameInstanceAs(prevProvider); - } - - @Test - public void forPort_interceptorCalled() throws IOException { - serverBuilder_interceptorCalled(() -> ServerBuilder.forPort(0)); - } - - @Test - public void newServerBuilder_interceptorCalled() throws IOException { - serverBuilder_interceptorCalled( - () -> Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())); - } - - @SuppressWarnings("unchecked") - private void serverBuilder_interceptorCalled(Supplier> serverBuilderSupplier) - throws IOException { - ServerInterceptor interceptor = - mock(ServerInterceptor.class, delegatesTo(new NoopInterceptor())); - InternalLoggingServerInterceptor.Factory factory = mock( - InternalLoggingServerInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingServerProvider.init(factory); - Server server = serverBuilderSupplier.get().addService(new SimpleServiceImpl()).build().start(); - int port = cleanupRule.register(server).getPort(); - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext() - .build(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(channel)); - assertThat(unaryRpc("buddy", stub)).isEqualTo("Hello buddy"); - verify(interceptor).interceptCall(any(ServerCall.class), any(Metadata.class), anyCallHandler()); - LoggingServerProvider.shutdown(); - } - - private ServerCallHandler anyCallHandler() { - return ArgumentMatchers.any(); - } - - private static String unaryRpc( - String requestMessage, SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub) { - SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage(requestMessage).build(); - SimpleResponse response = blockingStub.unaryRpc(request); - return response.getResponseMessage(); - } - - private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { - - @Override - public void unaryRpc(SimpleRequest req, StreamObserver responseObserver) { - SimpleResponse response = - SimpleResponse.newBuilder() - .setResponseMessage("Hello " + req.getRequestMessage()) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - } - - private static class NoopInterceptor implements ServerInterceptor { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { - return next.startCall(call, headers); - } - } -} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index 2b41c83fe38..b4dea62c047 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -28,10 +28,11 @@ import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; @@ -41,9 +42,9 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; -import java.util.Map; +import java.util.regex.Pattern; +import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,132 +53,196 @@ @RunWith(JUnit4.class) public class LoggingTest { - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private static final String PROJECT_ID = "PROJECT"; - private static final Map locationTags = ImmutableMap.of( + private static final ImmutableMap LOCATION_TAGS = ImmutableMap.of( "project_id", "PROJECT", "location", "us-central1-c", "cluster_name", "grpc-observability-cluster", "namespace_name", "default" , "pod_name", "app1-6c7c58f897-n92c5"); - private static final Map customTags = ImmutableMap.of( + private static final ImmutableMap CUSTOM_TAGS = ImmutableMap.of( "KEY1", "Value1", "KEY2", "VALUE2"); - private static final long flushLimit = 100L; + private static final long FLUSH_LIMIT = 100L; + + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*")); /** - * Cloud logging test using LoggingChannelProvider and LoggingServerProvider. - * - *

Ignoring test, because it calls external CLoud Logging APIs. - * To test cloud logging setup, - * 1. Set up Cloud Logging Auth credentials - * 2. Assign permissions to service account to write logs to project specified by + * Cloud logging test using GlobalInterceptors. + * + *

Ignoring test, because it calls external Cloud Logging APIs. + * To test cloud logging setup locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write logs to project specified by * variable PROJECT_ID * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. *

*/ @Ignore @Test - public void clientServer_interceptorCalled_logAlways() - throws IOException { - Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); - LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - LoggingServerProvider.init( - new FactoryImpl(spyLogHelper, mockFilterHelper)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); - sink.close(); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logAlways() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassEndtoEndLogging.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test - public void clientServer_interceptorCalled_logNever() throws IOException { - Sink mockSink = mock(GcpLogSink.class); - LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logNeverFilterParams = - FilterParams.create(false, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logNeverFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - LoggingServerProvider.init( - new FactoryImpl(spyLogHelper, mockFilterHelper)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - verifyNoInteractions(spyLogHelper); - verifyNoInteractions(mockSink); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logNever() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogNever.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test - public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException { - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) - .thenReturn(false); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) - .thenReturn(false); - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper2)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - // Total number of calls should have been 14 (6 from client and 6 from server) - // Since cancel is not invoked, it will be 12. - // Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2) - // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8 - assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logFewEvents() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + // UsedReflectively + public static final class StaticTestingClassEndtoEndLogging implements Runnable { + + @Override + public void run() { + Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); + + try (GcpObservability unused = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging", e); + } + } + } + + public static final class StaticTestingClassLogNever implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logNeverFilterParams = FilterParams.create(false, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logNeverFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); + + try (GcpObservability unused = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + verifyNoInteractions(spyLogHelper); + verifyNoInteractions(mockSink); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging event filter", e); + } + } + } + + public static final class StaticTestingClassLogFewEvents implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + .thenReturn(false); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + .thenReturn(false); + + try (GcpObservability observability = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Total number of calls should have been 14 (6 from client and 6 from server) + // Since cancel is not invoked, it will be 12. + // Request message(Total count:2 (1 from client and 1 from server) and Response + // message(count:2) + // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) + // = 8 + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging event filter", e); + } + } } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java new file mode 100644 index 00000000000..f967b99fbcb --- /dev/null +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.monitoring.v3.MetricServiceClient.ListTimeSeriesPagedResponse; +import com.google.monitoring.v3.ListTimeSeriesRequest; +import com.google.monitoring.v3.ProjectName; +import com.google.monitoring.v3.TimeInterval; +import com.google.monitoring.v3.TimeSeries; +import com.google.protobuf.util.Timestamps; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; +import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.gcp.observability.logging.GcpLogSink; +import io.grpc.gcp.observability.logging.Sink; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MetricsTest { + + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private static final String PROJECT_ID = "PROJECT"; + private static final String TEST_CLIENT_METHOD = "grpc.testing.SimpleService/UnaryRpc"; + private static final String CUSTOM_TAG_KEY = "Version"; + private static final String CUSTOM_TAG_VALUE = + String.format("C67J9A-%s", String.valueOf(System.currentTimeMillis())); + private static final Map CUSTOM_TAGS = Collections.singletonMap(CUSTOM_TAG_KEY, + CUSTOM_TAG_VALUE); + + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), + Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*")); + + /** + * End to end cloud monitoring test. + * + *

Ignoring test, because it calls external Cloud Monitoring APIs. To test cloud monitoring + * setup locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write metrics to project specified by variable + * PROJECT_ID + * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. + */ + @Ignore + @Test + public void testMetricsExporter() throws Exception { + Class runnable = + classLoader.loadClass(MetricsTest.StaticTestingClassTestMetricsExporter.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + public static final class StaticTestingClassTestMetricsExporter implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + + when(mockConfig.isEnableCloudMonitoring()).thenReturn(true); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + + try { + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS); + + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Adding sleep to ensure metrics are exported before querying cloud monitoring backend + TimeUnit.SECONDS.sleep(40); + + // This checks Cloud monitoring for the new metrics that was just exported. + MetricServiceClient metricServiceClient = MetricServiceClient.create(); + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + TimeInterval interval = + TimeInterval.newBuilder() + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .build(); + // Timeseries data + String metricsFilter = + String.format( + "metric.type=\"custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs\"" + + " AND metric.labels.grpc_client_method=\"%s\"" + + " AND metric.labels.%s=%s", + TEST_CLIENT_METHOD, CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); + ListTimeSeriesRequest metricsRequest = + ListTimeSeriesRequest.newBuilder() + .setName(ProjectName.of(PROJECT_ID).toString()) + .setFilter(metricsFilter) + .setInterval(interval) + .build(); + ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(metricsRequest); + assertThat(response.iterateAll()).isNotEmpty(); + for (TimeSeries ts : response.iterateAll()) { + assertThat(ts.getMetric().getLabelsMap().get("grpc_client_method")) + .isEqualTo(TEST_CLIENT_METHOD); + assertThat(ts.getMetric().getLabelsMap().get("grpc_client_status")).isEqualTo("OK"); + assertThat(ts.getPoints(0).getValue().getInt64Value()).isEqualTo(1); + } + observability.close(); + } catch (IOException | InterruptedException e) { + throw new AssertionError("Exception while testing metrics", e); + } + } + } +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java new file mode 100644 index 00000000000..ec759827737 --- /dev/null +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.trace.v1.TraceServiceClient; +import com.google.cloud.trace.v1.TraceServiceClient.ListTracesPagedResponse; +import com.google.devtools.cloudtrace.v1.GetTraceRequest; +import com.google.devtools.cloudtrace.v1.ListTracesRequest; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; +import com.google.protobuf.util.Timestamps; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; +import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.gcp.observability.logging.GcpLogSink; +import io.grpc.gcp.observability.logging.Sink; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import io.opencensus.trace.samplers.Samplers; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TracesTest { + + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private static final String PROJECT_ID = "PROJECT"; + private static final String CUSTOM_TAG_KEY = "service"; + private static final String CUSTOM_TAG_VALUE = + String.format("payment-%s", String.valueOf(System.currentTimeMillis())); + private static final Map CUSTOM_TAGS = + Collections.singletonMap(CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); + + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), + Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*")); + + /** + * End to end cloud trace test. + * + *

Ignoring test, because it calls external Cloud Tracing APIs. To test cloud trace setup + * locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write traces to project specified by variable + * PROJECT_ID + * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. + */ + @Ignore + @Test + public void testTracesExporter() throws Exception { + Class runnable = + classLoader.loadClass(TracesTest.StaticTestingClassTestTracesExporter.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + public static final class StaticTestingClassTestTracesExporter implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + + when(mockConfig.isEnableCloudTracing()).thenReturn(true); + when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample()); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + + try { + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS); + + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Adding sleep to ensure traces are exported before querying cloud tracing backend + TimeUnit.SECONDS.sleep(10); + + TraceServiceClient traceServiceClient = TraceServiceClient.create(); + String traceFilter = + String.format( + "span:Sent.grpc.testing.SimpleService +%s:%s", CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); + String traceOrder = "start"; + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + ListTracesRequest traceRequest = + ListTracesRequest.newBuilder() + .setProjectId(PROJECT_ID) + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .setFilter(traceFilter) + .setOrderBy(traceOrder) + .build(); + ListTracesPagedResponse traceResponse = traceServiceClient.listTraces(traceRequest); + assertThat(traceResponse.iterateAll()).isNotEmpty(); + List traceIdList = new ArrayList<>(); + for (Trace t : traceResponse.iterateAll()) { + traceIdList.add(t.getTraceId()); + } + + for (String traceId : traceIdList) { + // This checks Cloud trace for the new trace that was just created. + GetTraceRequest getTraceRequest = + GetTraceRequest.newBuilder().setProjectId(PROJECT_ID).setTraceId(traceId).build(); + Trace trace = traceServiceClient.getTrace(getTraceRequest); + assertThat(trace.getSpansList()).hasSize(3); + for (TraceSpan span : trace.getSpansList()) { + assertThat(span.getName()).contains("grpc.testing.SimpleService.UnaryRpc"); + assertThat(span.getLabelsMap().get(CUSTOM_TAG_KEY)).isEqualTo(CUSTOM_TAG_VALUE); + } + } + observability.close(); + } catch (IOException | InterruptedException e) { + throw new AssertionError("Exception while testing traces", e); + } + } + } +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 5c50679b531..031313c7304 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyIterable; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,12 +38,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -57,75 +56,65 @@ public class GcpLogSinkTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - private static final Map locationTags = ImmutableMap.of("project_id", "PROJECT", + private static final ImmutableMap LOCATION_TAGS = + ImmutableMap.of("project_id", "PROJECT", "location", "us-central1-c", "cluster_name", "grpc-observability-cluster", "namespace_name", "default" , "pod_name", "app1-6c7c58f897-n92c5"); - private static final Map customTags = ImmutableMap.of("KEY1", "Value1", + private static final ImmutableMap CUSTOM_TAGS = + ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); - private static final long flushLimit = 10L; - // gRPC is expected to alway use this log name when reporting to GCP cloud logging. - private static final String expectedLogName = + private static final long FLUSH_LIMIT = 10L; + // gRPC is expected to always use this log name when reporting to GCP cloud logging. + private static final String EXPECTED_LOG_NAME = "microservices.googleapis.com%2Fobservability%2Fgrpc"; - private final long seqId = 1; - private final String destProjectName = "PROJECT"; - private final String serviceName = "service"; - private final String methodName = "method"; - private final String authority = "authority"; - private final Duration timeout = Durations.fromMillis(1234); - private final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; - private final GrpcLogRecord logProto = GrpcLogRecord.newBuilder() - .setSequenceId(seqId) - .setServiceName(serviceName) - .setMethodName(methodName) - .setAuthority(authority) - .setTimeout(timeout) + private static final long SEQ_ID = 1; + private static final String DEST_PROJECT_NAME = "PROJECT"; + private static final String SERVICE_NAME = "service"; + private static final String METHOD_NAME = "method"; + private static final String AUTHORITY = "authority"; + private static final Duration TIMEOUT = Durations.fromMillis(1234); + private static final String RPC_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder() + .setSequenceId(SEQ_ID) + .setServiceName(SERVICE_NAME) + .setMethodName(METHOD_NAME) + .setAuthority(AUTHORITY) + .setTimeout(TIMEOUT) .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setEventLogger(EventLogger.LOGGER_CLIENT) - .setRpcId(rpcId) + .setRpcId(RPC_ID) .build(); - private final Struct expectedStructLogProto = Struct.newBuilder() - .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build()) - .putFields("service_name", Value.newBuilder().setStringValue(serviceName).build()) - .putFields("method_name", Value.newBuilder().setStringValue(methodName).build()) - .putFields("authority", Value.newBuilder().setStringValue(authority).build()) + private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder() + .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build()) + .putFields("service_name", Value.newBuilder().setStringValue(SERVICE_NAME).build()) + .putFields("method_name", Value.newBuilder().setStringValue(METHOD_NAME).build()) + .putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build()) .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) .putFields("event_type", Value.newBuilder().setStringValue( String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build()) .putFields("event_logger", Value.newBuilder().setStringValue( String.valueOf(EventLogger.LOGGER_CLIENT)).build()) - .putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build()) + .putFields("rpc_id", Value.newBuilder().setStringValue(RPC_ID).build()) .build(); + @Mock private Logging mockLogging; - @Before - public void setUp() { - mockLogging = mock(Logging.class); - } - - @Test - public void createSink() { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - assertThat(mockSink).isInstanceOf(GcpLogSink.class); - } - @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); - System.out.println(entry); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); - assertThat(entry.getLogName()).isEqualTo(expectedLogName); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); + assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME); } verifyNoMoreInteractions(mockLogging); } @@ -133,10 +122,10 @@ public void verifyWrite() throws Exception { @Test @SuppressWarnings("unchecked") public void verifyWriteWithTags() { - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); + MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -145,9 +134,9 @@ public void verifyWriteWithTags() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource); - assertThat(entry.getLabels()).isEqualTo(customTags); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); - assertThat(entry.getLogName()).isEqualTo(expectedLogName); + assertThat(entry.getLabels()).isEqualTo(CUSTOM_TAGS); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); + assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME); } verifyNoMoreInteractions(mockLogging); } @@ -157,9 +146,9 @@ public void verifyWriteWithTags() { public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - emptyCustomTags, flushLimit); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + emptyCustomTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -167,7 +156,7 @@ public void emptyCustomTags_labelsNotSet() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getLabels()).isEqualTo(expectedEmptyLabels); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); } } @@ -176,11 +165,11 @@ public void emptyCustomTags_labelsNotSet() { public void emptyCustomTags_setSourceProject() { Map emptyCustomTags = null; String destinationProjectId = "DESTINATION_PROJECT"; - Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags, + Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS, destinationProjectId); - GcpLogSink mockSink = new GcpLogSink(mockLogging, destinationProjectId, locationTags, - emptyCustomTags, flushLimit); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS, + emptyCustomTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -188,31 +177,31 @@ public void emptyCustomTags_setSourceProject() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getLabels()).isEqualTo(expectedLabels); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); } } @Test public void verifyFlush() { long lowerFlushLimit = 2L; - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, lowerFlushLimit); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, lowerFlushLimit); + sink.write(LOG_PROTO); verify(mockLogging, never()).flush(); - mockSink.write(logProto); + sink.write(LOG_PROTO); verify(mockLogging, times(1)).flush(); - mockSink.write(logProto); - mockSink.write(logProto); + sink.write(LOG_PROTO); + sink.write(LOG_PROTO); verify(mockLogging, times(2)).flush(); } @Test public void verifyClose() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - mockSink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); + sink.write(LOG_PROTO); verify(mockLogging, times(1)).write(anyIterable()); - mockSink.close(); + sink.close(); verify(mockLogging).close(); verifyNoMoreInteractions(mockLogging); }