Skip to content

Commit

Permalink
gcp-observability: remove logging channel/server providers (#9424)
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya committed Aug 10, 2022
1 parent 050cdb1 commit 7bdca0c
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 701 deletions.
Expand Up @@ -34,6 +34,7 @@
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
Expand All @@ -55,6 +56,7 @@
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static final int METRICS_EXPORT_INTERVAL = 30;
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
Expand All @@ -76,8 +78,6 @@ public static synchronized GcpObservability grpcInit() throws IOException {
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
Expand All @@ -97,13 +97,8 @@ static GcpObservability grpcInit(
InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
throws IOException {
if (instance == null) {
instance =
new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory);
LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config);
instance.setProducer(
new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper));
instance = new GcpObservability(sink, config);
instance.setProducer(channelInterceptorFactory, serverInterceptorFactory);
}
return instance;
}
Expand All @@ -116,13 +111,13 @@ public void close() {
throw new IllegalStateException("GcpObservability already closed!");
}
unRegisterStackDriverExporter();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
}

// TODO(dnvindhya): Remove <channel/server>InterceptorFactory and replace with respective
// interceptors
private void setProducer(
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
Expand All @@ -145,7 +140,8 @@ private void setProducer(
clientInterceptors, serverInterceptors, tracerFactories);
}

private void registerStackDriverExporter(String projectId, Map<String, String> customTags)
@VisibleForTesting
void registerStackDriverExporter(String projectId, Map<String, String> customTags)
throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
Expand All @@ -160,6 +156,7 @@ private void registerStackDriverExporter(String projectId, Map<String, String> c
e -> LabelValue.create(e.getValue())));
statsConfigurationBuilder.setConstantLabels(constantLabels);
}
statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0));
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}
Expand Down Expand Up @@ -208,13 +205,8 @@ private void unRegisterStackDriverExporter() {

private GcpObservability(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
ObservabilityConfig config) {
this.sink = checkNotNull(sink);
this.config = checkNotNull(config);

LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
}

This file was deleted.

This file was deleted.

Expand Up @@ -40,7 +40,7 @@
import java.util.logging.Logger;

/**
* A logging interceptor for {@code LoggingChannelProvider}.
* A logging client interceptor for Observability.
*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
Expand All @@ -51,6 +51,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;

// TODO(dnvindhya): Remove factory and use interceptors directly
public interface Factory {
ClientInterceptor create();
}
Expand Down
Expand Up @@ -40,7 +40,7 @@
import java.util.logging.Logger;

/**
* A logging interceptor for {@code LoggingServerProvider}.
* A logging server interceptor for Observability.
*/
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
Expand All @@ -51,6 +51,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;

// TODO(dnvindhya): Remove factory and use interceptors directly
public interface Factory {
ServerInterceptor create();
}
Expand Down
Expand Up @@ -54,18 +54,20 @@ public class GcpLogSink implements Sink {
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name");
private static final long FALLBACK_FLUSH_LIMIT = 100L;
private final String projectId;
private final Map<String, String> customTags;
private final Logging gcpLoggingClient;
private final MonitoredResource kubernetesResource;
private final Long flushLimit;
/** Lazily initialize cloud logging client to avoid circular initialization. Because cloud
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private long flushCounter;

private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(destinationProjectId, locationTags, customTags, flushLimit);
this.gcpLoggingClient = loggingClient;
}

/**
Expand All @@ -75,15 +77,7 @@ private static Logging createLoggingClient(String projectId) {
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags,
customTags, flushLimit);

}

@VisibleForTesting
GcpLogSink(Logging client, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this.gcpLoggingClient = client;
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
Expand All @@ -98,8 +92,11 @@ public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
@Override
public void write(GrpcLogRecord logProto) {
if (gcpLoggingClient == null) {
logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed.");
return;
synchronized (this) {
if (gcpLoggingClient == null) {
gcpLoggingClient = createLoggingClient();
}
}
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
return;
Expand Down Expand Up @@ -133,6 +130,14 @@ public void write(GrpcLogRecord logProto) {
}
}

Logging createLoggingClient() {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
}

@VisibleForTesting
static Map<String, String> getCustomTags(Map<String, String> customTags,
Map<String, String> locationTags, String destinationProjectId) {
Expand Down

0 comments on commit 7bdca0c

Please sign in to comment.