Skip to content

Commit

Permalink
rls: Add gauge metric recording (#11175)
Browse files Browse the repository at this point in the history
Adds these gauges:
- grpc.lb.rls.cache_entries
- grpc.lb.rls.cache_size
  • Loading branch information
temawi authored and ejona86 committed May 9, 2024
1 parent f737cbc commit 8133318
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 12 deletions.
38 changes: 38 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
Expand All @@ -65,6 +69,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,6 +99,10 @@ final class CachingRlsLbClient {
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
private final Registration gaugeRegistration;
private final String metricsInstanceUuid = UUID.randomUUID().toString();

// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
Expand Down Expand Up @@ -138,6 +147,14 @@ final class CachingRlsLbClient {
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
+ "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
Collections.emptyList(), true);
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
"Number of entries in the RLS cache", "entry",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
"The current size of the RLS cache", "byte",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
}

private CachingRlsLbClient(Builder builder) {
Expand Down Expand Up @@ -202,6 +219,26 @@ private CachingRlsLbClient(Builder builder) {
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());

gaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
int estimatedSize;
long estimatedSizeBytes;
synchronized (lock) {
estimatedSize = linkedHashLruCache.estimatedSize();
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
}
recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
}
}, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

Expand Down Expand Up @@ -306,6 +343,7 @@ void close() {
pendingCallCache.clear();
rlsChannel.shutdownNow();
rlsPicker.close();
gaugeRegistration.close();
}
}

Expand Down
85 changes: 83 additions & 2 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand All @@ -47,10 +50,14 @@
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -95,12 +102,14 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
Expand All @@ -124,6 +133,13 @@ public class CachingRlsLbClientTest {
private SocketAddress socketAddress;
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private BatchRecorder mockBatchRecorder;
@Mock
private Registration mockGaugeRegistration;
@Captor
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;


private final SynchronizationContext syncContext =
new SynchronizationContext(new UncaughtExceptionHandler() {
Expand All @@ -145,7 +161,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
mock(Helper.class, delegatesTo(new FakeHelper()));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
Expand All @@ -168,6 +184,11 @@ private void setUpRlsLbClient() {
.build();
}

@Before
public void setUpMockMetricRecorder() {
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
public void tearDown() throws Exception {
rlsLbClient.close();
Expand Down Expand Up @@ -636,6 +657,51 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta
policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker());
}

@Test
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
setUpRlsLbClient();

verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
any());

BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue();

// Verify the correct cache gauge values when requested at this point.
InOrder inOrder = inOrder(mockBatchRecorder);
gaugeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L),
any(), any());
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(0L), any(), any());

RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(
ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key",
"bar"));
rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest,
RouteLookupResponse.create(ImmutableList.of("target"), "header")));

// Make a request that will populate the cache with an entry
getInSyncContext(routeLookupRequest);
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);

// Gauge values should reflect the new cache entry.
gaugeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L),
any(), any());
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(260L), any(), any());

inOrder.verifyNoMoreInteractions();

// Shutdown
rlsLbClient.close();
verify(mockGaugeRegistration).close();
}

private static RouteLookupConfig getRouteLookupConfig() {
return RouteLookupConfig.builder()
.grpcKeybuilders(ImmutableList.of(
Expand Down Expand Up @@ -667,6 +733,21 @@ public long nextBackoffNanos() {
};
}

private static class LongGaugeInstrumentArgumentMatcher implements
ArgumentMatcher<LongGaugeMetricInstrument> {

private final String instrumentName;

public LongGaugeInstrumentArgumentMatcher(String instrumentName) {
this.instrumentName = instrumentName;
}

@Override
public boolean matches(LongGaugeMetricInstrument instrument) {
return instrument.getName().equals(instrumentName);
}
}

private static final class FakeBackoffProvider implements BackoffPolicy.Provider {

private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
Expand Down
16 changes: 6 additions & 10 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -59,6 +59,7 @@
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.MetricSink;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NoopMetricSink;
Expand Down Expand Up @@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) {
});
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private Registration mockGaugeRegistration;
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
Expand Down Expand Up @@ -186,6 +189,8 @@ public CachingRlsLbClient.Builder get() {

searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);

when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
Expand Down Expand Up @@ -226,7 +231,6 @@ public void lb_serverStatusCodeConversion() throws Exception {
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -290,8 +294,6 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -351,7 +353,6 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
inOrder.verify(helper).getChannelTarget();
inOrder.verifyNoMoreInteractions();
verifyFailedPicksCounterAdd(1, 1);
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand All @@ -377,7 +378,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
"defaultTarget", "complete");
verifyNoMoreInteractions(mockMetricRecorder);

Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
assertThat(subchannelIsReady(subchannel)).isTrue();
Expand Down Expand Up @@ -422,8 +422,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(res.getSubchannel()).isNull();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -499,7 +497,6 @@ public void lb_working_withoutDefaultTarget() throws Exception {
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
inOrder.verifyNoMoreInteractions();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -542,7 +539,6 @@ public void lb_nameResolutionFailed() throws Exception {
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyNoMoreInteractions(mockMetricRecorder);
}

private PickResult markReadyAndGetPickResult(InOrder inOrder,
Expand Down

0 comments on commit 8133318

Please sign in to comment.