Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp-observability: remove logging channel/server providers #9424

Merged
merged 5 commits into from Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,6 +34,7 @@
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
Expand All @@ -55,6 +56,7 @@
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static final int METRICS_EXPORT_INTERVAL = 30;
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
Expand All @@ -76,8 +78,6 @@ public static synchronized GcpObservability grpcInit() throws IOException {
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
Expand All @@ -97,13 +97,8 @@ static GcpObservability grpcInit(
InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
throws IOException {
if (instance == null) {
instance =
new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory);
LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config);
instance.setProducer(
new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper));
instance = new GcpObservability(sink, config);
instance.setProducer(channelInterceptorFactory, serverInterceptorFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that's now clear is that we only ever create one instance of the InternalLoggingChannelInterceptor and InternalLoggingServerInterceptor each and install those in the Global interceptors. So the factory is kind of useless. One thing we may consider (may be a separate PR) is to eliminate the Factories and replace those with those respective interceptors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about a TODO comment saying we will eliminate the factory?

}
return instance;
}
Expand All @@ -116,8 +111,6 @@ public void close() {
throw new IllegalStateException("GcpObservability already closed!");
}
unRegisterStackDriverExporter();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
Expand Down Expand Up @@ -145,7 +138,8 @@ private void setProducer(
clientInterceptors, serverInterceptors, tracerFactories);
}

private void registerStackDriverExporter(String projectId, Map<String, String> customTags)
@VisibleForTesting
void registerStackDriverExporter(String projectId, Map<String, String> customTags)
throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
Expand All @@ -160,6 +154,7 @@ private void registerStackDriverExporter(String projectId, Map<String, String> c
e -> LabelValue.create(e.getValue())));
statsConfigurationBuilder.setConstantLabels(constantLabels);
}
statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0));
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}
Expand Down Expand Up @@ -208,13 +203,8 @@ private void unRegisterStackDriverExporter() {

private GcpObservability(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
ObservabilityConfig config) {
this.sink = checkNotNull(sink);
this.config = checkNotNull(config);

LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
}

This file was deleted.

This file was deleted.

Expand Up @@ -40,7 +40,7 @@
import java.util.logging.Logger;

/**
* A logging interceptor for {@code LoggingChannelProvider}.
* A logging client interceptor for Observability.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned elsewhere we should consider getting rid of the Factory (in the Server interceptor as well). A separate PR seems more convenient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree and will do this in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls add a TODO comment saying so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
Expand Down
Expand Up @@ -40,7 +40,7 @@
import java.util.logging.Logger;

/**
* A logging interceptor for {@code LoggingServerProvider}.
* A logging server interceptor for Observability.
*/
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
Expand Down
Expand Up @@ -54,18 +54,20 @@ public class GcpLogSink implements Sink {
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name");
private static final long FALLBACK_FLUSH_LIMIT = 100L;
private final String projectId;
private final Map<String, String> customTags;
private final Logging gcpLoggingClient;
private final MonitoredResource kubernetesResource;
private final Long flushLimit;
/** Lazily initialize cloud logging client to avoid circular initialization. Because cloud
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private long flushCounter;

private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(destinationProjectId, locationTags, customTags, flushLimit);
this.gcpLoggingClient = loggingClient;
}

/**
Expand All @@ -75,15 +77,7 @@ private static Logging createLoggingClient(String projectId) {
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags,
customTags, flushLimit);

}

@VisibleForTesting
GcpLogSink(Logging client, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this.gcpLoggingClient = client;
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
Expand All @@ -98,8 +92,11 @@ public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
@Override
public void write(GrpcLogRecord logProto) {
if (gcpLoggingClient == null) {
logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed.");
return;
synchronized (this) {
if (gcpLoggingClient == null) {
gcpLoggingClient = createLoggingClient();
}
}
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
return;
Expand Down Expand Up @@ -133,6 +130,14 @@ public void write(GrpcLogRecord logProto) {
}
}

Logging createLoggingClient() {
sanjaypujare marked this conversation as resolved.
Show resolved Hide resolved
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
}

@VisibleForTesting
static Map<String, String> getCustomTags(Map<String, String> customTags,
Map<String, String> locationTags, String destinationProjectId) {
Expand Down