From 935a02f6437743d1ba93ffceaeee5306201f42da Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 22 Jun 2022 18:42:09 -0700 Subject: [PATCH 1/7] Populate global interceptors from observability and added stackdriver exporters --- .../gcp/observability/GcpObservability.java | 137 ++++++++++++++++-- .../observability/ObservabilityConfig.java | 31 +--- .../ObservabilityConfigImpl.java | 19 +-- .../observability/GcpObservabilityTest.java | 22 +-- .../ObservabilityConfigImplTest.java | 64 +------- 5 files changed, 152 insertions(+), 121 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index e65f4d8ef01..9331e7a0e42 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -19,8 +19,14 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; +import io.grpc.ClientInterceptor; import io.grpc.ExperimentalApi; +import io.grpc.InternalGlobalInterceptors; import io.grpc.ManagedChannelProvider.ProviderNotFoundException; +import io.grpc.ServerInterceptor; +import io.grpc.ServerStreamTracer; +import io.grpc.census.InternalCensusStatsAccessor; +import io.grpc.census.InternalCensusTracingAccessor; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; @@ -28,13 +34,25 @@ import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; import io.grpc.internal.TimeProvider; +import io.opencensus.contrib.grpc.metrics.RpcViews; +import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; +import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration; +import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.config.TraceConfig; import java.io.IOException; +import java.util.ArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; /** The main class for gRPC Google Cloud Platform Observability features. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class GcpObservability implements AutoCloseable { + private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); private static GcpObservability instance = null; private final Sink sink; + private final ObservabilityConfig config; /** * Initialize grpc-observability. @@ -45,23 +63,33 @@ public static synchronized GcpObservability grpcInit() throws IOException { if (instance == null) { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); - Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), - globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), - observabilityConfig.getFlushMessageCount()); + Sink sink = + new GcpLogSink( + observabilityConfig.getDestinationProjectId(), + globalLoggingTags.getLocationTags(), + globalLoggingTags.getCustomTags(), + observabilityConfig.getFlushMessageCount()); LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); - instance = grpcInit(sink, - new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), - new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); + instance = + grpcInit( + sink, + observabilityConfig, + new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), + new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); } return instance; } - @VisibleForTesting static GcpObservability grpcInit(Sink sink, + @VisibleForTesting + static GcpObservability grpcInit( + Sink sink, + ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { if (instance == null) { - instance = new GcpObservability(sink, channelInterceptorFactory, serverInterceptorFactory); + instance = + new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory); } return instance; } @@ -76,15 +104,106 @@ public void close() { LoggingChannelProvider.shutdown(); LoggingServerProvider.shutdown(); sink.close(); + unRegisterStackDriverExporter(); instance = null; } } - private GcpObservability(Sink sink, + private GcpObservability( + Sink sink, + ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { this.sink = checkNotNull(sink); + this.config = checkNotNull(config); + + ArrayList clientInterceptors = new ArrayList<>(); + ArrayList serverInterceptors = new ArrayList<>(); + ArrayList tracerFactories = new ArrayList<>(); + String projectId = config.getDestinationProjectId(); + LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config); + + if (config.isEnableCloudLogging()) { + clientInterceptors.add( + new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper).create()); + serverInterceptors.add( + new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper).create()); + } + + if (config.isEnableCloudMonitoring()) { + clientInterceptors.add( + InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true)); + tracerFactories.add( + InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true)); + } + + if (config.isEnableCloudTracing()) { + clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor()); + tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); + } + if (!clientInterceptors.isEmpty() + || !serverInterceptors.isEmpty() + || !tracerFactories.isEmpty()) { + InternalGlobalInterceptors.setInterceptorsTracers( + clientInterceptors, serverInterceptors, tracerFactories); + } + + registerStackDriverExporter(projectId); + LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } + + private void registerStackDriverExporter(String projectId) { + if (config.isEnableCloudMonitoring()) { + RpcViews.registerAllGrpcViews(); + StackdriverStatsConfiguration.Builder statsConfigurationBuilder = + StackdriverStatsConfiguration.builder(); + if (projectId != null) { + statsConfigurationBuilder.setProjectId(projectId); + } + try { + StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); + } catch (Exception ex) { + throw new IllegalStateException( + "Could not register Cloud Stats exporter:" + ex.getMessage()); + } + } + + if (config.isEnableCloudTracing()) { + TraceConfig traceConfig = Tracing.getTraceConfig(); + traceConfig.updateActiveTraceParams( + traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build()); + StackdriverTraceConfiguration.Builder traceConfigurationBuilder = + StackdriverTraceConfiguration.builder(); + if (projectId != null) { + traceConfigurationBuilder.setProjectId(projectId); + } + try { + StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); + } catch (Exception ex) { + throw new IllegalStateException( + "Could not register Cloud Trace exporter:" + ex.getMessage()); + } + } + } + + private void unRegisterStackDriverExporter() { + if (config.isEnableCloudMonitoring()) { + try { + StackdriverStatsExporter.unregister(); + } catch (IllegalStateException ex) { + logger.log(Level.WARNING, "Could not unregister Cloud Stats exporter"); + } + } + + if (config.isEnableCloudTracing()) { + try { + StackdriverTraceExporter.unregister(); + } catch (IllegalStateException ex) { + logger.log(Level.WARNING, "Could not unregister Cloud Trace exporter"); + } + } + } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java index 808a847695c..f36adf264b4 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java @@ -18,6 +18,7 @@ import io.grpc.Internal; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import io.opencensus.trace.Sampler; import java.util.List; @Internal @@ -71,34 +72,4 @@ public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) { this.messageBytes = messageBytes; } } - - /** Corresponds to a {@link io.opencensus.trace.Sampler} type. */ - enum SamplerType { - ALWAYS, - NEVER, - PROBABILISTIC; - } - - /** Represents a trace {@link io.opencensus.trace.Sampler} configuration. */ - class Sampler { - private SamplerType type; - private double probability; - - Sampler(double probability) { - this.probability = probability; - this.type = SamplerType.PROBABILISTIC; - } - - Sampler(SamplerType type) { - this.type = type; - } - - double getProbability() { - return probability; - } - - SamplerType getType() { - return type; - } - } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index a398a558162..8b19ff157db 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -23,6 +23,8 @@ import io.grpc.internal.JsonParser; import io.grpc.internal.JsonUtil; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import io.opencensus.trace.Sampler; +import io.opencensus.trace.samplers.Samplers; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -100,19 +102,18 @@ private void parseConfig(Map config) { } this.eventTypes = eventTypesBuilder.build(); } - String sampler = JsonUtil.getString(config, "global_trace_sampler"); Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate"); - checkArgument( - sampler == null || samplingRate == null, - "only one of 'global_trace_sampler' or 'global_trace_sampling_rate' can be specified"); - if (sampler != null) { - this.sampler = new Sampler(SamplerType.valueOf(sampler.toUpperCase())); - } if (samplingRate != null) { checkArgument( samplingRate >= 0.0 && samplingRate <= 1.0, - "'global_trace_sampling_rate' needs to be between 0.0 and 1.0"); - this.sampler = new Sampler(samplingRate); + "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); + if (samplingRate == 0) { + this.sampler = Samplers.neverSample(); + } else if (samplingRate == 1) { + this.sampler = Samplers.alwaysSample(); + } else { + this.sampler = Samplers.probabilitySampler(samplingRate); + } } } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 129a8271831..a113c775456 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -32,25 +32,27 @@ @RunWith(JUnit4.class) public class GcpObservabilityTest { - + @Test public void initFinish() { ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); ServerProvider prevServerProvider = ServerProvider.provider(); Sink sink = mock(Sink.class); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = mock( - InternalLoggingChannelInterceptor.Factory.class); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = mock( - InternalLoggingServerInterceptor.Factory.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); GcpObservability observability1; - try (GcpObservability observability = GcpObservability.grpcInit(sink, channelInterceptorFactory, - serverInterceptorFactory)) { + try (GcpObservability observability = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory)) { assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); - observability1 = GcpObservability.grpcInit(sink, channelInterceptorFactory, - serverInterceptorFactory); + observability1 = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory); assertThat(observability1).isSameInstanceAs(observability); - } verify(sink).close(); assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java index c7186786e37..4c816042e72 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java @@ -81,27 +81,6 @@ public class ObservabilityConfigImplTest { + " \"enable_cloud_tracing\": true\n" + "}"; - private static final String GLOBAL_TRACING_ALWAYS_SAMPLER = "{\n" - + " \"enable_cloud_tracing\": true,\n" - + " \"global_trace_sampler\": \"always\"\n" - + "}"; - - private static final String GLOBAL_TRACING_NEVER_SAMPLER = "{\n" - + " \"enable_cloud_tracing\": true,\n" - + " \"global_trace_sampler\": \"never\"\n" - + "}"; - - private static final String GLOBAL_TRACING_PROBABILISTIC_SAMPLER = "{\n" - + " \"enable_cloud_tracing\": true,\n" - + " \"global_trace_sampling_rate\": 0.75\n" - + "}"; - - private static final String GLOBAL_TRACING_BOTH_SAMPLER_ERROR = "{\n" - + " \"enable_cloud_tracing\": true,\n" - + " \"global_trace_sampler\": \"never\",\n" - + " \"global_trace_sampling_rate\": 0.75\n" - + "}"; - private static final String GLOBAL_TRACING_BADPROBABILISTIC_SAMPLER = "{\n" + " \"enable_cloud_tracing\": true,\n" + " \"global_trace_sampling_rate\": -0.75\n" @@ -194,47 +173,6 @@ public void enableCloudMonitoringAndTracing() throws IOException { assertTrue(observabilityConfig.isEnableCloudTracing()); } - @Test - public void alwaysSampler() throws IOException { - observabilityConfig.parse(GLOBAL_TRACING_ALWAYS_SAMPLER); - assertTrue(observabilityConfig.isEnableCloudTracing()); - ObservabilityConfig.Sampler sampler = observabilityConfig.getSampler(); - assertThat(sampler).isNotNull(); - assertThat(sampler.getType()).isEqualTo(ObservabilityConfig.SamplerType.ALWAYS); - } - - @Test - public void neverSampler() throws IOException { - observabilityConfig.parse(GLOBAL_TRACING_NEVER_SAMPLER); - assertTrue(observabilityConfig.isEnableCloudTracing()); - ObservabilityConfig.Sampler sampler = observabilityConfig.getSampler(); - assertThat(sampler).isNotNull(); - assertThat(sampler.getType()).isEqualTo(ObservabilityConfig.SamplerType.NEVER); - } - - @Test - public void probabilisticSampler() throws IOException { - observabilityConfig.parse(GLOBAL_TRACING_PROBABILISTIC_SAMPLER); - assertTrue(observabilityConfig.isEnableCloudTracing()); - ObservabilityConfig.Sampler sampler = observabilityConfig.getSampler(); - assertThat(sampler).isNotNull(); - assertThat(sampler.getType()).isEqualTo(ObservabilityConfig.SamplerType.PROBABILISTIC); - assertThat(sampler.getProbability()).isEqualTo(0.75); - } - - @Test - public void bothSamplerAndSamplingRate_error() throws IOException { - try { - observabilityConfig.parse(GLOBAL_TRACING_BOTH_SAMPLER_ERROR); - fail("exception expected!"); - } catch (IllegalArgumentException iae) { - assertThat(iae.getMessage()) - .isEqualTo( - "only one of 'global_trace_sampler' or 'global_trace_sampling_rate' can be" - + " specified"); - } - } - @Test public void badProbabilisticSampler_error() throws IOException { try { @@ -242,7 +180,7 @@ public void badProbabilisticSampler_error() throws IOException { fail("exception expected!"); } catch (IllegalArgumentException iae) { assertThat(iae.getMessage()).isEqualTo( - "'global_trace_sampling_rate' needs to be between 0.0 and 1.0"); + "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); } } From d407fddc8bef5896bc0169ec94d4b66f7dba267e Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 24 Jun 2022 17:51:19 -0700 Subject: [PATCH 2/7] Using OpenCensus samplers; added unit test --- gcp-observability/build.gradle | 3 +- .../gcp/observability/GcpObservability.java | 77 +++++++++--------- .../ObservabilityConfigImpl.java | 7 +- .../observability/GcpObservabilityTest.java | 78 ++++++++++++++++++- .../ObservabilityConfigImplTest.java | 74 +++++++++++++++--- 5 files changed, 189 insertions(+), 50 deletions(-) diff --git a/gcp-observability/build.gradle b/gcp-observability/build.gradle index 10cacdf8d90..a78640091da 100644 --- a/gcp-observability/build.gradle +++ b/gcp-observability/build.gradle @@ -41,7 +41,8 @@ dependencies { runtimeOnly libraries.opencensus.impl - testImplementation project(':grpc-testing'), + testImplementation project(':grpc-context').sourceSets.test.output, + project(':grpc-testing'), project(':grpc-testing-proto'), project(':grpc-netty-shaded') testImplementation (libraries.guava.testlib) { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 9331e7a0e42..05fe3a02717 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -41,7 +41,6 @@ import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; import io.opencensus.trace.Tracing; import io.opencensus.trace.config.TraceConfig; -import java.io.IOException; import java.util.ArrayList; import java.util.logging.Level; import java.util.logging.Logger; @@ -53,13 +52,16 @@ public final class GcpObservability implements AutoCloseable { private static GcpObservability instance = null; private final Sink sink; private final ObservabilityConfig config; + private final ArrayList clientInterceptors = new ArrayList<>(); + private final ArrayList serverInterceptors = new ArrayList<>(); + private final ArrayList tracerFactories = new ArrayList<>(); /** * Initialize grpc-observability. * * @throws ProviderNotFoundException if no underlying channel/server provider is available. */ - public static synchronized GcpObservability grpcInit() throws IOException { + public static synchronized GcpObservability grpcInit() throws Exception { if (instance == null) { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); @@ -69,6 +71,8 @@ public static synchronized GcpObservability grpcInit() throws IOException { globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), observabilityConfig.getFlushMessageCount()); + // TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider + // once ChannelBuilder and ServerBuilder are used LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = @@ -86,10 +90,17 @@ static GcpObservability grpcInit( Sink sink, ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, - InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) + throws Exception { if (instance == null) { instance = new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory); + LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config); + instance.setProducer( + new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper), + new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper)); + instance.registerStackDriverExporter(config.getDestinationProjectId()); } return instance; } @@ -109,26 +120,12 @@ public void close() { } } - private GcpObservability( - Sink sink, - ObservabilityConfig config, + private void setProducer( InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { - this.sink = checkNotNull(sink); - this.config = checkNotNull(config); - - ArrayList clientInterceptors = new ArrayList<>(); - ArrayList serverInterceptors = new ArrayList<>(); - ArrayList tracerFactories = new ArrayList<>(); - String projectId = config.getDestinationProjectId(); - LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); - ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config); - if (config.isEnableCloudLogging()) { - clientInterceptors.add( - new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper).create()); - serverInterceptors.add( - new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper).create()); + clientInterceptors.add(channelInterceptorFactory.create()); + serverInterceptors.add(serverInterceptorFactory.create()); } if (config.isEnableCloudMonitoring()) { @@ -142,20 +139,16 @@ private GcpObservability( clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor()); tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); } + if (!clientInterceptors.isEmpty() || !serverInterceptors.isEmpty() || !tracerFactories.isEmpty()) { InternalGlobalInterceptors.setInterceptorsTracers( clientInterceptors, serverInterceptors, tracerFactories); } - - registerStackDriverExporter(projectId); - - LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); - LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } - private void registerStackDriverExporter(String projectId) { + private void registerStackDriverExporter(String projectId) throws Exception { if (config.isEnableCloudMonitoring()) { RpcViews.registerAllGrpcViews(); StackdriverStatsConfiguration.Builder statsConfigurationBuilder = @@ -165,9 +158,8 @@ private void registerStackDriverExporter(String projectId) { } try { StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); - } catch (Exception ex) { - throw new IllegalStateException( - "Could not register Cloud Stats exporter:" + ex.getMessage()); + } catch (Exception e) { + throw new Exception("Failed to register Stackdriver stats exporter, " + e.getMessage()); } } @@ -182,9 +174,8 @@ private void registerStackDriverExporter(String projectId) { } try { StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); - } catch (Exception ex) { - throw new IllegalStateException( - "Could not register Cloud Trace exporter:" + ex.getMessage()); + } catch (Exception e) { + throw new Exception("Failed to register Stackdriver trace exporter, " + e.getMessage()); } } } @@ -193,17 +184,31 @@ private void unRegisterStackDriverExporter() { if (config.isEnableCloudMonitoring()) { try { StackdriverStatsExporter.unregister(); - } catch (IllegalStateException ex) { - logger.log(Level.WARNING, "Could not unregister Cloud Stats exporter"); + } catch (IllegalStateException e) { + logger.log( + Level.SEVERE, "Failed to unregister Stackdriver stats exporter, " + e.getMessage()); } } if (config.isEnableCloudTracing()) { try { StackdriverTraceExporter.unregister(); - } catch (IllegalStateException ex) { - logger.log(Level.WARNING, "Could not unregister Cloud Trace exporter"); + } catch (IllegalStateException e) { + logger.log( + Level.SEVERE, "Failed to unregister Stackdriver trace exporter, " + e.getMessage()); } } } + + private GcpObservability( + Sink sink, + ObservabilityConfig config, + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + this.sink = checkNotNull(sink); + this.config = checkNotNull(config); + + LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); + LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); + } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index 8b19ff157db..84a965bb40f 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -103,13 +103,14 @@ private void parseConfig(Map config) { this.eventTypes = eventTypesBuilder.build(); } Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate"); + if (enableCloudTracing && samplingRate == null) { + this.sampler = Samplers.probabilitySampler(0.0); + } if (samplingRate != null) { checkArgument( samplingRate >= 0.0 && samplingRate <= 1.0, "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); - if (samplingRate == 0) { - this.sampler = Samplers.neverSample(); - } else if (samplingRate == 1) { + if (samplingRate == 1.0) { this.sampler = Samplers.alwaysSample(); } else { this.sampler = Samplers.probabilitySampler(samplingRate); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index a113c775456..4c061f5bfd2 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -18,14 +18,29 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.InternalGlobalInterceptors; import io.grpc.ManagedChannelProvider; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.ServerProvider; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.logging.Sink; +import io.opencensus.trace.samplers.Samplers; +import java.util.regex.Pattern; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -34,7 +49,7 @@ public class GcpObservabilityTest { @Test - public void initFinish() { + public void initFinish() throws Exception { ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); ServerProvider prevServerProvider = ServerProvider.provider(); Sink sink = mock(Sink.class); @@ -64,4 +79,65 @@ public void initFinish() { assertThat(e).hasMessageThat().contains("GcpObservability already closed!"); } } + + @Test + public void enableObservability() throws Exception { + StaticTestingClassLoader classLoader = + new StaticTestingClassLoader( + getClass().getClassLoader(), Pattern.compile("io\\.grpc\\.[^.]+")); + Class runnable = classLoader.loadClass(StaticTestingClassLoaderSet.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + // UsedReflectively + public static final class StaticTestingClassLoaderSet implements Runnable { + + @Override + public void run() { + Sink sink = mock(Sink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + when(config.isEnableCloudLogging()).thenReturn(true); + when(config.isEnableCloudMonitoring()).thenReturn(true); + when(config.isEnableCloudTracing()).thenReturn(true); + when(config.getSampler()).thenReturn(Samplers.neverSample()); + + ClientInterceptor clientInterceptor = + mock(ClientInterceptor.class, delegatesTo(new NoopClientInterceptor())); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + when(channelInterceptorFactory.create()).thenReturn(clientInterceptor); + + ServerInterceptor serverInterceptor = + mock(ServerInterceptor.class, delegatesTo(new NoopServerInterceptor())); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + when(serverInterceptorFactory.create()).thenReturn(serverInterceptor); + + try (GcpObservability observability = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory)) { + assertThat(InternalGlobalInterceptors.getClientInterceptors()).hasSize(3); + assertThat(InternalGlobalInterceptors.getServerInterceptors()).hasSize(1); + assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).hasSize(2); + } catch (Exception e) { + fail("Encountered exception: " + e); + } + } + } + + private static class NoopClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions); + } + } + + private static class NoopServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + return next.startCall(call, headers); + } + } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java index 4c816042e72..f608dd5b25a 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java @@ -26,7 +26,8 @@ import com.google.common.collect.ImmutableList; import io.grpc.gcp.observability.ObservabilityConfig.LogFilter; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; - +import io.opencensus.trace.Sampler; +import io.opencensus.trace.samplers.Samplers; import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -77,14 +78,33 @@ public class ObservabilityConfigImplTest { + "}"; private static final String ENABLE_CLOUD_MONITORING_AND_TRACING = "{\n" - + " \"enable_cloud_monitoring\": true,\n" - + " \"enable_cloud_tracing\": true\n" - + "}"; + + " \"enable_cloud_monitoring\": true,\n" + + " \"enable_cloud_tracing\": true\n" + + "}"; + + private static final String GLOBAL_TRACING_ALWAYS_SAMPLER = "{\n" + + " \"enable_cloud_tracing\": true,\n" + + " \"global_trace_sampling_rate\": 1.00\n" + + "}"; + + private static final String GLOBAL_TRACING_NEVER_SAMPLER = "{\n" + + " \"enable_cloud_tracing\": true,\n" + + " \"global_trace_sampling_rate\": 0.00\n" + + "}"; + + private static final String GLOBAL_TRACING_PROBABILISTIC_SAMPLER = "{\n" + + " \"enable_cloud_tracing\": true,\n" + + " \"global_trace_sampling_rate\": 0.75\n" + + "}"; + + private static final String GLOBAL_TRACING_DEFAULT_SAMPLER = "{\n" + + " \"enable_cloud_tracing\": true\n" + + "}"; private static final String GLOBAL_TRACING_BADPROBABILISTIC_SAMPLER = "{\n" - + " \"enable_cloud_tracing\": true,\n" - + " \"global_trace_sampling_rate\": -0.75\n" - + "}"; + + " \"enable_cloud_tracing\": true,\n" + + " \"global_trace_sampling_rate\": -0.75\n" + + "}"; ObservabilityConfigImpl observabilityConfig = new ObservabilityConfigImpl(); @@ -173,6 +193,42 @@ public void enableCloudMonitoringAndTracing() throws IOException { assertTrue(observabilityConfig.isEnableCloudTracing()); } + @Test + public void alwaysSampler() throws IOException { + observabilityConfig.parse(GLOBAL_TRACING_ALWAYS_SAMPLER); + assertTrue(observabilityConfig.isEnableCloudTracing()); + Sampler sampler = observabilityConfig.getSampler(); + assertThat(sampler).isNotNull(); + assertThat(sampler).isEqualTo(Samplers.alwaysSample()); + } + + @Test + public void neverSampler() throws IOException { + observabilityConfig.parse(GLOBAL_TRACING_NEVER_SAMPLER); + assertTrue(observabilityConfig.isEnableCloudTracing()); + Sampler sampler = observabilityConfig.getSampler(); + assertThat(sampler).isNotNull(); + assertThat(sampler).isEqualTo(Samplers.probabilitySampler(0.0)); + } + + @Test + public void probabilisticSampler() throws IOException { + observabilityConfig.parse(GLOBAL_TRACING_PROBABILISTIC_SAMPLER); + assertTrue(observabilityConfig.isEnableCloudTracing()); + Sampler sampler = observabilityConfig.getSampler(); + assertThat(sampler).isNotNull(); + assertThat(sampler).isEqualTo(Samplers.probabilitySampler(0.75)); + } + + @Test + public void defaultSampler() throws IOException { + observabilityConfig.parse(GLOBAL_TRACING_DEFAULT_SAMPLER); + assertTrue(observabilityConfig.isEnableCloudTracing()); + Sampler sampler = observabilityConfig.getSampler(); + assertThat(sampler).isNotNull(); + assertThat(sampler).isEqualTo(Samplers.probabilitySampler(0.00)); + } + @Test public void badProbabilisticSampler_error() throws IOException { try { @@ -180,7 +236,7 @@ public void badProbabilisticSampler_error() throws IOException { fail("exception expected!"); } catch (IllegalArgumentException iae) { assertThat(iae.getMessage()).isEqualTo( - "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); + "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); } } @@ -201,4 +257,4 @@ public void configFileLogFilters() throws Exception { assertThat(logFilters.get(1).headerBytes).isNull(); assertThat(logFilters.get(1).messageBytes).isNull(); } -} +} \ No newline at end of file From 4d685d346e7ba76d16c061c136c928b28a1ab7a6 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 24 Jun 2022 18:30:13 -0700 Subject: [PATCH 3/7] added project_id for unit tests --- .../java/io/grpc/gcp/observability/GcpObservabilityTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 4c061f5bfd2..4397729f88f 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -48,6 +48,8 @@ @RunWith(JUnit4.class) public class GcpObservabilityTest { + private static final String PROJECT_ID = "project"; + @Test public void initFinish() throws Exception { ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); @@ -100,6 +102,7 @@ public void run() { when(config.isEnableCloudMonitoring()).thenReturn(true); when(config.isEnableCloudTracing()).thenReturn(true); when(config.getSampler()).thenReturn(Samplers.neverSample()); + when(config.getDestinationProjectId()).thenReturn(PROJECT_ID); ClientInterceptor clientInterceptor = mock(ClientInterceptor.class, delegatesTo(new NoopClientInterceptor())); From 528c9f57bded91d1f2e222de4713d8535ca5daab Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Mon, 27 Jun 2022 13:56:57 -0700 Subject: [PATCH 4/7] refactor stackdriver exporter --- .../gcp/observability/GcpObservability.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 05fe3a02717..e2a0c443cf0 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.client.util.Strings; import com.google.common.annotations.VisibleForTesting; import io.grpc.ClientInterceptor; import io.grpc.ExperimentalApi; @@ -55,6 +56,8 @@ public final class GcpObservability implements AutoCloseable { private final ArrayList clientInterceptors = new ArrayList<>(); private final ArrayList serverInterceptors = new ArrayList<>(); private final ArrayList tracerFactories = new ArrayList<>(); + private boolean metricsEnabled; + private boolean tracesEnabled; /** * Initialize grpc-observability. @@ -81,6 +84,7 @@ public static synchronized GcpObservability grpcInit() throws Exception { observabilityConfig, new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); + instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId()); } return instance; } @@ -100,7 +104,6 @@ static GcpObservability grpcInit( instance.setProducer( new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper), new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper)); - instance.registerStackDriverExporter(config.getDestinationProjectId()); } return instance; } @@ -112,10 +115,10 @@ public void close() { if (instance == null) { throw new IllegalStateException("GcpObservability already closed!"); } + unRegisterStackDriverExporter(); LoggingChannelProvider.shutdown(); LoggingServerProvider.shutdown(); sink.close(); - unRegisterStackDriverExporter(); instance = null; } } @@ -127,14 +130,12 @@ private void setProducer( clientInterceptors.add(channelInterceptorFactory.create()); serverInterceptors.add(serverInterceptorFactory.create()); } - if (config.isEnableCloudMonitoring()) { clientInterceptors.add( InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true)); tracerFactories.add( InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true)); } - if (config.isEnableCloudTracing()) { clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor()); tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); @@ -153,7 +154,7 @@ private void registerStackDriverExporter(String projectId) throws Exception { RpcViews.registerAllGrpcViews(); StackdriverStatsConfiguration.Builder statsConfigurationBuilder = StackdriverStatsConfiguration.builder(); - if (projectId != null) { + if (!Strings.isNullOrEmpty(projectId)) { statsConfigurationBuilder.setProjectId(projectId); } try { @@ -161,6 +162,7 @@ private void registerStackDriverExporter(String projectId) throws Exception { } catch (Exception e) { throw new Exception("Failed to register Stackdriver stats exporter, " + e.getMessage()); } + metricsEnabled = true; } if (config.isEnableCloudTracing()) { @@ -169,7 +171,7 @@ private void registerStackDriverExporter(String projectId) throws Exception { traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build()); StackdriverTraceConfiguration.Builder traceConfigurationBuilder = StackdriverTraceConfiguration.builder(); - if (projectId != null) { + if (!Strings.isNullOrEmpty(projectId)) { traceConfigurationBuilder.setProjectId(projectId); } try { @@ -177,26 +179,29 @@ private void registerStackDriverExporter(String projectId) throws Exception { } catch (Exception e) { throw new Exception("Failed to register Stackdriver trace exporter, " + e.getMessage()); } + tracesEnabled = true; } } private void unRegisterStackDriverExporter() { - if (config.isEnableCloudMonitoring()) { + if (metricsEnabled) { try { StackdriverStatsExporter.unregister(); } catch (IllegalStateException e) { logger.log( Level.SEVERE, "Failed to unregister Stackdriver stats exporter, " + e.getMessage()); } + metricsEnabled = false; } - if (config.isEnableCloudTracing()) { + if (tracesEnabled) { try { StackdriverTraceExporter.unregister(); } catch (IllegalStateException e) { logger.log( Level.SEVERE, "Failed to unregister Stackdriver trace exporter, " + e.getMessage()); } + tracesEnabled = false; } } From 47d8125231ac236ccbd0739f298d1fa625bd215d Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 29 Jun 2022 15:08:30 -0700 Subject: [PATCH 5/7] throw IO Exception instead of generic Exception --- .../grpc/gcp/observability/GcpObservability.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index e2a0c443cf0..aa5641f6fcf 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -42,6 +42,7 @@ import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; import io.opencensus.trace.Tracing; import io.opencensus.trace.config.TraceConfig; +import java.io.IOException; import java.util.ArrayList; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,7 +65,7 @@ public final class GcpObservability implements AutoCloseable { * * @throws ProviderNotFoundException if no underlying channel/server provider is available. */ - public static synchronized GcpObservability grpcInit() throws Exception { + public static synchronized GcpObservability grpcInit() throws IOException { if (instance == null) { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); @@ -95,7 +96,7 @@ static GcpObservability grpcInit( ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) - throws Exception { + throws IOException { if (instance == null) { instance = new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory); @@ -149,7 +150,7 @@ private void setProducer( } } - private void registerStackDriverExporter(String projectId) throws Exception { + private void registerStackDriverExporter(String projectId) throws IOException { if (config.isEnableCloudMonitoring()) { RpcViews.registerAllGrpcViews(); StackdriverStatsConfiguration.Builder statsConfigurationBuilder = @@ -159,8 +160,8 @@ private void registerStackDriverExporter(String projectId) throws Exception { } try { StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); - } catch (Exception e) { - throw new Exception("Failed to register Stackdriver stats exporter, " + e.getMessage()); + } catch (IOException e) { + throw new IOException("Failed to register Stackdriver stats exporter, " + e.getMessage()); } metricsEnabled = true; } @@ -176,8 +177,8 @@ private void registerStackDriverExporter(String projectId) throws Exception { } try { StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); - } catch (Exception e) { - throw new Exception("Failed to register Stackdriver trace exporter, " + e.getMessage()); + } catch (IOException e) { + throw new IOException("Failed to register Stackdriver trace exporter, " + e.getMessage()); } tracesEnabled = true; } From 6e713a03f2088ce13b89f8a5015b59111ad8783d Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 1 Jul 2022 16:39:29 -0700 Subject: [PATCH 6/7] addressed comments --- .../main/java/io/grpc/GlobalInterceptors.java | 11 +- gcp-observability/build.gradle | 1 + .../gcp/observability/GcpObservability.java | 43 ++---- .../ObservabilityConfigImpl.java | 6 +- .../observability/GcpObservabilityTest.java | 127 +++++++++++++----- 5 files changed, 113 insertions(+), 75 deletions(-) diff --git a/api/src/main/java/io/grpc/GlobalInterceptors.java b/api/src/main/java/io/grpc/GlobalInterceptors.java index 44d1592b61a..e5fd86170f0 100644 --- a/api/src/main/java/io/grpc/GlobalInterceptors.java +++ b/api/src/main/java/io/grpc/GlobalInterceptors.java @@ -73,24 +73,19 @@ static synchronized void setInterceptorsTracers( isGlobalInterceptorsTracersSet = true; } - /** - * Returns the list of global {@link ClientInterceptor}. If not set, this returns am empty list. - */ + /** Returns the list of global {@link ClientInterceptor}. If not set, this returns null. */ static synchronized List getClientInterceptors() { isGlobalInterceptorsTracersGet = true; return clientInterceptors; } - /** Returns list of global {@link ServerInterceptor}. If not set, this returns an empty list. */ + /** Returns list of global {@link ServerInterceptor}. If not set, this returns null. */ static synchronized List getServerInterceptors() { isGlobalInterceptorsTracersGet = true; return serverInterceptors; } - /** - * Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns an empty - * list. - */ + /** Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns null. */ static synchronized List getServerStreamTracerFactories() { isGlobalInterceptorsTracersGet = true; return serverStreamTracerFactories; diff --git a/gcp-observability/build.gradle b/gcp-observability/build.gradle index a78640091da..c4f93afbd17 100644 --- a/gcp-observability/build.gradle +++ b/gcp-observability/build.gradle @@ -30,6 +30,7 @@ dependencies { project(':grpc-alts'), project(':grpc-census'), ("com.google.cloud:google-cloud-logging:${cloudLoggingVersion}"), + libraries.opencensus.contrib.grpc.metrics, libraries.opencensus.exporter.stats.stackdriver, libraries.opencensus.exporter.trace.stackdriver, libraries.animalsniffer.annotations, // Prefer our version diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index aa5641f6fcf..4bb9ffcb0c1 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.Strings; import com.google.common.annotations.VisibleForTesting; import io.grpc.ClientInterceptor; import io.grpc.ExperimentalApi; @@ -69,22 +68,16 @@ public static synchronized GcpObservability grpcInit() throws IOException { if (instance == null) { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); - Sink sink = - new GcpLogSink( - observabilityConfig.getDestinationProjectId(), - globalLoggingTags.getLocationTags(), - globalLoggingTags.getCustomTags(), - observabilityConfig.getFlushMessageCount()); + Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig.getFlushMessageCount()); // TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider // once ChannelBuilder and ServerBuilder are used LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); - instance = - grpcInit( - sink, - observabilityConfig, - new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), - new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); + instance = grpcInit(sink, observabilityConfig, + new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), + new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId()); } return instance; @@ -142,12 +135,8 @@ private void setProducer( tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); } - if (!clientInterceptors.isEmpty() - || !serverInterceptors.isEmpty() - || !tracerFactories.isEmpty()) { - InternalGlobalInterceptors.setInterceptorsTracers( - clientInterceptors, serverInterceptors, tracerFactories); - } + InternalGlobalInterceptors.setInterceptorsTracers( + clientInterceptors, serverInterceptors, tracerFactories); } private void registerStackDriverExporter(String projectId) throws IOException { @@ -155,14 +144,10 @@ private void registerStackDriverExporter(String projectId) throws IOException { RpcViews.registerAllGrpcViews(); StackdriverStatsConfiguration.Builder statsConfigurationBuilder = StackdriverStatsConfiguration.builder(); - if (!Strings.isNullOrEmpty(projectId)) { + if (projectId != null) { statsConfigurationBuilder.setProjectId(projectId); } - try { - StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); - } catch (IOException e) { - throw new IOException("Failed to register Stackdriver stats exporter, " + e.getMessage()); - } + StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); metricsEnabled = true; } @@ -172,14 +157,10 @@ private void registerStackDriverExporter(String projectId) throws IOException { traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build()); StackdriverTraceConfiguration.Builder traceConfigurationBuilder = StackdriverTraceConfiguration.builder(); - if (!Strings.isNullOrEmpty(projectId)) { + if (projectId != null) { traceConfigurationBuilder.setProjectId(projectId); } - try { - StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); - } catch (IOException e) { - throw new IOException("Failed to register Stackdriver trace exporter, " + e.getMessage()); - } + StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); tracesEnabled = true; } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index 84a965bb40f..08da2fede80 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -106,11 +106,15 @@ private void parseConfig(Map config) { if (enableCloudTracing && samplingRate == null) { this.sampler = Samplers.probabilitySampler(0.0); } + double epsilon = 1e-6; if (samplingRate != null) { checkArgument( samplingRate >= 0.0 && samplingRate <= 1.0, "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); - if (samplingRate == 1.0) { + // Using alwaysSample() instead of probabilitySampler() because according to + // {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample} + // there is a (very) small chance of *not* sampling if probability = 1.00. + if (Math.abs(1 - samplingRate) < epsilon) { this.sampler = Samplers.alwaysSample(); } else { this.sampler = Samplers.probabilitySampler(samplingRate); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 4397729f88f..87601247a42 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -28,18 +28,21 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.InternalGlobalInterceptors; +// import io.grpc.ManagedChannelProvider; import io.grpc.ManagedChannelProvider; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +// import io.grpc.ServerProvider; import io.grpc.ServerProvider; import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.logging.Sink; import io.opencensus.trace.samplers.Samplers; +import java.io.IOException; import java.util.regex.Pattern; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,51 +51,77 @@ @RunWith(JUnit4.class) public class GcpObservabilityTest { - private static final String PROJECT_ID = "project"; + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader( + getClass().getClassLoader(), + Pattern.compile( + "io\\.grpc\\.InternalGlobalInterceptors|io\\.grpc\\.GlobalInterceptors|" + + "io\\.grpc\\.gcp\\.observability\\.[^.]+|" + + "io\\.grpc\\.gcp\\.observability\\.interceptors\\.[^.]+|" + + "io\\.grpc\\.gcp\\.observability\\.GcpObservabilityTest\\$.*")); @Test public void initFinish() throws Exception { - ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); - ServerProvider prevServerProvider = ServerProvider.provider(); - Sink sink = mock(Sink.class); - ObservabilityConfig config = mock(ObservabilityConfig.class); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - mock(InternalLoggingChannelInterceptor.Factory.class); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - mock(InternalLoggingServerInterceptor.Factory.class); - GcpObservability observability1; - try (GcpObservability observability = - GcpObservability.grpcInit( - sink, config, channelInterceptorFactory, serverInterceptorFactory)) { - assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); - assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); - observability1 = - GcpObservability.grpcInit( - sink, config, channelInterceptorFactory, serverInterceptorFactory); - assertThat(observability1).isSameInstanceAs(observability); - } - verify(sink).close(); - assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); - assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); - try { - observability1.close(); - fail("should have failed for calling close() second time"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("GcpObservability already closed!"); - } + Class runnable = + classLoader.loadClass(StaticTestingClassInitFinish.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test public void enableObservability() throws Exception { - StaticTestingClassLoader classLoader = - new StaticTestingClassLoader( - getClass().getClassLoader(), Pattern.compile("io\\.grpc\\.[^.]+")); - Class runnable = classLoader.loadClass(StaticTestingClassLoaderSet.class.getName()); + Class runnable = + classLoader.loadClass(StaticTestingClassEnableObservability.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + @Test + public void disableObservability() throws Exception { + Class runnable = + classLoader.loadClass(StaticTestingClassDisableObservability.class.getName()); ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } // UsedReflectively - public static final class StaticTestingClassLoaderSet implements Runnable { + public static final class StaticTestingClassInitFinish implements Runnable { + + @Override + public void run() { + ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); + ServerProvider prevServerProvider = ServerProvider.provider(); + Sink sink = mock(Sink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + GcpObservability observability1; + try { + GcpObservability observability = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory); + assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); + assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); + observability1 = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory); + assertThat(observability1).isSameInstanceAs(observability); + observability.close(); + verify(sink).close(); + assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); + assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); + try { + observability1.close(); + fail("should have failed for calling close() second time"); + } catch (IllegalStateException e) { + assertThat(e).hasMessageThat().contains("GcpObservability already closed!"); + } + } catch (IOException e) { + fail("Encountered exception: " + e); + } + } + } + + public static final class StaticTestingClassEnableObservability implements Runnable { @Override public void run() { @@ -102,7 +131,6 @@ public void run() { when(config.isEnableCloudMonitoring()).thenReturn(true); when(config.isEnableCloudTracing()).thenReturn(true); when(config.getSampler()).thenReturn(Samplers.neverSample()); - when(config.getDestinationProjectId()).thenReturn(PROJECT_ID); ClientInterceptor clientInterceptor = mock(ClientInterceptor.class, delegatesTo(new NoopClientInterceptor())); @@ -128,6 +156,35 @@ public void run() { } } + public static final class StaticTestingClassDisableObservability implements Runnable { + + @Override + public void run() { + Sink sink = mock(Sink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + when(config.isEnableCloudLogging()).thenReturn(false); + when(config.isEnableCloudMonitoring()).thenReturn(false); + when(config.isEnableCloudTracing()).thenReturn(false); + when(config.getSampler()).thenReturn(Samplers.neverSample()); + + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class);; + + try (GcpObservability observability = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory)) { + assertThat(InternalGlobalInterceptors.getClientInterceptors()).isEmpty(); + assertThat(InternalGlobalInterceptors.getServerInterceptors()).isEmpty(); + assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).isEmpty(); + } catch (Exception e) { + fail("Encountered exception: " + e); + } + verify(sink).close(); + } + } + private static class NoopClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( From fc4569520a9be784b3275812a956d00beb8bc0ff Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 13 Jul 2022 15:10:28 -0700 Subject: [PATCH 7/7] addressed comments (2) --- .../io/grpc/gcp/observability/ObservabilityConfig.java | 1 + .../gcp/observability/ObservabilityConfigImpl.java | 10 +++++----- .../grpc/gcp/observability/GcpObservabilityTest.java | 7 +++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java index f36adf264b4..a6abe589eed 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java @@ -44,6 +44,7 @@ public interface ObservabilityConfig { /** Get event types to log. */ List getEventTypes(); + /** Get sampler for TraceConfig - when Cloud Tracing is enabled. */ Sampler getSampler(); /** diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index 08da2fede80..2f547ec5af7 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -37,6 +37,8 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; private static final String CONFIG_FILE_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY_JSON"; + // Tolerance for floating-point comparisons. + private static final double EPSILON = 1e-6; private boolean enableCloudLogging = false; private boolean enableCloudMonitoring = false; @@ -103,18 +105,16 @@ private void parseConfig(Map config) { this.eventTypes = eventTypesBuilder.build(); } Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate"); - if (enableCloudTracing && samplingRate == null) { + if (samplingRate == null) { this.sampler = Samplers.probabilitySampler(0.0); - } - double epsilon = 1e-6; - if (samplingRate != null) { + } else { checkArgument( samplingRate >= 0.0 && samplingRate <= 1.0, "'global_trace_sampling_rate' needs to be between [0.0, 1.0]"); // Using alwaysSample() instead of probabilitySampler() because according to // {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample} // there is a (very) small chance of *not* sampling if probability = 1.00. - if (Math.abs(1 - samplingRate) < epsilon) { + if (1 - samplingRate < EPSILON) { this.sampler = Samplers.alwaysSample(); } else { this.sampler = Samplers.probabilitySampler(samplingRate); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 87601247a42..d494b3c14f1 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -28,14 +28,12 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.InternalGlobalInterceptors; -// import io.grpc.ManagedChannelProvider; import io.grpc.ManagedChannelProvider; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -// import io.grpc.ServerProvider; import io.grpc.ServerProvider; import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; @@ -86,6 +84,7 @@ public static final class StaticTestingClassInitFinish implements Runnable { @Override public void run() { + // TODO(dnvindhya) : Remove usage of Providers on cleaning up Logging*Provider ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); ServerProvider prevServerProvider = ServerProvider.provider(); Sink sink = mock(Sink.class); @@ -144,7 +143,7 @@ public void run() { mock(InternalLoggingServerInterceptor.Factory.class); when(serverInterceptorFactory.create()).thenReturn(serverInterceptor); - try (GcpObservability observability = + try (GcpObservability unused = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory)) { assertThat(InternalGlobalInterceptors.getClientInterceptors()).hasSize(3); @@ -172,7 +171,7 @@ public void run() { InternalLoggingServerInterceptor.Factory serverInterceptorFactory = mock(InternalLoggingServerInterceptor.Factory.class);; - try (GcpObservability observability = + try (GcpObservability unused = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory)) { assertThat(InternalGlobalInterceptors.getClientInterceptors()).isEmpty();