Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp-observability: Populate global interceptors from observability #9309

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 3 additions & 8 deletions api/src/main/java/io/grpc/GlobalInterceptors.java
Expand Up @@ -73,24 +73,19 @@ static synchronized void setInterceptorsTracers(
isGlobalInterceptorsTracersSet = true;
}

/**
* Returns the list of global {@link ClientInterceptor}. If not set, this returns am empty list.
*/
/** Returns the list of global {@link ClientInterceptor}. If not set, this returns null. */
static synchronized List<ClientInterceptor> getClientInterceptors() {
isGlobalInterceptorsTracersGet = true;
return clientInterceptors;
}

/** Returns list of global {@link ServerInterceptor}. If not set, this returns an empty list. */
/** Returns list of global {@link ServerInterceptor}. If not set, this returns null. */
static synchronized List<ServerInterceptor> getServerInterceptors() {
isGlobalInterceptorsTracersGet = true;
return serverInterceptors;
}

/**
* Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns an empty
* list.
*/
/** Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns null. */
static synchronized List<ServerStreamTracer.Factory> getServerStreamTracerFactories() {
isGlobalInterceptorsTracersGet = true;
return serverStreamTracerFactories;
Expand Down
4 changes: 3 additions & 1 deletion gcp-observability/build.gradle
Expand Up @@ -30,6 +30,7 @@ dependencies {
project(':grpc-alts'),
project(':grpc-census'),
("com.google.cloud:google-cloud-logging:${cloudLoggingVersion}"),
libraries.opencensus.contrib.grpc.metrics,
libraries.opencensus.exporter.stats.stackdriver,
libraries.opencensus.exporter.trace.stackdriver,
libraries.animalsniffer.annotations, // Prefer our version
Expand All @@ -41,7 +42,8 @@ dependencies {

runtimeOnly libraries.opencensus.impl

testImplementation project(':grpc-testing'),
testImplementation project(':grpc-context').sourceSets.test.output,
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
project(':grpc-testing'),
project(':grpc-testing-proto'),
project(':grpc-netty-shaded')
testImplementation (libraries.guava.testlib) {
Expand Down
Expand Up @@ -19,22 +19,45 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
import io.grpc.InternalGlobalInterceptors;
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.ServerInterceptor;
import io.grpc.ServerStreamTracer;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.InternalCensusTracingAccessor;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.config.TraceConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

/** The main class for gRPC Google Cloud Platform Observability features. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
private final ArrayList<ClientInterceptor> clientInterceptors = new ArrayList<>();
private final ArrayList<ServerInterceptor> serverInterceptors = new ArrayList<>();
private final ArrayList<ServerStreamTracer.Factory> tracerFactories = new ArrayList<>();
private boolean metricsEnabled;
private boolean tracesEnabled;

/**
* Initialize grpc-observability.
Expand All @@ -48,20 +71,33 @@ public static synchronized GcpObservability grpcInit() throws IOException {
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are passing observabilityConfig to the non-static grpcInit we can construct configFilterHelper in there? Unless I am missing something?

May be the same thing for many other objects being constructed here and passed to the non-static grpcInit such as sink helper etc?

The reason I am saying is because the non-static grpcInit is unit tested/unit-testable so the more code it has the better it is. But we need to look into it carefully

instance = grpcInit(sink,
instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId());
}
return instance;
}

@VisibleForTesting static GcpObservability grpcInit(Sink sink,
@VisibleForTesting
static GcpObservability grpcInit(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
throws IOException {
if (instance == null) {
instance = new GcpObservability(sink, channelInterceptorFactory, serverInterceptorFactory);
instance =
new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory);
LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config);
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
instance.setProducer(
new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper));
}
return instance;
}
Expand All @@ -73,17 +109,92 @@ public void close() {
if (instance == null) {
throw new IllegalStateException("GcpObservability already closed!");
}
unRegisterStackDriverExporter();
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
}

private GcpObservability(Sink sink,
private void setProducer(
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
if (config.isEnableCloudLogging()) {
clientInterceptors.add(channelInterceptorFactory.create());
serverInterceptors.add(serverInterceptorFactory.create());
}
if (config.isEnableCloudMonitoring()) {
clientInterceptors.add(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true));
tracerFactories.add(
InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true));
}
if (config.isEnableCloudTracing()) {
clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor());
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
}

InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}

private void registerStackDriverExporter(String projectId) throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
StackdriverStatsConfiguration.Builder statsConfigurationBuilder =
StackdriverStatsConfiguration.builder();
if (projectId != null) {
statsConfigurationBuilder.setProjectId(projectId);
}
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}

if (config.isEnableCloudTracing()) {
TraceConfig traceConfig = Tracing.getTraceConfig();
traceConfig.updateActiveTraceParams(
traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build());
StackdriverTraceConfiguration.Builder traceConfigurationBuilder =
StackdriverTraceConfiguration.builder();
if (projectId != null) {
traceConfigurationBuilder.setProjectId(projectId);
}
StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build());
tracesEnabled = true;
}
}

private void unRegisterStackDriverExporter() {
if (metricsEnabled) {
try {
StackdriverStatsExporter.unregister();
} catch (IllegalStateException e) {
logger.log(
Level.SEVERE, "Failed to unregister Stackdriver stats exporter, " + e.getMessage());
}
metricsEnabled = false;
}

if (tracesEnabled) {
try {
StackdriverTraceExporter.unregister();
} catch (IllegalStateException e) {
logger.log(
Level.SEVERE, "Failed to unregister Stackdriver trace exporter, " + e.getMessage());
}
tracesEnabled = false;
}
}

private GcpObservability(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
this.sink = checkNotNull(sink);
this.config = checkNotNull(config);

LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import io.grpc.Internal;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import java.util.List;

@Internal
Expand Down Expand Up @@ -71,34 +72,4 @@ public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) {
this.messageBytes = messageBytes;
}
}

sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
/** Corresponds to a {@link io.opencensus.trace.Sampler} type. */
enum SamplerType {
ALWAYS,
NEVER,
PROBABILISTIC;
}

/** Represents a trace {@link io.opencensus.trace.Sampler} configuration. */
class Sampler {
private SamplerType type;
private double probability;

Sampler(double probability) {
this.probability = probability;
this.type = SamplerType.PROBABILISTIC;
}

Sampler(SamplerType type) {
this.type = type;
}

double getProbability() {
return probability;
}

SamplerType getType() {
return type;
}
}
}
Expand Up @@ -23,6 +23,8 @@
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -100,19 +102,23 @@ private void parseConfig(Map<String, ?> config) {
}
this.eventTypes = eventTypesBuilder.build();
}
String sampler = JsonUtil.getString(config, "global_trace_sampler");
Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate");
checkArgument(
sampler == null || samplingRate == null,
"only one of 'global_trace_sampler' or 'global_trace_sampling_rate' can be specified");
if (sampler != null) {
this.sampler = new Sampler(SamplerType.valueOf(sampler.toUpperCase()));
if (enableCloudTracing && samplingRate == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler/better if you just remove the check for enableCloudTracing and do it as follows (with epsilon defined as a class constant)?:

      if (samplingRate == null) {
        this.sampler = Samplers.probabilitySampler(0.0);
      } else {
        checkArgument(
            samplingRate >= 0.0 && samplingRate <= 1.0,
            "'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
        // Using alwaysSample() instead of probabilitySampler() because according to
        // {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
        // there is a (very) small chance of *not* sampling if probability = 1.00.
        if (Math.abs(1 - samplingRate) < EPSILON) {
          this.sampler = Samplers.alwaysSample();
        } else {
          this.sampler = Samplers.probabilitySampler(samplingRate);
        }
      }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.sampler = Samplers.probabilitySampler(0.0);
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
}
double epsilon = 1e-6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make this a class final constant with some comment (e.g. why 1e-6)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (samplingRate != null) {
checkArgument(
samplingRate >= 0.0 && samplingRate <= 1.0,
"'global_trace_sampling_rate' needs to be between 0.0 and 1.0");
this.sampler = new Sampler(samplingRate);
"'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
// Using alwaysSample() instead of probabilitySampler() because according to
// {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
// there is a (very) small chance of *not* sampling if probability = 1.00.
if (Math.abs(1 - samplingRate) < epsilon) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since samplingRate is always going to be <= 1.0 (because of the check on line 112) the Math.abs is strictly unnecessary. But I am okay to leave it there as a generalized double equality comparison logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed Math.abs

this.sampler = Samplers.alwaysSample();
} else {
this.sampler = Samplers.probabilitySampler(samplingRate);
}
}
}
}
Expand Down