From 4a0a670c7d7109269d5c03e6c95f5af980a26c24 Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Wed, 10 Aug 2022 14:19:27 -0700 Subject: [PATCH] gcp-observability: implement exclusion of cloud backend RPCs for all 3 signals (#9427) * gcp-observability: implement exclusion of cloud backend RPCs for all 3 signals by using a ConditionalClientInterceptor that conditionally delegates --- .../gcp/observability/GcpObservability.java | 19 +++- .../ConditionalClientInterceptor.java | 52 +++++++++++ .../gcp/observability/logging/GcpLogSink.java | 16 ++-- .../observability/GcpObservabilityTest.java | 65 +++++++++++++- .../grpc/gcp/observability/LoggingTest.java | 5 +- .../ConditionalClientInterceptorTest.java | 89 +++++++++++++++++++ .../observability/logging/GcpLogSinkTest.java | 22 +++-- 7 files changed, 250 insertions(+), 18 deletions(-) create mode 100644 gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptor.java create mode 100644 gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptorTest.java diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index e077e12a2a6..83601582ef5 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import io.grpc.ClientInterceptor; import io.grpc.ExperimentalApi; import io.grpc.InternalGlobalInterceptors; @@ -27,6 +28,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.census.InternalCensusStatsAccessor; import io.grpc.census.InternalCensusTracingAccessor; +import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; @@ -57,6 +59,9 @@ public final class GcpObservability implements AutoCloseable { private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); private static final int METRICS_EXPORT_INTERVAL = 30; + private static final ImmutableSet SERVICES_TO_EXCLUDE = ImmutableSet.of( + "google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService", + "google.devtools.cloudtrace.v2.TraceService"); private static GcpObservability instance = null; private final Sink sink; private final ObservabilityConfig config; @@ -77,7 +82,7 @@ public static synchronized GcpObservability grpcInit() throws IOException { ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(), - observabilityConfig.getFlushMessageCount()); + observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE); LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, observabilityConfig, @@ -126,13 +131,14 @@ private void setProducer( serverInterceptors.add(serverInterceptorFactory.create()); } if (config.isEnableCloudMonitoring()) { - clientInterceptors.add( - InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true)); + clientInterceptors.add(getConditionalInterceptor( + InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true))); tracerFactories.add( InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true)); } if (config.isEnableCloudTracing()) { - clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor()); + clientInterceptors.add( + getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor())); tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); } @@ -140,6 +146,11 @@ private void setProducer( clientInterceptors, serverInterceptors, tracerFactories); } + static ConditionalClientInterceptor getConditionalInterceptor(ClientInterceptor interceptor) { + return new ConditionalClientInterceptor(interceptor, + (m, c) -> !SERVICES_TO_EXCLUDE.contains(m.getServiceName())); + } + @VisibleForTesting void registerStackDriverExporter(String projectId, Map customTags) throws IOException { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptor.java new file mode 100644 index 00000000000..5051453ce0e --- /dev/null +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptor.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability.interceptors; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Internal; +import io.grpc.MethodDescriptor; +import java.util.function.BiPredicate; + +/** + * A client interceptor that conditionally calls a delegated interceptor. + */ +@Internal +public final class ConditionalClientInterceptor implements ClientInterceptor { + + private final ClientInterceptor delegate; + private final BiPredicate, CallOptions> predicate; + + public ConditionalClientInterceptor(ClientInterceptor delegate, + BiPredicate, CallOptions> predicate) { + this.delegate = checkNotNull(delegate, "delegate"); + this.predicate = checkNotNull(predicate, "predicate"); + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { + if (!predicate.test(method, callOptions)) { + return next.newCall(method, callOptions); + } + return delegate.interceptCall(method, callOptions, next); + } +} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 9b198250afb..8275b1ec048 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -16,6 +16,8 @@ package io.grpc.gcp.observability.logging; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.cloud.MonitoredResource; import com.google.cloud.logging.LogEntry; import com.google.cloud.logging.Logging; @@ -31,6 +33,7 @@ import io.grpc.internal.JsonParser; import io.grpc.observabilitylog.v1.GrpcLogRecord; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -45,8 +48,6 @@ public class GcpLogSink implements Sink { private final Logger logger = Logger.getLogger(GcpLogSink.class.getName()); - // TODO(DNVindhya): Make cloud logging service a configurable value - private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2"; private static final String DEFAULT_LOG_NAME = "microservices.googleapis.com%2Fobservability%2Fgrpc"; private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container"; @@ -62,11 +63,12 @@ public class GcpLogSink implements Sink { * logging APIs also uses gRPC. */ private volatile Logging gcpLoggingClient; private long flushCounter; + private final Collection servicesToExclude; @VisibleForTesting GcpLogSink(Logging loggingClient, String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit) { - this(destinationProjectId, locationTags, customTags, flushLimit); + Map customTags, Long flushLimit, Collection servicesToExclude) { + this(destinationProjectId, locationTags, customTags, flushLimit, servicesToExclude); this.gcpLoggingClient = loggingClient; } @@ -74,14 +76,16 @@ public class GcpLogSink implements Sink { * Retrieves a single instance of GcpLogSink. * * @param destinationProjectId cloud project id to write logs + * @param servicesToExclude service names for which log entries should not be generated */ public GcpLogSink(String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit) { + Map customTags, Long flushLimit, Collection servicesToExclude) { this.projectId = destinationProjectId; this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.kubernetesResource = getResource(locationTags); this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; this.flushCounter = 0L; + this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude"); } /** @@ -98,7 +102,7 @@ public void write(GrpcLogRecord logProto) { } } } - if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) { + if (servicesToExclude.contains(logProto.getServiceName())) { return; } try { diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 97e9031631b..b18fe741ae2 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -19,8 +19,11 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import io.grpc.CallOptions; @@ -34,11 +37,13 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.StaticTestingClassLoader; +import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.logging.Sink; import io.opencensus.trace.samplers.Samplers; import java.io.IOException; +import java.util.List; import java.util.regex.Pattern; import org.junit.Test; import org.junit.runner.RunWith; @@ -77,6 +82,61 @@ public void disableObservability() throws Exception { ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } + @Test + @SuppressWarnings("unchecked") + public void conditionalInterceptor() { + ClientInterceptor delegate = mock(ClientInterceptor.class); + Channel channel = mock(Channel.class); + ClientCall returnedCall = mock(ClientCall.class); + + ConditionalClientInterceptor conditionalClientInterceptor + = GcpObservability.getConditionalInterceptor( + delegate); + MethodDescriptor method = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("google.logging.v2.LoggingServiceV2/method") + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT); + ClientCall clientCall = conditionalClientInterceptor.interceptCall(method, + CallOptions.DEFAULT, channel); + verifyNoInteractions(delegate); + assertThat(clientCall).isSameInstanceAs(returnedCall); + + method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("google.monitoring.v3.MetricService/method2") + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT); + clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel); + verifyNoInteractions(delegate); + assertThat(clientCall).isSameInstanceAs(returnedCall); + + method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("google.devtools.cloudtrace.v2.TraceService/method3") + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT); + clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel); + verifyNoInteractions(delegate); + assertThat(clientCall).isSameInstanceAs(returnedCall); + + reset(channel); + ClientCall interceptedCall = mock(ClientCall.class); + method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("some.other.random.service/method4") + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + doReturn(interceptedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel); + clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel); + verifyNoInteractions(channel); + assertThat(clientCall).isSameInstanceAs(interceptedCall); + } + // UsedReflectively public static final class StaticTestingClassInitFinish implements Runnable { @@ -137,7 +197,10 @@ public void run() { try (GcpObservability unused = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory)) { - assertThat(InternalGlobalInterceptors.getClientInterceptors()).hasSize(3); + List list = InternalGlobalInterceptors.getClientInterceptors(); + assertThat(list).hasSize(3); + assertThat(list.get(1)).isInstanceOf(ConditionalClientInterceptor.class); + assertThat(list.get(2)).isInstanceOf(ConditionalClientInterceptor.class); assertThat(InternalGlobalInterceptors.getServerInterceptors()).hasSize(1); assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).hasSize(2); } catch (Exception e) { diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index b4dea62c047..aa6c2d55d8b 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -42,6 +42,7 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; +import java.util.Collections; import java.util.regex.Pattern; import org.junit.ClassRule; import org.junit.Ignore; @@ -110,7 +111,9 @@ public static final class StaticTestingClassEndtoEndLogging implements Runnable @Override public void run() { - Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT); + Sink sink = + new GcpLogSink( + PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); ObservabilityConfig config = mock(ObservabilityConfig.class); LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptorTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptorTest.java new file mode 100644 index 00000000000..5fdd2e185bd --- /dev/null +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConditionalClientInterceptorTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability.interceptors; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import java.util.function.BiPredicate; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link ConditionalClientInterceptor}. + */ +@RunWith(JUnit4.class) +public class ConditionalClientInterceptorTest { + + private ConditionalClientInterceptor conditionalClientInterceptor; + @Mock private ClientInterceptor delegate; + @Mock private BiPredicate, CallOptions> predicate; + @Mock private Channel channel; + @Mock private ClientCall returnedCall; + private MethodDescriptor method; + + @Before + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + conditionalClientInterceptor = new ConditionalClientInterceptor( + delegate, predicate); + method = MethodDescriptor.newBuilder().setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(mock(MethodDescriptor.Marshaller.class)) + .setResponseMarshaller(mock(MethodDescriptor.Marshaller.class)) + .build(); + } + + @Test + @SuppressWarnings("unchecked") + public void predicateFalse() { + when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(false); + doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT); + ClientCall clientCall = conditionalClientInterceptor.interceptCall(method, + CallOptions.DEFAULT, channel); + assertThat(clientCall).isSameInstanceAs(returnedCall); + verify(delegate, never()).interceptCall(any(MethodDescriptor.class), any(CallOptions.class), + any(Channel.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void predicateTrue() { + when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(true); + doReturn(returnedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel); + ClientCall clientCall = conditionalClientInterceptor.interceptCall(method, + CallOptions.DEFAULT, channel); + assertThat(clientCall).isSameInstanceAs(returnedCall); + verify(channel, never()).newCall(any(MethodDescriptor.class), any(CallOptions.class)); + } +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 031313c7304..cea081b9b55 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.cloud.MonitoredResource; @@ -35,6 +36,7 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -105,7 +107,7 @@ public class GcpLogSinkTest { @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT); + CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -123,7 +125,7 @@ public void verifyWrite() throws Exception { @SuppressWarnings("unchecked") public void verifyWriteWithTags() { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT); + CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS); sink.write(LOG_PROTO); @@ -147,7 +149,7 @@ public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - emptyCustomTags, FLUSH_LIMIT); + emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -168,7 +170,7 @@ public void emptyCustomTags_setSourceProject() { Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS, destinationProjectId); GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS, - emptyCustomTags, FLUSH_LIMIT); + emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -185,7 +187,7 @@ public void emptyCustomTags_setSourceProject() { public void verifyFlush() { long lowerFlushLimit = 2L; GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, lowerFlushLimit); + CUSTOM_TAGS, lowerFlushLimit, Collections.emptySet()); sink.write(LOG_PROTO); verify(mockLogging, never()).flush(); sink.write(LOG_PROTO); @@ -198,11 +200,19 @@ public void verifyFlush() { @Test public void verifyClose() throws Exception { GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, - CUSTOM_TAGS, FLUSH_LIMIT); + CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); sink.write(LOG_PROTO); verify(mockLogging, times(1)).write(anyIterable()); sink.close(); verify(mockLogging).close(); verifyNoMoreInteractions(mockLogging); } + + @Test + public void verifyExclude() throws Exception { + Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT, Collections.singleton("service")); + mockSink.write(LOG_PROTO); + verifyNoInteractions(mockLogging); + } }