Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp-observability: implement exclusion of cloud backend RPCs for all 3 signals (backport to v1.49.x) (#9427) #9436

Merged
merged 1 commit into from Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
import io.grpc.InternalGlobalInterceptors;
Expand All @@ -27,6 +28,7 @@
import io.grpc.ServerStreamTracer;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.InternalCensusTracingAccessor;
import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
Expand Down Expand Up @@ -57,6 +59,9 @@
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static final int METRICS_EXPORT_INTERVAL = 30;
private static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
"google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
"google.devtools.cloudtrace.v2.TraceService");
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
Expand All @@ -77,7 +82,7 @@ public static synchronized GcpObservability grpcInit() throws IOException {
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount());
observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
Expand Down Expand Up @@ -126,20 +131,26 @@ private void setProducer(
serverInterceptors.add(serverInterceptorFactory.create());
}
if (config.isEnableCloudMonitoring()) {
clientInterceptors.add(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true));
clientInterceptors.add(getConditionalInterceptor(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true)));
tracerFactories.add(
InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true));
}
if (config.isEnableCloudTracing()) {
clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor());
clientInterceptors.add(
getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor()));
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
}

InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}

static ConditionalClientInterceptor getConditionalInterceptor(ClientInterceptor interceptor) {
return new ConditionalClientInterceptor(interceptor,
(m, c) -> !SERVICES_TO_EXCLUDE.contains(m.getServiceName()));
}

@VisibleForTesting
void registerStackDriverExporter(String projectId, Map<String, String> customTags)
throws IOException {
Expand Down
@@ -0,0 +1,52 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.gcp.observability.interceptors;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Internal;
import io.grpc.MethodDescriptor;
import java.util.function.BiPredicate;

/**
* A client interceptor that conditionally calls a delegated interceptor.
*/
@Internal
public final class ConditionalClientInterceptor implements ClientInterceptor {

private final ClientInterceptor delegate;
private final BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate;

public ConditionalClientInterceptor(ClientInterceptor delegate,
BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate) {
this.delegate = checkNotNull(delegate, "delegate");
this.predicate = checkNotNull(predicate, "predicate");
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
if (!predicate.test(method, callOptions)) {
return next.newCall(method, callOptions);
}
return delegate.interceptCall(method, callOptions, next);
}
}
Expand Up @@ -16,6 +16,8 @@

package io.grpc.gcp.observability.logging;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
Expand All @@ -31,6 +33,7 @@
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand All @@ -45,8 +48,6 @@
public class GcpLogSink implements Sink {
private final Logger logger = Logger.getLogger(GcpLogSink.class.getName());

// TODO(DNVindhya): Make cloud logging service a configurable value
private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2";
private static final String DEFAULT_LOG_NAME =
"microservices.googleapis.com%2Fobservability%2Fgrpc";
private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container";
Expand All @@ -62,26 +63,29 @@ public class GcpLogSink implements Sink {
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private long flushCounter;
private final Collection<String> servicesToExclude;

@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(destinationProjectId, locationTags, customTags, flushLimit);
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) {
this(destinationProjectId, locationTags, customTags, flushLimit, servicesToExclude);
this.gcpLoggingClient = loggingClient;
}

/**
* Retrieves a single instance of GcpLogSink.
*
* @param destinationProjectId cloud project id to write logs
* @param servicesToExclude service names for which log entries should not be generated
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) {
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0L;
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
}

/**
Expand All @@ -98,7 +102,7 @@ public void write(GrpcLogRecord logProto) {
}
}
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
if (servicesToExclude.contains(logProto.getServiceName())) {
return;
}
try {
Expand Down
Expand Up @@ -19,8 +19,11 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import io.grpc.CallOptions;
Expand All @@ -34,11 +37,13 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.logging.Sink;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -77,6 +82,61 @@ public void disableObservability() throws Exception {
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}

@Test
@SuppressWarnings("unchecked")
public void conditionalInterceptor() {
ClientInterceptor delegate = mock(ClientInterceptor.class);
Channel channel = mock(Channel.class);
ClientCall<?, ?> returnedCall = mock(ClientCall.class);

ConditionalClientInterceptor conditionalClientInterceptor
= GcpObservability.getConditionalInterceptor(
delegate);
MethodDescriptor<?, ?> method = MethodDescriptor.newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.logging.v2.LoggingServiceV2/method")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.monitoring.v3.MetricService/method2")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.devtools.cloudtrace.v2.TraceService/method3")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

reset(channel);
ClientCall<?, ?> interceptedCall = mock(ClientCall.class);
method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("some.other.random.service/method4")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(interceptedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(channel);
assertThat(clientCall).isSameInstanceAs(interceptedCall);
}

// UsedReflectively
public static final class StaticTestingClassInitFinish implements Runnable {

Expand Down Expand Up @@ -137,7 +197,10 @@ public void run() {
try (GcpObservability unused =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
assertThat(InternalGlobalInterceptors.getClientInterceptors()).hasSize(3);
List<ClientInterceptor> list = InternalGlobalInterceptors.getClientInterceptors();
assertThat(list).hasSize(3);
assertThat(list.get(1)).isInstanceOf(ConditionalClientInterceptor.class);
assertThat(list.get(2)).isInstanceOf(ConditionalClientInterceptor.class);
assertThat(InternalGlobalInterceptors.getServerInterceptors()).hasSize(1);
assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).hasSize(2);
} catch (Exception e) {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.Collections;
import java.util.regex.Pattern;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand Down Expand Up @@ -110,7 +111,9 @@ public static final class StaticTestingClassEndtoEndLogging implements Runnable

@Override
public void run() {
Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT);
Sink sink =
new GcpLogSink(
PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet());
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
Expand Down
@@ -0,0 +1,89 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.gcp.observability.interceptors;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import java.util.function.BiPredicate;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/**
* Tests for {@link ConditionalClientInterceptor}.
*/
@RunWith(JUnit4.class)
public class ConditionalClientInterceptorTest {

private ConditionalClientInterceptor conditionalClientInterceptor;
@Mock private ClientInterceptor delegate;
@Mock private BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate;
@Mock private Channel channel;
@Mock private ClientCall<?, ?> returnedCall;
private MethodDescriptor<?, ?> method;

@Before
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
conditionalClientInterceptor = new ConditionalClientInterceptor(
delegate, predicate);
method = MethodDescriptor.newBuilder().setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
}

@Test
@SuppressWarnings("unchecked")
public void predicateFalse() {
when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(false);
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
assertThat(clientCall).isSameInstanceAs(returnedCall);
verify(delegate, never()).interceptCall(any(MethodDescriptor.class), any(CallOptions.class),
any(Channel.class));
}

@Test
@SuppressWarnings("unchecked")
public void predicateTrue() {
when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(true);
doReturn(returnedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
assertThat(clientCall).isSameInstanceAs(returnedCall);
verify(channel, never()).newCall(any(MethodDescriptor.class), any(CallOptions.class));
}
}