From 851628c684aca1125ccd1a7aac5223e89d73585c Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 13 Apr 2021 15:54:03 -0700 Subject: [PATCH] Keep tests for channelz and binlog implementations, as they are kept in-place. --- .../java/io/grpc/services/BinlogHelper.java | 2 +- .../io/grpc/services/InetAddressUtil.java | 2 +- .../services/BinaryLogProviderImplTest.java | 57 + .../grpc/services/BinaryLogProviderTest.java | 437 +++++ .../io/grpc/services/BinlogHelperTest.java | 1581 +++++++++++++++++ .../grpc/services/ChannelzProtoUtilTest.java | 949 ++++++++++ .../io/grpc/services/ChannelzServiceTest.java | 276 +++ .../io/grpc/services/ChannelzTestHelper.java | 183 ++ .../io/grpc/services/TempFileSinkTest.java | 68 + 9 files changed, 3553 insertions(+), 2 deletions(-) create mode 100644 services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java create mode 100644 services/src/test/java/io/grpc/services/BinaryLogProviderTest.java create mode 100644 services/src/test/java/io/grpc/services/BinlogHelperTest.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzServiceTest.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzTestHelper.java create mode 100644 services/src/test/java/io/grpc/services/TempFileSinkTest.java diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java index ba3eef430c1..d8923160e5c 100644 --- a/services/src/main/java/io/grpc/services/BinlogHelper.java +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -19,7 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static io.grpc.protobuf.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; +import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; diff --git a/services/src/main/java/io/grpc/services/InetAddressUtil.java b/services/src/main/java/io/grpc/services/InetAddressUtil.java index 057a8ccb5e6..e1bc8e9d26b 100644 --- a/services/src/main/java/io/grpc/services/InetAddressUtil.java +++ b/services/src/main/java/io/grpc/services/InetAddressUtil.java @@ -91,4 +91,4 @@ private static String hextetsToIPv6String(int[] hextets) { } return buf.toString(); } -} \ No newline at end of file +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java new file mode 100644 index 00000000000..ae363017438 --- /dev/null +++ b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import io.grpc.CallOptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BinaryLogProviderImpl}. */ +@SuppressWarnings("deprecation") +@RunWith(JUnit4.class) +public class BinaryLogProviderImplTest { + @Test + public void configStrNullTest() throws Exception { + BinaryLogSink sink = mock(BinaryLogSink.class); + BinaryLogProviderImpl binlog = new BinaryLogProviderImpl(sink, /*configStr=*/ null); + assertNull(binlog.getServerInterceptor("package.service/method")); + assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT)); + } + + @Test + public void configStrEmptyTest() throws Exception { + BinaryLogSink sink = mock(BinaryLogSink.class); + BinaryLogProviderImpl binlog = new BinaryLogProviderImpl(sink, ""); + assertNull(binlog.getServerInterceptor("package.service/method")); + assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT)); + } + + @Test + public void closeTest() throws Exception { + BinaryLogSink sink = mock(BinaryLogSink.class); + BinaryLogProviderImpl log = new BinaryLogProviderImpl(sink, "*"); + verify(sink, never()).close(); + log.close(); + verify(sink).close(); + } +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogProviderTest.java b/services/src/test/java/io/grpc/services/BinaryLogProviderTest.java new file mode 100644 index 00000000000..0f7d41482e0 --- /dev/null +++ b/services/src/test/java/io/grpc/services/BinaryLogProviderTest.java @@ -0,0 +1,437 @@ +/* + * Copyright 2017 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import com.google.common.io.ByteStreams; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerMethodDefinition; +import io.grpc.internal.NoopClientCall; +import io.grpc.internal.NoopServerCall; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link BinaryLogProvider}. */ +@RunWith(JUnit4.class) +public class BinaryLogProviderTest { + private final InvocationCountMarshaller reqMarshaller = + new InvocationCountMarshaller() { + @Override + Marshaller delegate() { + return StringMarshaller.INSTANCE; + } + }; + private final InvocationCountMarshaller respMarshaller = + new InvocationCountMarshaller() { + @Override + Marshaller delegate() { + return IntegerMarshaller.INSTANCE; + } + }; + private final MethodDescriptor method = + MethodDescriptor + .newBuilder(reqMarshaller, respMarshaller) + .setFullMethodName("myservice/mymethod") + .setType(MethodType.UNARY) + .setSchemaDescriptor(new Object()) + .setIdempotent(true) + .setSafe(true) + .setSampledToLocalTracing(true) + .build(); + private final List binlogReq = new ArrayList<>(); + private final List binlogResp = new ArrayList<>(); + private final BinaryLogProvider binlogProvider = new BinaryLogProvider() { + @Override + public ServerInterceptor getServerInterceptor(String fullMethodName) { + return new TestBinaryLogServerInterceptor(); + } + + @Override + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { + return new TestBinaryLogClientInterceptor(); + } + }; + + @Test + public void wrapChannel_methodDescriptor() throws Exception { + final AtomicReference> methodRef = + new AtomicReference<>(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor method, CallOptions callOptions) { + methodRef.set(method); + return new NoopClientCall<>(); + } + + @Override + public String authority() { + throw new UnsupportedOperationException(); + } + }; + Channel wChannel = binlogProvider.wrapChannel(channel); + ClientCall unusedClientCall = wChannel.newCall(method, CallOptions.DEFAULT); + validateWrappedMethod(methodRef.get()); + } + + @Test + public void wrapChannel_handler() throws Exception { + final List serializedReq = new ArrayList<>(); + final AtomicReference> listener = + new AtomicReference<>(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + public void start(ClientCall.Listener responseListener, Metadata headers) { + listener.set(responseListener); + } + + @Override + public void sendMessage(RequestT message) { + serializedReq.add((byte[]) message); + } + }; + } + + @Override + public String authority() { + throw new UnsupportedOperationException(); + } + }; + Channel wChannel = binlogProvider.wrapChannel(channel); + ClientCall clientCall = wChannel.newCall(method, CallOptions.DEFAULT); + final List observedResponse = new ArrayList<>(); + clientCall.start( + new NoopClientCall.NoopClientCallListener() { + @Override + public void onMessage(Integer message) { + observedResponse.add(message); + } + }, + new Metadata()); + + String expectedRequest = "hello world"; + assertThat(binlogReq).isEmpty(); + assertThat(serializedReq).isEmpty(); + assertEquals(0, reqMarshaller.streamInvocations); + clientCall.sendMessage(expectedRequest); + // it is unacceptably expensive for the binlog to double parse every logged message + assertEquals(1, reqMarshaller.streamInvocations); + assertEquals(0, reqMarshaller.parseInvocations); + assertThat(binlogReq).hasSize(1); + assertThat(serializedReq).hasSize(1); + assertEquals( + expectedRequest, + StringMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogReq.get(0)))); + assertEquals( + expectedRequest, + StringMarshaller.INSTANCE.parse(new ByteArrayInputStream(serializedReq.get(0)))); + + int expectedResponse = 12345; + assertThat(binlogResp).isEmpty(); + assertThat(observedResponse).isEmpty(); + assertEquals(0, respMarshaller.parseInvocations); + onClientMessageHelper(listener.get(), IntegerMarshaller.INSTANCE.stream(expectedResponse)); + // it is unacceptably expensive for the binlog to double parse every logged message + assertEquals(1, respMarshaller.parseInvocations); + assertEquals(0, respMarshaller.streamInvocations); + assertThat(binlogResp).hasSize(1); + assertThat(observedResponse).hasSize(1); + assertEquals( + expectedResponse, + (int) IntegerMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogResp.get(0)))); + assertEquals(expectedResponse, (int) observedResponse.get(0)); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void onClientMessageHelper(ClientCall.Listener listener, Object request) { + listener.onMessage(request); + } + + private void validateWrappedMethod(MethodDescriptor wMethod) { + assertSame(BinaryLogProvider.BYTEARRAY_MARSHALLER, wMethod.getRequestMarshaller()); + assertSame(BinaryLogProvider.BYTEARRAY_MARSHALLER, wMethod.getResponseMarshaller()); + assertEquals(method.getType(), wMethod.getType()); + assertEquals(method.getFullMethodName(), wMethod.getFullMethodName()); + assertEquals(method.getSchemaDescriptor(), wMethod.getSchemaDescriptor()); + assertEquals(method.isIdempotent(), wMethod.isIdempotent()); + assertEquals(method.isSafe(), wMethod.isSafe()); + assertEquals(method.isSampledToLocalTracing(), wMethod.isSampledToLocalTracing()); + } + + @Test + public void wrapMethodDefinition_methodDescriptor() throws Exception { + ServerMethodDefinition methodDef = + ServerMethodDefinition.create( + method, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + throw new UnsupportedOperationException(); + } + }); + ServerMethodDefinition wMethodDef = binlogProvider.wrapMethodDefinition(methodDef); + validateWrappedMethod(wMethodDef.getMethodDescriptor()); + } + + @Test + public void wrapMethodDefinition_handler() throws Exception { + // The request as seen by the user supplied server code + final List observedRequest = new ArrayList<>(); + final AtomicReference> serverCall = + new AtomicReference<>(); + ServerMethodDefinition methodDef = + ServerMethodDefinition.create( + method, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + serverCall.set(call); + return new ServerCall.Listener() { + @Override + public void onMessage(String message) { + observedRequest.add(message); + } + }; + } + }); + ServerMethodDefinition wDef = binlogProvider.wrapMethodDefinition(methodDef); + List serializedResp = new ArrayList<>(); + ServerCall.Listener wListener = startServerCallHelper(wDef, serializedResp); + + String expectedRequest = "hello world"; + assertThat(binlogReq).isEmpty(); + assertThat(observedRequest).isEmpty(); + assertEquals(0, reqMarshaller.parseInvocations); + onServerMessageHelper(wListener, StringMarshaller.INSTANCE.stream(expectedRequest)); + // it is unacceptably expensive for the binlog to double parse every logged message + assertEquals(1, reqMarshaller.parseInvocations); + assertEquals(0, reqMarshaller.streamInvocations); + assertThat(binlogReq).hasSize(1); + assertThat(observedRequest).hasSize(1); + assertEquals( + expectedRequest, + StringMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogReq.get(0)))); + assertEquals(expectedRequest, observedRequest.get(0)); + + int expectedResponse = 12345; + assertThat(binlogResp).isEmpty(); + assertThat(serializedResp).isEmpty(); + assertEquals(0, respMarshaller.streamInvocations); + serverCall.get().sendMessage(expectedResponse); + // it is unacceptably expensive for the binlog to double parse every logged message + assertEquals(0, respMarshaller.parseInvocations); + assertEquals(1, respMarshaller.streamInvocations); + assertThat(binlogResp).hasSize(1); + assertThat(serializedResp).hasSize(1); + assertEquals( + expectedResponse, + (int) IntegerMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogResp.get(0)))); + assertEquals(expectedResponse, + (int) method.parseResponse(new ByteArrayInputStream((byte[]) serializedResp.get(0)))); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void onServerMessageHelper(ServerCall.Listener listener, Object request) { + listener.onMessage(request); + } + + private static ServerCall.Listener startServerCallHelper( + final ServerMethodDefinition methodDef, + final List serializedResp) { + ServerCall serverCall = new NoopServerCall() { + @Override + public void sendMessage(RespT message) { + serializedResp.add(message); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return methodDef.getMethodDescriptor(); + } + }; + return methodDef.getServerCallHandler().startCall(serverCall, new Metadata()); + } + + private final class TestBinaryLogClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, + CallOptions callOptions, + Channel next) { + assertSame(BinaryLogProvider.BYTEARRAY_MARSHALLER, method.getRequestMarshaller()); + assertSame(BinaryLogProvider.BYTEARRAY_MARSHALLER, method.getResponseMarshaller()); + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(ClientCall.Listener responseListener, Metadata headers) { + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + assertTrue(message instanceof InputStream); + try { + byte[] bytes = ByteStreams.toByteArray((InputStream) message); + binlogResp.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + RespT dup = method.parseResponse(input); + super.onMessage(dup); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, + headers); + } + + @Override + public void sendMessage(ReqT message) { + byte[] bytes = (byte[]) message; + binlogReq.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + ReqT dup = method.parseRequest(input); + super.sendMessage(dup); + } + }; + } + } + + private final class TestBinaryLogServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + final ServerCall call, + Metadata headers, + ServerCallHandler next) { + assertSame( + BinaryLogProvider.BYTEARRAY_MARSHALLER, + call.getMethodDescriptor().getRequestMarshaller()); + assertSame( + BinaryLogProvider.BYTEARRAY_MARSHALLER, + call.getMethodDescriptor().getResponseMarshaller()); + ServerCall wCall = new SimpleForwardingServerCall(call) { + @Override + public void sendMessage(RespT message) { + byte[] bytes = (byte[]) message; + binlogResp.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + RespT dup = call.getMethodDescriptor().parseResponse(input); + super.sendMessage(dup); + } + }; + final ServerCall.Listener oListener = next.startCall(wCall, headers); + return new SimpleForwardingServerCallListener(oListener) { + @Override + public void onMessage(ReqT message) { + assertTrue(message instanceof InputStream); + try { + byte[] bytes = ByteStreams.toByteArray((InputStream) message); + binlogReq.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + ReqT dup = call.getMethodDescriptor().parseRequest(input); + super.onMessage(dup); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + } + + private abstract static class InvocationCountMarshaller + implements MethodDescriptor.Marshaller { + private int streamInvocations = 0; + private int parseInvocations = 0; + + abstract MethodDescriptor.Marshaller delegate(); + + @Override + public InputStream stream(T value) { + streamInvocations++; + return delegate().stream(value); + } + + @Override + public T parse(InputStream stream) { + parseInvocations++; + return delegate().parse(stream); + } + } + + + private static class StringMarshaller implements MethodDescriptor.Marshaller { + public static final StringMarshaller INSTANCE = new StringMarshaller(); + + @Override + public InputStream stream(String value) { + return new ByteArrayInputStream(value.getBytes(UTF_8)); + } + + @Override + public String parse(InputStream stream) { + try { + return new String(ByteStreams.toByteArray(stream), UTF_8); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + private static class IntegerMarshaller implements MethodDescriptor.Marshaller { + public static final IntegerMarshaller INSTANCE = new IntegerMarshaller(); + + @Override + public InputStream stream(Integer value) { + return StringMarshaller.INSTANCE.stream(value.toString()); + } + + @Override + public Integer parse(InputStream stream) { + return Integer.valueOf(StringMarshaller.INSTANCE.parse(stream)); + } + } +} diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java new file mode 100644 index 00000000000..fd11f4a86ac --- /dev/null +++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java @@ -0,0 +1,1581 @@ +/* + * Copyright 2017 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; +import static io.grpc.services.BinlogHelper.createMetadataProto; +import static io.grpc.services.BinlogHelper.getPeerSocket; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Durations; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.binarylog.v1.Address; +import io.grpc.binarylog.v1.ClientHeader; +import io.grpc.binarylog.v1.GrpcLogEntry; +import io.grpc.binarylog.v1.GrpcLogEntry.EventType; +import io.grpc.binarylog.v1.GrpcLogEntry.Logger; +import io.grpc.binarylog.v1.Message; +import io.grpc.binarylog.v1.MetadataEntry; +import io.grpc.binarylog.v1.ServerHeader; +import io.grpc.binarylog.v1.Trailer; +import io.grpc.internal.NoopClientCall; +import io.grpc.internal.NoopServerCall; +import io.grpc.protobuf.StatusProto; +import io.grpc.services.BinlogHelper.FactoryImpl; +import io.grpc.services.BinlogHelper.MaybeTruncated; +import io.grpc.services.BinlogHelper.SinkWriter; +import io.grpc.services.BinlogHelper.SinkWriterImpl; +import io.grpc.services.BinlogHelper.TimeProvider; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.AdditionalMatchers; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; + +/** Tests for {@link BinlogHelper}. */ +@SuppressWarnings("deprecation") +@RunWith(JUnit4.class) +public final class BinlogHelperTest { + private static final Charset US_ASCII = Charset.forName("US-ASCII"); + private static final BinlogHelper HEADER_FULL = new Builder().header(Integer.MAX_VALUE).build(); + private static final BinlogHelper HEADER_256 = new Builder().header(256).build(); + private static final BinlogHelper MSG_FULL = new Builder().msg(Integer.MAX_VALUE).build(); + private static final BinlogHelper MSG_256 = new Builder().msg(256).build(); + private static final BinlogHelper BOTH_256 = new Builder().header(256).msg(256).build(); + private static final BinlogHelper BOTH_FULL = + new Builder().header(Integer.MAX_VALUE).msg(Integer.MAX_VALUE).build(); + + private static final String DATA_A = "aaaaaaaaa"; + private static final String DATA_B = "bbbbbbbbb"; + private static final String DATA_C = "ccccccccc"; + private static final Metadata.Key KEY_A = + Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_B = + Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_C = + Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); + private static final MetadataEntry ENTRY_A = + MetadataEntry + .newBuilder() + .setKey(KEY_A.name()) + .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_B = + MetadataEntry + .newBuilder() + .setKey(KEY_B.name()) + .setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_C = + MetadataEntry + .newBuilder() + .setKey(KEY_C.name()) + .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) + .build(); + private static final long CALL_ID = 0x1112131415161718L; + private static final int HEADER_LIMIT = 10; + private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; + + private final Metadata nonEmptyMetadata = new Metadata(); + private final BinaryLogSink sink = mock(BinaryLogSink.class); + private final Timestamp timestamp + = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); + private final BinlogHelper.TimeProvider timeProvider = new TimeProvider() { + @Override + public long currentTimeNanos() { + return TimeUnit.SECONDS.toNanos(9876) + 54321; + } + }; + private final SinkWriter sinkWriterImpl = + new SinkWriterImpl( + sink, + timeProvider, + HEADER_LIMIT, + MESSAGE_LIMIT); + private final SinkWriter mockSinkWriter = mock(SinkWriter.class); + private final byte[] message = new byte[100]; + private SocketAddress peer; + + @Before + public void setUp() throws Exception { + nonEmptyMetadata.put(KEY_A, DATA_A); + nonEmptyMetadata.put(KEY_B, DATA_B); + nonEmptyMetadata.put(KEY_C, DATA_C); + peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + } + + @Test + public void configBinLog_global() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("*{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("*{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("*{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_method() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("p.s/m", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/m{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/m{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/m{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/m{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/m{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/m{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("p.s/m{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("p.s/m{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_method_absent() throws Exception { + assertNull(makeLog("p.s/m", "p.s/absent")); + } + + @Test + public void configBinLog_service() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("p.s/*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/*{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("p.s/*{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("p.s/*{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_service_absent() throws Exception { + assertNull(makeLog("p.s/*", "p.other/m")); + } + + @Test + public void createLogFromOptionString() throws Exception { + assertSameLimits(BOTH_FULL, makeOptions(null)); + assertSameLimits(HEADER_FULL, makeOptions("h")); + assertSameLimits(MSG_FULL, makeOptions("m")); + assertSameLimits(HEADER_256, makeOptions("h:256")); + assertSameLimits(MSG_256, makeOptions("m:256")); + assertSameLimits(BOTH_256, makeOptions("h:256;m:256")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeOptions("h;m:256")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeOptions("h:256;m")); + } + + private void assertIllegalPatternDetected(String perSvcOrMethodConfig) { + try { + FactoryImpl.createBinaryLog(sink, perSvcOrMethodConfig); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat().startsWith("Illegal log config pattern"); + } + } + + @Test + public void badFactoryConfigStrDetected() throws Exception { + try { + new FactoryImpl(sink, "obviouslybad{"); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat().startsWith("Illegal log config pattern"); + } + } + + @Test + public void badFactoryConfigStrDetected_empty() throws Exception { + try { + new FactoryImpl(sink, "*,"); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat().startsWith("Illegal log config pattern"); + } + } + + @Test + public void createLogFromOptionString_malformed() throws Exception { + assertIllegalPatternDetected(""); + assertIllegalPatternDetected("bad"); + assertIllegalPatternDetected("mad"); + assertIllegalPatternDetected("x;y"); + assertIllegalPatternDetected("h:abc"); + assertIllegalPatternDetected("h:1e8"); + assertIllegalPatternDetected("2"); + assertIllegalPatternDetected("2;2"); + // The grammar specifies that if both h and m are present, h comes before m + assertIllegalPatternDetected("m:123;h:123"); + // NumberFormatException + assertIllegalPatternDetected("h:99999999999999"); + } + + @Test + public void configBinLog_multiConfig_withGlobal() throws Exception { + String configStr = + "*{h}," + + "package.both256/*{h:256;m:256}," + + "package.service1/both128{h:128;m:128}," + + "package.service2/method_messageOnly{m}"; + assertSameLimits(HEADER_FULL, makeLog(configStr, "otherpackage.service/method")); + + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); + + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); + // the global config is in effect + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service1/absent")); + + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); + // the global config is in effect + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service2/absent")); + } + + @Test + public void configBinLog_multiConfig_noGlobal() throws Exception { + String configStr = + "package.both256/*{h:256;m:256}," + + "package.service1/both128{h:128;m:128}," + + "package.service2/method_messageOnly{m}"; + assertNull(makeLog(configStr, "otherpackage.service/method")); + + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); + + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); + // no global config in effect + assertNull(makeLog(configStr, "package.service1/absent")); + + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); + // no global config in effect + assertNull(makeLog(configStr, "package.service2/absent")); + } + + @Test + public void configBinLog_blacklist() { + assertNull(makeLog("*,-p.s/blacklisted", "p.s/blacklisted")); + assertNull(makeLog("-p.s/blacklisted,*", "p.s/blacklisted")); + assertNotNull(makeLog("-p.s/method,*", "p.s/allowed")); + + assertNull(makeLog("p.s/*,-p.s/blacklisted", "p.s/blacklisted")); + assertNull(makeLog("-p.s/blacklisted,p.s/*", "p.s/blacklisted")); + assertNotNull(makeLog("-p.s/blacklisted,p.s/*", "p.s/allowed")); + } + + private void assertDuplicatelPatternDetected(String factoryConfigStr) { + try { + new BinlogHelper.FactoryImpl(sink, factoryConfigStr); + fail(); + } catch (IllegalStateException expected) { + assertThat(expected).hasMessageThat().startsWith("Duplicate entry"); + } + } + + @Test + public void configBinLog_duplicates_global() throws Exception { + assertDuplicatelPatternDetected("*{h},*{h:256}"); + } + + @Test + public void configBinLog_duplicates_service() throws Exception { + assertDuplicatelPatternDetected("p.s/*,p.s/*{h}"); + + } + + @Test + public void configBinLog_duplicates_method() throws Exception { + assertDuplicatelPatternDetected("p.s/*,p.s/*{h:1;m:2}"); + assertDuplicatelPatternDetected("p.s/m,-p.s/m"); + assertDuplicatelPatternDetected("-p.s/m,p.s/m"); + assertDuplicatelPatternDetected("-p.s/m,-p.s/m"); + } + + @Test + public void socketToProto_ipv4() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + assertEquals( + Address + .newBuilder() + .setType(Address.Type.TYPE_IPV4) + .setAddress("127.0.0.1") + .setIpPort(12345) + .build(), + BinlogHelper.socketToProto(socketAddress)); + } + + @Test + public void socketToProto_ipv6() throws Exception { + // this is a ipv6 link local address + InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + assertEquals( + Address + .newBuilder() + .setType(Address.Type.TYPE_IPV6) + .setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required + .setIpPort(12345) + .build(), + BinlogHelper.socketToProto(socketAddress)); + } + + @Test + public void socketToProto_unix() throws Exception { + String path = "/some/path"; + DomainSocketAddress socketAddress = new DomainSocketAddress(path); + assertEquals( + Address + .newBuilder() + .setType(Address.Type.TYPE_UNIX) + .setAddress("/some/path") + .build(), + BinlogHelper.socketToProto(socketAddress) + ); + } + + @Test + public void socketToProto_unknown() throws Exception { + SocketAddress unknownSocket = new SocketAddress() { + @Override + public String toString() { + return "some-socket-address"; + } + }; + assertEquals( + Address.newBuilder() + .setType(Address.Type.TYPE_UNKNOWN) + .setAddress("some-socket-address") + .build(), + BinlogHelper.socketToProto(unknownSocket)); + } + + @Test + public void metadataToProto_empty() throws Exception { + assertEquals( + GrpcLogEntry.newBuilder() + .setType(EventType.EVENT_TYPE_CLIENT_HEADER) + .setClientHeader( + ClientHeader.newBuilder().setMetadata( + io.grpc.binarylog.v1.Metadata.getDefaultInstance())) + .build(), + metadataToProtoTestHelper( + EventType.EVENT_TYPE_CLIENT_HEADER, new Metadata(), Integer.MAX_VALUE)); + } + + @Test + public void metadataToProto() throws Exception { + assertEquals( + GrpcLogEntry.newBuilder() + .setType(EventType.EVENT_TYPE_CLIENT_HEADER) + .setClientHeader( + ClientHeader.newBuilder().setMetadata( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build())) + .build(), + metadataToProtoTestHelper( + EventType.EVENT_TYPE_CLIENT_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)); + } + + @Test + public void metadataToProto_setsTruncated() throws Exception { + assertTrue(BinlogHelper.createMetadataProto(nonEmptyMetadata, 0).truncated); + } + + @Test + public void metadataToProto_truncated() throws Exception { + // 0 byte limit not enough for any metadata + assertEquals( + io.grpc.binarylog.v1.Metadata.getDefaultInstance(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 0).proto.build()); + // not enough bytes for first key value + assertEquals( + io.grpc.binarylog.v1.Metadata.getDefaultInstance(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 9).proto.build()); + // enough for first key value + assertEquals( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 10).proto.build()); + // Test edge cases for >= 2 key values + assertEquals( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 19).proto.build()); + assertEquals( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 20).proto.build()); + assertEquals( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 29).proto.build()); + + // not truncated: enough for all keys + assertEquals( + io.grpc.binarylog.v1.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build(), + BinlogHelper.createMetadataProto(nonEmptyMetadata, 30).proto.build()); + } + + @Test + public void messageToProto() throws Exception { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); + assertEquals( + GrpcLogEntry.newBuilder() + .setMessage( + Message + .newBuilder() + .setData(ByteString.copyFrom(bytes)) + .setLength(bytes.length) + .build()) + .build(), + messageToProtoTestHelper(bytes, Integer.MAX_VALUE)); + } + + @Test + public void messageToProto_truncated() throws Exception { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); + assertEquals( + GrpcLogEntry.newBuilder() + .setMessage( + Message + .newBuilder() + .setLength(bytes.length) + .build()) + .setPayloadTruncated(true) + .build(), + messageToProtoTestHelper(bytes, 0)); + + int limit = 10; + String truncatedMessage = "this is a "; + assertEquals( + GrpcLogEntry.newBuilder() + .setMessage( + Message + .newBuilder() + .setData(ByteString.copyFrom(truncatedMessage.getBytes(US_ASCII))) + .setLength(bytes.length) + .build()) + .setPayloadTruncated(true) + .build(), + messageToProtoTestHelper(bytes, limit)); + } + + @Test + public void logClientHeader() throws Exception { + long seq = 1; + String authority = "authority"; + String methodName = "service/method"; + Duration timeout = Durations.fromMillis(1234); + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + long callId = 1000; + + GrpcLogEntry.Builder builder = + metadataToProtoTestHelper(EventType.EVENT_TYPE_CLIENT_HEADER, nonEmptyMetadata, 10) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceIdWithinCall(seq) + .setLogger(Logger.LOGGER_CLIENT) + .setCallId(callId); + builder.getClientHeaderBuilder() + .setMethodName("/" + methodName) + .setAuthority(authority) + .setTimeout(timeout); + GrpcLogEntry base = builder.build(); + { + sinkWriterImpl.logClientHeader( + seq, + methodName, + authority, + timeout, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + /*peerAddress=*/ null); + verify(sink).write(base); + } + + // logger is server + { + sinkWriterImpl.logClientHeader( + seq, + methodName, + authority, + timeout, + nonEmptyMetadata, + Logger.LOGGER_SERVER, + callId, + peerAddress); + verify(sink).write( + base.toBuilder() + .setPeer(BinlogHelper.socketToProto(peerAddress)) + .setLogger(Logger.LOGGER_SERVER) + .build()); + } + + // authority is null + { + sinkWriterImpl.logClientHeader( + seq, + methodName, + /*authority=*/ null, + timeout, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + /*peerAddress=*/ null); + + verify(sink).write( + base.toBuilder() + .setClientHeader(builder.getClientHeader().toBuilder().clearAuthority().build()) + .build()); + } + + // timeout is null + { + sinkWriterImpl.logClientHeader( + seq, + methodName, + authority, + /*timeout=*/ null, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + /*peerAddress=*/ null); + + verify(sink).write( + base.toBuilder() + .setClientHeader(builder.getClientHeader().toBuilder().clearTimeout().build()) + .build()); + } + + // peerAddress is non null (error for client side) + try { + sinkWriterImpl.logClientHeader( + seq, + methodName, + authority, + timeout, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + peerAddress); + fail(); + } catch (IllegalArgumentException expected) { + // noop + } + } + + @Test + public void logServerHeader() throws Exception { + long seq = 1; + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + long callId = 1000; + + GrpcLogEntry.Builder builder = + metadataToProtoTestHelper(EventType.EVENT_TYPE_SERVER_HEADER, nonEmptyMetadata, 10) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceIdWithinCall(seq) + .setLogger(Logger.LOGGER_CLIENT) + .setCallId(callId) + .setPeer(BinlogHelper.socketToProto(peerAddress)); + + { + sinkWriterImpl.logServerHeader( + seq, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + peerAddress); + verify(sink).write(builder.build()); + } + + // logger is server + // null peerAddress is required for server side + { + sinkWriterImpl.logServerHeader( + seq, + nonEmptyMetadata, + Logger.LOGGER_SERVER, + callId, + /*peerAddress=*/ null); + verify(sink).write( + builder + .setLogger(Logger.LOGGER_SERVER) + .clearPeer() + .build()); + } + + // logger is server + // non null peerAddress is an error + try { + sinkWriterImpl.logServerHeader( + seq, + nonEmptyMetadata, + Logger.LOGGER_SERVER, + callId, + peerAddress); + fail(); + } catch (IllegalArgumentException expected) { + // noop + } + } + + @Test + public void logTrailer() throws Exception { + long seq = 1; + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + long callId = 1000; + Status statusDescription = Status.INTERNAL.withDescription("my description"); + + GrpcLogEntry.Builder builder = + metadataToProtoTestHelper(EventType.EVENT_TYPE_SERVER_TRAILER, nonEmptyMetadata, 10) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceIdWithinCall(seq) + .setLogger(Logger.LOGGER_CLIENT) + .setCallId(callId) + .setPeer(BinlogHelper.socketToProto(peerAddress)); + + builder.getTrailerBuilder() + .setStatusCode(Status.INTERNAL.getCode().value()) + .setStatusMessage("my description"); + GrpcLogEntry base = builder.build(); + + { + sinkWriterImpl.logTrailer( + seq, + statusDescription, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + peerAddress); + verify(sink).write(base); + } + + // logger is server + { + sinkWriterImpl.logTrailer( + seq, + statusDescription, + nonEmptyMetadata, + Logger.LOGGER_SERVER, + callId, + /*peerAddress=*/ null); + verify(sink).write( + base.toBuilder() + .clearPeer() + .setLogger(Logger.LOGGER_SERVER) + .build()); + } + + // peerAddress is null + { + sinkWriterImpl.logTrailer( + seq, + statusDescription, + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + /*peerAddress=*/ null); + verify(sink).write( + base.toBuilder() + .clearPeer() + .build()); + } + + // status code is present but description is null + { + sinkWriterImpl.logTrailer( + seq, + statusDescription.getCode().toStatus(), // strip the description + nonEmptyMetadata, + Logger.LOGGER_CLIENT, + callId, + peerAddress); + verify(sink).write( + base.toBuilder() + .setTrailer(base.getTrailer().toBuilder().clearStatusMessage()) + .build()); + } + + // status proto always logged if present (com.google.rpc.Status), + { + int zeroHeaderBytes = 0; + SinkWriterImpl truncatingWriter = new SinkWriterImpl( + sink, timeProvider, zeroHeaderBytes, MESSAGE_LIMIT); + com.google.rpc.Status statusProto = com.google.rpc.Status.newBuilder() + .addDetails( + Any.pack(StringValue.newBuilder().setValue("arbitrarypayload").build())) + .setCode(Status.INTERNAL.getCode().value()) + .setMessage("status detail string") + .build(); + StatusException statusException + = StatusProto.toStatusException(statusProto, nonEmptyMetadata); + truncatingWriter.logTrailer( + seq, + statusException.getStatus(), + statusException.getTrailers(), + Logger.LOGGER_CLIENT, + callId, + peerAddress); + verify(sink).write( + base.toBuilder() + .setTrailer( + builder.getTrailerBuilder() + .setStatusMessage("status detail string") + .setStatusDetails(ByteString.copyFrom(statusProto.toByteArray())) + .setMetadata(io.grpc.binarylog.v1.Metadata.getDefaultInstance())) + .build()); + } + } + + @Test + public void alwaysLoggedMetadata_grpcTraceBin() throws Exception { + Metadata.Key key + = Metadata.Key.of("grpc-trace-bin", Metadata.BINARY_BYTE_MARSHALLER); + Metadata metadata = new Metadata(); + metadata.put(key, new byte[1]); + int zeroHeaderBytes = 0; + MaybeTruncated pair = + createMetadataProto(metadata, zeroHeaderBytes); + assertEquals( + key.name(), + Iterables.getOnlyElement(pair.proto.getEntryBuilderList()).getKey()); + assertFalse(pair.truncated); + } + + @Test + public void neverLoggedMetadata_grpcStatusDetilsBin() throws Exception { + Metadata.Key key + = Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER); + Metadata metadata = new Metadata(); + metadata.put(key, new byte[1]); + int unlimitedHeaderBytes = Integer.MAX_VALUE; + MaybeTruncated pair + = createMetadataProto(metadata, unlimitedHeaderBytes); + assertThat(pair.proto.getEntryBuilderList()).isEmpty(); + assertFalse(pair.truncated); + } + + @Test + public void logRpcMessage() throws Exception { + long seq = 1; + long callId = 1000; + GrpcLogEntry base = messageToProtoTestHelper(message, MESSAGE_LIMIT).toBuilder() + .setTimestamp(timestamp) + .setType(EventType.EVENT_TYPE_CLIENT_MESSAGE) + .setLogger(Logger.LOGGER_CLIENT) + .setSequenceIdWithinCall(1) + .setCallId(callId) + .build(); + { + sinkWriterImpl.logRpcMessage( + seq, + EventType.EVENT_TYPE_CLIENT_MESSAGE, + BYTEARRAY_MARSHALLER, + message, + Logger.LOGGER_CLIENT, + callId); + verify(sink).write(base); + } + + // server messsage + { + sinkWriterImpl.logRpcMessage( + seq, + EventType.EVENT_TYPE_SERVER_MESSAGE, + BYTEARRAY_MARSHALLER, + message, + Logger.LOGGER_CLIENT, + callId); + verify(sink).write( + base.toBuilder() + .setType(EventType.EVENT_TYPE_SERVER_MESSAGE) + .build()); + } + + // logger is server + { + sinkWriterImpl.logRpcMessage( + seq, + EventType.EVENT_TYPE_CLIENT_MESSAGE, + BYTEARRAY_MARSHALLER, + message, + Logger.LOGGER_SERVER, + callId); + verify(sink).write( + base.toBuilder() + .setLogger(Logger.LOGGER_SERVER) + .build()); + } + } + + @Test + public void getPeerSocketTest() { + assertNull(getPeerSocket(Attributes.EMPTY)); + assertSame( + peer, + getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); + } + + @Test + public void serverDeadlineLogged() { + final AtomicReference> interceptedCall = + new AtomicReference<>(); + @SuppressWarnings("unchecked") + final ServerCall.Listener mockListener = mock(ServerCall.Listener.class); + + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + // We expect the contents of the "grpc-timeout" header to be installed the context + Context.current() + .withDeadlineAfter(1, TimeUnit.SECONDS, Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + ServerCall.Listener unused = + new BinlogHelper(mockSinkWriter) + .getServerInterceptor(CALL_ID) + .interceptCall( + new NoopServerCall() { + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + }, + new Metadata(), + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, + Metadata headers) { + interceptedCall.set(call); + return mockListener; + } + }); + } + }); + ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockSinkWriter).logClientHeader( + /*seq=*/ eq(1L), + eq("service/method"), + ArgumentMatchers.isNull(), + timeoutCaptor.capture(), + any(Metadata.class), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + Duration timeout = timeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaCallOption() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + ClientCall call = + new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return null; + } + }); + call.start(mockListener, new Metadata()); + ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockSinkWriter) + .logClientHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + callOptTimeoutCaptor.capture(), + any(Metadata.class), + any(GrpcLogEntry.Logger.class), + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + Duration timeout = callOptTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaContext() throws Exception { + // important: deadline is read from the ctx where call was created + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + callFuture.set(new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return null; + } + })); + } + }); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + callFuture.get().start(mockListener, new Metadata()); + ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockSinkWriter) + .logClientHeader( + anyLong(), + anyString(), + ArgumentMatchers.any(), + callOptTimeoutCaptor.capture(), + any(Metadata.class), + any(GrpcLogEntry.Logger.class), + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + Duration timeout = callOptTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientInterceptor() throws Exception { + final AtomicReference> interceptedListener = + new AtomicReference<>(); + // capture these manually because ClientCall can not be mocked + final AtomicReference actualClientInitial = new AtomicReference<>(); + final AtomicReference actualRequest = new AtomicReference<>(); + + final SettableFuture halfCloseCalled = SettableFuture.create(); + final SettableFuture cancelCalled = SettableFuture.create(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public void cancel(String message, Throwable cause) { + cancelCalled.set(null); + } + + @Override + public void halfClose() { + halfCloseCalled.set(null); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + ClientCall interceptedCall = + new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT, + channel); + + // send client header + { + Metadata clientInitial = new Metadata(); + interceptedCall.start(mockListener, clientInitial); + verify(mockSinkWriter).logClientHeader( + /*seq=*/ eq(1L), + eq("service/method"), + eq("the-authority"), + ArgumentMatchers.isNull(), + same(clientInitial), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(clientInitial, actualClientInitial.get()); + } + + // receive server header + { + Metadata serverInitial = new Metadata(); + interceptedListener.get().onHeaders(serverInitial); + verify(mockSinkWriter).logServerHeader( + /*seq=*/ eq(2L), + same(serverInitial), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID), + same(peer)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onHeaders(same(serverInitial)); + } + + // send client msg + { + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedCall.sendMessage(request); + verify(mockSinkWriter).logRpcMessage( + /*seq=*/ eq(3L), + eq(EventType.EVENT_TYPE_CLIENT_MESSAGE), + same(BYTEARRAY_MARSHALLER), + same(request), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(request, actualRequest.get()); + } + + // client half close + { + interceptedCall.halfClose(); + verify(mockSinkWriter).logHalfClose( + /*seq=*/ eq(4L), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID)); + halfCloseCalled.get(1, TimeUnit.SECONDS); + verifyNoMoreInteractions(mockSinkWriter); + } + + // receive server msg + { + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + verify(mockSinkWriter).logRpcMessage( + /*seq=*/ eq(5L), + eq(EventType.EVENT_TYPE_SERVER_MESSAGE), + same(BYTEARRAY_MARSHALLER), + same(response), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onMessage(same(response)); + } + + // receive trailer + { + Status status = Status.INTERNAL.withDescription("some description"); + Metadata trailers = new Metadata(); + + interceptedListener.get().onClose(status, trailers); + verify(mockSinkWriter).logTrailer( + /*seq=*/ eq(6L), + same(status), + same(trailers), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onClose(same(status), same(trailers)); + } + + // cancel + { + interceptedCall.cancel(null, null); + verify(mockSinkWriter).logCancel( + /*seq=*/ eq(7L), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID)); + cancelCalled.get(1, TimeUnit.SECONDS); + } + } + + @Test + public void clientInterceptor_trailersOnlyResponseLogsPeerAddress() throws Exception { + final AtomicReference> interceptedListener = + new AtomicReference<>(); + // capture these manually because ClientCall can not be mocked + final AtomicReference actualClientInitial = new AtomicReference<>(); + final AtomicReference actualRequest = new AtomicReference<>(); + + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + ClientCall interceptedCall = + new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + channel); + Metadata clientInitial = new Metadata(); + interceptedCall.start(mockListener, clientInitial); + verify(mockSinkWriter).logClientHeader( + /*seq=*/ eq(1L), + anyString(), + anyString(), + any(Duration.class), + any(Metadata.class), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + + // trailer only response + { + Status status = Status.INTERNAL.withDescription("some description"); + Metadata trailers = new Metadata(); + + interceptedListener.get().onClose(status, trailers); + verify(mockSinkWriter).logTrailer( + /*seq=*/ eq(2L), + same(status), + same(trailers), + eq(Logger.LOGGER_CLIENT), + eq(CALL_ID), + same(peer)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onClose(same(status), same(trailers)); + } + } + + @Test + public void serverInterceptor() throws Exception { + final AtomicReference> interceptedCall = + new AtomicReference<>(); + ServerCall.Listener capturedListener; + @SuppressWarnings("unchecked") + final ServerCall.Listener mockListener = mock(ServerCall.Listener.class); + // capture these manually because ServerCall can not be mocked + final AtomicReference actualServerInitial = new AtomicReference<>(); + final AtomicReference actualResponse = new AtomicReference<>(); + final AtomicReference actualStatus = new AtomicReference<>(); + final AtomicReference actualTrailers = new AtomicReference<>(); + + // begin call and receive client header + { + Metadata clientInitial = new Metadata(); + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + capturedListener = + new BinlogHelper(mockSinkWriter) + .getServerInterceptor(CALL_ID) + .interceptCall( + new NoopServerCall() { + @Override + public void sendHeaders(Metadata headers) { + actualServerInitial.set(headers); + } + + @Override + public void sendMessage(byte[] message) { + actualResponse.set(message); + } + + @Override + public void close(Status status, Metadata trailers) { + actualStatus.set(status); + actualTrailers.set(trailers); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } + + @Override + public String getAuthority() { + return "the-authority"; + } + }, + clientInitial, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, + Metadata headers) { + interceptedCall.set(call); + return mockListener; + } + }); + verify(mockSinkWriter).logClientHeader( + /*seq=*/ eq(1L), + eq("service/method"), + eq("the-authority"), + ArgumentMatchers.isNull(), + same(clientInitial), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID), + same(peer)); + verifyNoMoreInteractions(mockSinkWriter); + } + + // send server header + { + Metadata serverInital = new Metadata(); + interceptedCall.get().sendHeaders(serverInital); + verify(mockSinkWriter).logServerHeader( + /*seq=*/ eq(2L), + same(serverInital), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(serverInital, actualServerInitial.get()); + } + + // receive client msg + { + byte[] request = "this is a request".getBytes(US_ASCII); + capturedListener.onMessage(request); + verify(mockSinkWriter).logRpcMessage( + /*seq=*/ eq(3L), + eq(EventType.EVENT_TYPE_CLIENT_MESSAGE), + same(BYTEARRAY_MARSHALLER), + same(request), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onMessage(same(request)); + } + + // client half close + { + capturedListener.onHalfClose(); + verify(mockSinkWriter).logHalfClose( + eq(4L), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onHalfClose(); + } + + // send server msg + { + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedCall.get().sendMessage(response); + verify(mockSinkWriter).logRpcMessage( + /*seq=*/ eq(5L), + eq(EventType.EVENT_TYPE_SERVER_MESSAGE), + same(BYTEARRAY_MARSHALLER), + same(response), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(response, actualResponse.get()); + } + + // send trailer + { + Status status = Status.INTERNAL.withDescription("some description"); + Metadata trailers = new Metadata(); + interceptedCall.get().close(status, trailers); + verify(mockSinkWriter).logTrailer( + /*seq=*/ eq(6L), + same(status), + same(trailers), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(status, actualStatus.get()); + assertSame(trailers, actualTrailers.get()); + } + + // cancel + { + capturedListener.onCancel(); + verify(mockSinkWriter).logCancel( + /*seq=*/ eq(7L), + eq(Logger.LOGGER_SERVER), + eq(CALL_ID)); + verify(mockListener).onCancel(); + } + } + + /** A builder class to make unit test code more readable. */ + private static final class Builder { + int maxHeaderBytes = 0; + int maxMessageBytes = 0; + + Builder header(int bytes) { + maxHeaderBytes = bytes; + return this; + } + + Builder msg(int bytes) { + maxMessageBytes = bytes; + return this; + } + + BinlogHelper build() { + return new BinlogHelper( + new SinkWriterImpl(mock(BinaryLogSink.class), null, maxHeaderBytes, maxMessageBytes)); + } + } + + private static void assertSameLimits(BinlogHelper a, BinlogHelper b) { + assertEquals(a.writer.getMaxMessageBytes(), b.writer.getMaxMessageBytes()); + assertEquals(a.writer.getMaxHeaderBytes(), b.writer.getMaxHeaderBytes()); + } + + private BinlogHelper makeLog(String factoryConfigStr, String lookup) { + return new BinlogHelper.FactoryImpl(sink, factoryConfigStr).getLog(lookup); + } + + private BinlogHelper makeOptions(String logConfigStr) { + return FactoryImpl.createBinaryLog(sink, logConfigStr); + } + + private static GrpcLogEntry metadataToProtoTestHelper( + EventType type, Metadata metadata, int maxHeaderBytes) { + GrpcLogEntry.Builder builder = GrpcLogEntry.newBuilder(); + MaybeTruncated pair + = BinlogHelper.createMetadataProto(metadata, maxHeaderBytes); + switch (type) { + case EVENT_TYPE_CLIENT_HEADER: + builder.setClientHeader(ClientHeader.newBuilder().setMetadata(pair.proto)); + break; + case EVENT_TYPE_SERVER_HEADER: + builder.setServerHeader(ServerHeader.newBuilder().setMetadata(pair.proto)); + break; + case EVENT_TYPE_SERVER_TRAILER: + builder.setTrailer(Trailer.newBuilder().setMetadata(pair.proto)); + break; + default: + throw new IllegalArgumentException(); + } + builder.setType(type).setPayloadTruncated(pair.truncated); + return builder.build(); + } + + private static GrpcLogEntry messageToProtoTestHelper( + byte[] message, int maxMessageBytes) { + GrpcLogEntry.Builder builder = GrpcLogEntry.newBuilder(); + MaybeTruncated pair + = BinlogHelper.createMessageProto(message, maxMessageBytes); + builder.setMessage(pair.proto).setPayloadTruncated(pair.truncated); + return builder.build(); + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java new file mode 100644 index 00000000000..8692f6fcde0 --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java @@ -0,0 +1,949 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.InternalChannelz.id; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import io.grpc.ConnectivityState; +import io.grpc.InternalChannelz; +import io.grpc.InternalChannelz.ChannelStats; +import io.grpc.InternalChannelz.ChannelTrace.Event; +import io.grpc.InternalChannelz.ChannelTrace.Event.Severity; +import io.grpc.InternalChannelz.RootChannelList; +import io.grpc.InternalChannelz.ServerList; +import io.grpc.InternalChannelz.ServerSocketsList; +import io.grpc.InternalChannelz.ServerStats; +import io.grpc.InternalChannelz.SocketOptions; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.InternalWithLogId; +import io.grpc.channelz.v1.Address; +import io.grpc.channelz.v1.Address.OtherAddress; +import io.grpc.channelz.v1.Address.TcpIpAddress; +import io.grpc.channelz.v1.Address.UdsAddress; +import io.grpc.channelz.v1.Channel; +import io.grpc.channelz.v1.ChannelConnectivityState; +import io.grpc.channelz.v1.ChannelConnectivityState.State; +import io.grpc.channelz.v1.ChannelData; +import io.grpc.channelz.v1.ChannelRef; +import io.grpc.channelz.v1.ChannelTrace; +import io.grpc.channelz.v1.ChannelTraceEvent; +import io.grpc.channelz.v1.GetChannelRequest; +import io.grpc.channelz.v1.GetServerSocketsResponse; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.channelz.v1.Security; +import io.grpc.channelz.v1.Security.OtherSecurity; +import io.grpc.channelz.v1.Security.Tls; +import io.grpc.channelz.v1.Server; +import io.grpc.channelz.v1.ServerData; +import io.grpc.channelz.v1.ServerRef; +import io.grpc.channelz.v1.Socket; +import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketOption; +import io.grpc.channelz.v1.SocketOptionLinger; +import io.grpc.channelz.v1.SocketOptionTcpInfo; +import io.grpc.channelz.v1.SocketOptionTimeout; +import io.grpc.channelz.v1.SocketRef; +import io.grpc.channelz.v1.Subchannel; +import io.grpc.channelz.v1.SubchannelRef; +import io.grpc.services.ChannelzTestHelper.TestChannel; +import io.grpc.services.ChannelzTestHelper.TestListenSocket; +import io.grpc.services.ChannelzTestHelper.TestServer; +import io.grpc.services.ChannelzTestHelper.TestSocket; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.Inet4Address; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.cert.Certificate; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class ChannelzProtoUtilTest { + + private final TestChannel channel = new TestChannel(); + private final ChannelRef channelRef = ChannelRef + .newBuilder() + .setName(channel.toString()) + .setChannelId(channel.getLogId().getId()) + .build(); + private final ChannelData channelData = ChannelData + .newBuilder() + .setTarget("sometarget") + .setState(ChannelConnectivityState.newBuilder().setState(State.READY)) + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromNanos(4)) + .build(); + private final Channel channelProto = Channel + .newBuilder() + .setRef(channelRef) + .setData(channelData) + .build(); + + private final TestChannel subchannel = new TestChannel(); + private final SubchannelRef subchannelRef = SubchannelRef + .newBuilder() + .setName(subchannel.toString()) + .setSubchannelId(subchannel.getLogId().getId()) + .build(); + private final ChannelData subchannelData = ChannelData + .newBuilder() + .setTarget("sometarget") + .setState(ChannelConnectivityState.newBuilder().setState(State.READY)) + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromNanos(4)) + .build(); + private final Subchannel subchannelProto = Subchannel + .newBuilder() + .setRef(subchannelRef) + .setData(subchannelData) + .build(); + + private final TestServer server = new TestServer(); + private final ServerRef serverRef = ServerRef + .newBuilder() + .setName(server.toString()) + .setServerId(server.getLogId().getId()) + .build(); + private final ServerData serverData = ServerData + .newBuilder() + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromNanos(4)) + .build(); + private final Server serverProto = Server + .newBuilder() + .setRef(serverRef) + .setData(serverData) + .build(); + + private final SocketOption sockOptLingerDisabled = SocketOption + .newBuilder() + .setName("SO_LINGER") + .setAdditional( + Any.pack(SocketOptionLinger.getDefaultInstance())) + .build(); + + private final SocketOption sockOptlinger10s = SocketOption + .newBuilder() + .setName("SO_LINGER") + .setAdditional( + Any.pack(SocketOptionLinger + .newBuilder() + .setActive(true) + .setDuration(Durations.fromSeconds(10)) + .build())) + .build(); + + private final SocketOption sockOptTimeout200ms = SocketOption + .newBuilder() + .setName("SO_TIMEOUT") + .setAdditional( + Any.pack(SocketOptionTimeout + .newBuilder() + .setDuration(Durations.fromMillis(200)) + .build()) + ).build(); + + private final SocketOption sockOptAdditional = SocketOption + .newBuilder() + .setName("SO_MADE_UP_OPTION") + .setValue("some-made-up-value") + .build(); + + private final InternalChannelz.TcpInfo channelzTcpInfo + = new InternalChannelz.TcpInfo.Builder() + .setState(70) + .setCaState(71) + .setRetransmits(72) + .setProbes(73) + .setBackoff(74) + .setOptions(75) + .setSndWscale(76) + .setRcvWscale(77) + .setRto(78) + .setAto(79) + .setSndMss(710) + .setRcvMss(711) + .setUnacked(712) + .setSacked(713) + .setLost(714) + .setRetrans(715) + .setFackets(716) + .setLastDataSent(717) + .setLastAckSent(718) + .setLastDataRecv(719) + .setLastAckRecv(720) + .setPmtu(721) + .setRcvSsthresh(722) + .setRtt(723) + .setRttvar(724) + .setSndSsthresh(725) + .setSndCwnd(726) + .setAdvmss(727) + .setReordering(728) + .build(); + + private final SocketOption socketOptionTcpInfo = SocketOption + .newBuilder() + .setName("TCP_INFO") + .setAdditional( + Any.pack( + SocketOptionTcpInfo.newBuilder() + .setTcpiState(70) + .setTcpiCaState(71) + .setTcpiRetransmits(72) + .setTcpiProbes(73) + .setTcpiBackoff(74) + .setTcpiOptions(75) + .setTcpiSndWscale(76) + .setTcpiRcvWscale(77) + .setTcpiRto(78) + .setTcpiAto(79) + .setTcpiSndMss(710) + .setTcpiRcvMss(711) + .setTcpiUnacked(712) + .setTcpiSacked(713) + .setTcpiLost(714) + .setTcpiRetrans(715) + .setTcpiFackets(716) + .setTcpiLastDataSent(717) + .setTcpiLastAckSent(718) + .setTcpiLastDataRecv(719) + .setTcpiLastAckRecv(720) + .setTcpiPmtu(721) + .setTcpiRcvSsthresh(722) + .setTcpiRtt(723) + .setTcpiRttvar(724) + .setTcpiSndSsthresh(725) + .setTcpiSndCwnd(726) + .setTcpiAdvmss(727) + .setTcpiReordering(728) + .build())) + .build(); + + private final TestListenSocket listenSocket = new TestListenSocket(); + private final SocketRef listenSocketRef = SocketRef + .newBuilder() + .setName(listenSocket.toString()) + .setSocketId(id(listenSocket)) + .build(); + private final Address listenAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) listenSocket.listenAddress).getAddress().getAddress())) + .setPort(1234)) + .build(); + + private final TestSocket socket = new TestSocket(); + private final SocketRef socketRef = SocketRef + .newBuilder() + .setName(socket.toString()) + .setSocketId(socket.getLogId().getId()) + .build(); + private final SocketData socketDataWithDataNoSockOpts = SocketData + .newBuilder() + .setStreamsStarted(1) + .setLastLocalStreamCreatedTimestamp(Timestamps.fromNanos(2)) + .setLastRemoteStreamCreatedTimestamp(Timestamps.fromNanos(3)) + .setStreamsSucceeded(4) + .setStreamsFailed(5) + .setMessagesSent(6) + .setMessagesReceived(7) + .setKeepAlivesSent(8) + .setLastMessageSentTimestamp(Timestamps.fromNanos(9)) + .setLastMessageReceivedTimestamp(Timestamps.fromNanos(10)) + .setLocalFlowControlWindow(Int64Value.newBuilder().setValue(11)) + .setRemoteFlowControlWindow(Int64Value.newBuilder().setValue(12)) + .build(); + private final Address localAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) socket.local).getAddress().getAddress())) + .setPort(1000)) + .build(); + private final Address remoteAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) socket.remote).getAddress().getAddress())) + .setPort(1000)) + .build(); + + private final ChannelTrace channelTrace = ChannelTrace + .newBuilder() + .setNumEventsLogged(1234) + .setCreationTimestamp(Timestamps.fromNanos(1000)) + .build(); + + @Test + public void toChannelRef() { + assertEquals(channelRef, ChannelzProtoUtil.toChannelRef(channel)); + } + + @Test + public void toSubchannelRef() { + assertEquals(subchannelRef, ChannelzProtoUtil.toSubchannelRef(subchannel)); + } + + @Test + public void toServerRef() { + assertEquals(serverRef, ChannelzProtoUtil.toServerRef(server)); + } + + @Test + public void toSocketRef() { + assertEquals(socketRef, ChannelzProtoUtil.toSocketRef(socket)); + } + + @Test + public void toState() { + for (ConnectivityState connectivityState : ConnectivityState.values()) { + assertEquals( + connectivityState.name(), + ChannelzProtoUtil.toState(connectivityState).getValueDescriptor().getName()); + } + assertEquals(State.UNKNOWN, ChannelzProtoUtil.toState(null)); + } + + @Test + public void toSocket_withDataNoOptions() throws Exception { + assertEquals( + Socket + .newBuilder() + .setRef(socketRef) + .setLocal(localAddress) + .setRemote(remoteAddress) + .setData(socketDataWithDataNoSockOpts) + .build(), + ChannelzProtoUtil.toSocket(socket)); + } + + @Test + public void toSocket_noDataWithOptions() throws Exception { + assertEquals( + Socket + .newBuilder() + .setRef(listenSocketRef) + .setLocal(listenAddress) + .setData( + SocketData + .newBuilder() + .addOption( + SocketOption + .newBuilder() + .setName("listen_option") + .setValue("listen_option_value"))) + .build(), + ChannelzProtoUtil.toSocket(listenSocket)); + } + + @Test + public void toSocket_withDataWithOptions() throws Exception { + socket.socketOptions + = new SocketOptions(null, null, null, ImmutableMap.of("test_name", "test_value")); + assertEquals( + Socket + .newBuilder() + .setRef(socketRef) + .setLocal(localAddress) + .setRemote(remoteAddress) + .setData( + SocketData + .newBuilder(socketDataWithDataNoSockOpts) + .addOption( + SocketOption.newBuilder() + .setName("test_name").setValue("test_value"))) + .build(), + ChannelzProtoUtil.toSocket(socket)); + } + + @Test + public void extractSocketData() throws Exception { + // no options + assertEquals( + socketDataWithDataNoSockOpts, + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); + + // with options + socket.socketOptions = toBuilder(socket.socketOptions) + .setSocketOptionLingerSeconds(10) + .setTcpInfo(channelzTcpInfo) + .build(); + assertEquals( + socketDataWithDataNoSockOpts + .toBuilder() + .addOption(sockOptlinger10s) + .addOption(socketOptionTcpInfo) + .build(), + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); + } + + @Test + public void toSocketData() throws Exception { + assertEquals( + socketDataWithDataNoSockOpts + .toBuilder() + .build(), + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); + } + + @Test + public void socketSecurityTls() throws Exception { + Certificate local = mock(Certificate.class); + Certificate remote = mock(Certificate.class); + when(local.getEncoded()).thenReturn("localcert".getBytes(Charsets.UTF_8)); + when(remote.getEncoded()).thenReturn("remotecert".getBytes(Charsets.UTF_8)); + + socket.security = new InternalChannelz.Security( + new InternalChannelz.Tls("TLS_NULL_WITH_NULL_NULL", local, remote)); + assertEquals( + Security.newBuilder().setTls( + Tls.newBuilder() + .setStandardName("TLS_NULL_WITH_NULL_NULL") + .setLocalCertificate(ByteString.copyFrom("localcert", Charsets.UTF_8)) + .setRemoteCertificate(ByteString.copyFrom("remotecert", Charsets.UTF_8))) + .build(), + ChannelzProtoUtil.toSocket(socket).getSecurity()); + + socket.security = new InternalChannelz.Security( + new InternalChannelz.Tls("TLS_NULL_WITH_NULL_NULL", /*localCert=*/ null, remote)); + assertEquals( + Security.newBuilder().setTls( + Tls.newBuilder() + .setStandardName("TLS_NULL_WITH_NULL_NULL") + .setRemoteCertificate(ByteString.copyFrom("remotecert", Charsets.UTF_8))) + .build(), + ChannelzProtoUtil.toSocket(socket).getSecurity()); + + socket.security = new InternalChannelz.Security( + new InternalChannelz.Tls("TLS_NULL_WITH_NULL_NULL", local, /*remoteCert=*/ null)); + assertEquals( + Security.newBuilder().setTls( + Tls.newBuilder() + .setStandardName("TLS_NULL_WITH_NULL_NULL") + .setLocalCertificate(ByteString.copyFrom("localcert", Charsets.UTF_8))) + .build(), + ChannelzProtoUtil.toSocket(socket).getSecurity()); + } + + @Test + public void socketSecurityOther() throws Exception { + // what is packed here is not important, just pick some proto message + Message contents = GetChannelRequest.newBuilder().setChannelId(1).build(); + Any packed = Any.pack(contents); + socket.security + = new InternalChannelz.Security( + new InternalChannelz.OtherSecurity("other_security", packed)); + assertEquals( + Security.newBuilder().setOther( + OtherSecurity.newBuilder().setName("other_security").setValue(packed)) + .build(), + ChannelzProtoUtil.toSocket(socket).getSecurity()); + } + + @Test + public void toAddress_inet() throws Exception { + InetSocketAddress inet4 = new InetSocketAddress(Inet4Address.getByName("10.0.0.1"), 1000); + assertEquals( + Address.newBuilder().setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom(inet4.getAddress().getAddress())) + .setPort(1000)) + .build(), + ChannelzProtoUtil.toAddress(inet4)); + } + + @Test + public void toAddress_uds() throws Exception { + String path = "/tmp/foo"; + DomainSocketAddress uds = new DomainSocketAddress(path); + assertEquals( + Address.newBuilder().setUdsAddress( + UdsAddress + .newBuilder() + .setFilename(path)) + .build(), + ChannelzProtoUtil.toAddress(uds)); + } + + @Test + public void toAddress_other() throws Exception { + final String name = "my name"; + SocketAddress other = new SocketAddress() { + @Override + public String toString() { + return name; + } + }; + assertEquals( + Address.newBuilder().setOtherAddress( + OtherAddress + .newBuilder() + .setName(name)) + .build(), + ChannelzProtoUtil.toAddress(other)); + } + + @Test + public void toServer() throws Exception { + // no listen sockets + assertEquals(serverProto, ChannelzProtoUtil.toServer(server)); + + // 1 listen socket + server.serverStats = toBuilder(server.serverStats) + .addListenSockets(ImmutableList.>of(listenSocket)) + .build(); + assertEquals( + serverProto + .toBuilder() + .addListenSocket(listenSocketRef) + .build(), + ChannelzProtoUtil.toServer(server)); + + // multiple listen sockets + TestListenSocket otherListenSocket = new TestListenSocket(); + SocketRef otherListenSocketRef = ChannelzProtoUtil.toSocketRef(otherListenSocket); + server.serverStats = toBuilder(server.serverStats) + .addListenSockets( + ImmutableList.>of(otherListenSocket)) + .build(); + assertEquals( + serverProto + .toBuilder() + .addListenSocket(listenSocketRef) + .addListenSocket(otherListenSocketRef) + .build(), + ChannelzProtoUtil.toServer(server)); + } + + @Test + public void toServerData() throws Exception { + assertEquals(serverData, ChannelzProtoUtil.toServerData(server.serverStats)); + } + + @Test + public void toChannel() throws Exception { + assertEquals(channelProto, ChannelzProtoUtil.toChannel(channel)); + + channel.stats = toBuilder(channel.stats) + .setSubchannels(ImmutableList.of(subchannel)) + .build(); + + assertEquals( + channelProto + .toBuilder() + .addSubchannelRef(subchannelRef) + .build(), + ChannelzProtoUtil.toChannel(channel)); + + TestChannel otherSubchannel = new TestChannel(); + channel.stats = toBuilder(channel.stats) + .setSubchannels(ImmutableList.of(subchannel, otherSubchannel)) + .build(); + assertEquals( + channelProto + .toBuilder() + .addSubchannelRef(subchannelRef) + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(otherSubchannel)) + .build(), + ChannelzProtoUtil.toChannel(channel)); + } + + @Test + public void extractChannelData() { + assertEquals(channelData, ChannelzProtoUtil.extractChannelData(channel.stats)); + } + + @Test + public void toSubchannel_noChildren() throws Exception { + assertEquals( + subchannelProto, + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toSubchannel_socketChildren() throws Exception { + subchannel.stats = toBuilder(subchannel.stats) + .setSockets(ImmutableList.of(socket)) + .build(); + + assertEquals( + subchannelProto.toBuilder() + .addSocketRef(socketRef) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + + TestSocket otherSocket = new TestSocket(); + subchannel.stats = toBuilder(subchannel.stats) + .setSockets(ImmutableList.of(socket, otherSocket)) + .build(); + assertEquals( + subchannelProto + .toBuilder() + .addSocketRef(socketRef) + .addSocketRef(ChannelzProtoUtil.toSocketRef(otherSocket)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toSubchannel_subchannelChildren() throws Exception { + TestChannel subchannel1 = new TestChannel(); + subchannel.stats = toBuilder(subchannel.stats) + .setSubchannels(ImmutableList.of(subchannel1)) + .build(); + assertEquals( + subchannelProto.toBuilder() + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + + TestChannel subchannel2 = new TestChannel(); + subchannel.stats = toBuilder(subchannel.stats) + .setSubchannels(ImmutableList.of(subchannel1, subchannel2)) + .build(); + assertEquals( + subchannelProto + .toBuilder() + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1)) + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel2)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toGetTopChannelsResponse() { + // empty results + assertEquals( + GetTopChannelsResponse.newBuilder().setEnd(true).build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList( + Collections.>emptyList(), true))); + + // 1 result, paginated + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList( + ImmutableList.>of(channel), false))); + + // 1 result, end + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList( + ImmutableList.>of(channel), true))); + + // 2 results, end + TestChannel channel2 = new TestChannel(); + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .addChannel(ChannelzProtoUtil.toChannel(channel2)) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList( + ImmutableList.>of(channel, channel2), true))); + } + + @Test + public void toGetServersResponse() { + // empty results + assertEquals( + GetServersResponse.getDefaultInstance(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(Collections.>emptyList(), false))); + + // 1 result, paginated + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(ImmutableList.>of(server), false))); + + // 1 result, end + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(ImmutableList.>of(server), true))); + + TestServer server2 = new TestServer(); + // 2 results, end + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .addServer(ChannelzProtoUtil.toServer(server2)) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList( + ImmutableList.>of(server, server2), true))); + } + + @Test + public void toGetServerSocketsResponse() { + // empty results + assertEquals( + GetServerSocketsResponse.getDefaultInstance(), + ChannelzProtoUtil.toGetServerSocketsResponse( + new ServerSocketsList(Collections.emptyList(), false))); + + // 1 result, paginated + assertEquals( + GetServerSocketsResponse + .newBuilder() + .addSocketRef(socketRef) + .build(), + ChannelzProtoUtil.toGetServerSocketsResponse( + new ServerSocketsList(ImmutableList.of(socket), false))); + + // 1 result, end + assertEquals( + GetServerSocketsResponse + .newBuilder() + .addSocketRef(socketRef) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetServerSocketsResponse( + new ServerSocketsList(ImmutableList.of(socket), true))); + + TestSocket socket2 = new TestSocket(); + // 2 results, end + assertEquals( + GetServerSocketsResponse + .newBuilder() + .addSocketRef(socketRef) + .addSocketRef(ChannelzProtoUtil.toSocketRef(socket2)) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetServerSocketsResponse( + new ServerSocketsList(ImmutableList.of(socket, socket2), true))); + } + + @Test + public void toSocketOptionLinger() { + assertEquals(sockOptLingerDisabled, ChannelzProtoUtil.toSocketOptionLinger(-1)); + assertEquals(sockOptlinger10s, ChannelzProtoUtil.toSocketOptionLinger(10)); + } + + @Test + public void toSocketOptionTimeout() { + assertEquals( + sockOptTimeout200ms, ChannelzProtoUtil.toSocketOptionTimeout("SO_TIMEOUT", 200)); + } + + @Test + public void toSocketOptionAdditional() { + assertEquals( + sockOptAdditional, + ChannelzProtoUtil.toSocketOptionAdditional("SO_MADE_UP_OPTION", "some-made-up-value")); + } + + @Test + public void toSocketOptionTcpInfo() { + assertEquals( + socketOptionTcpInfo, + ChannelzProtoUtil.toSocketOptionTcpInfo(channelzTcpInfo)); + } + + @Test + public void toSocketOptionsList() { + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new InternalChannelz.SocketOptions.Builder().build())) + .isEmpty(); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new InternalChannelz.SocketOptions.Builder().setSocketOptionLingerSeconds(10).build())) + .containsExactly(sockOptlinger10s); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new InternalChannelz.SocketOptions.Builder().setSocketOptionTimeoutMillis(200).build())) + .containsExactly(sockOptTimeout200ms); + + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new InternalChannelz.SocketOptions + .Builder() + .addOption("SO_MADE_UP_OPTION", "some-made-up-value") + .build())) + .containsExactly(sockOptAdditional); + + SocketOption otherOption = SocketOption + .newBuilder() + .setName("SO_MADE_UP_OPTION2") + .setValue("some-made-up-value2") + .build(); + assertThat( + ChannelzProtoUtil.toSocketOptionsList( + new InternalChannelz.SocketOptions.Builder() + .addOption("SO_MADE_UP_OPTION", "some-made-up-value") + .addOption("SO_MADE_UP_OPTION2", "some-made-up-value2") + .build())) + .containsExactly(sockOptAdditional, otherOption); + } + + @Test + public void channelTrace_withoutEvents() { + ChannelStats stats = toBuilder(channel.stats) + .setChannelTrace(new InternalChannelz.ChannelTrace.Builder() + .setNumEventsLogged(1234) + .setCreationTimeNanos(1000) + .build()) + .build(); + + ChannelData protoStats = channelData.toBuilder().setTrace(channelTrace).build(); + assertEquals(ChannelzProtoUtil.extractChannelData(stats), protoStats); + } + + @Test + public void channelTrace_withEvents() { + Event event1 = new Event.Builder() + .setDescription("event1") + .setSeverity(Severity.CT_ERROR) + .setTimestampNanos(12) + .setSubchannelRef(subchannel) + .build(); + Event event2 = new Event.Builder() + .setDescription("event2") + .setTimestampNanos(34) + .setSeverity(Severity.CT_INFO) + .setChannelRef(channel) + .build(); + + ChannelStats stats = + toBuilder(channel.stats) + .setChannelTrace( + new InternalChannelz.ChannelTrace.Builder() + .setNumEventsLogged(1234) + .setCreationTimeNanos(1000) + .setEvents(Arrays.asList(event1, event2)) + .build()) + .build(); + + ChannelTraceEvent protoEvent1 = ChannelTraceEvent + .newBuilder() + .setDescription("event1") + .setTimestamp(Timestamps.fromNanos(12)) + .setSeverity(ChannelTraceEvent.Severity.CT_ERROR) + .setSubchannelRef(subchannelRef) + .build(); + ChannelTraceEvent protoEvent2 = ChannelTraceEvent + .newBuilder() + .setDescription("event2") + .setTimestamp(Timestamps.fromNanos(34)) + .setSeverity(ChannelTraceEvent.Severity.CT_INFO) + .setChannelRef(channelRef) + .build(); + ChannelData protoStats = channelData + .toBuilder() + .setTrace(channelTrace + .toBuilder() + .addAllEvents(Arrays.asList(protoEvent1, protoEvent2)) + .build()) + .build(); + assertEquals(ChannelzProtoUtil.extractChannelData(stats), protoStats); + } + + private static ChannelStats.Builder toBuilder(ChannelStats stats) { + ChannelStats.Builder builder = new ChannelStats.Builder() + .setTarget(stats.target) + .setState(stats.state) + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedNanos(stats.lastCallStartedNanos); + if (!stats.subchannels.isEmpty()) { + builder.setSubchannels(stats.subchannels); + } + if (!stats.sockets.isEmpty()) { + builder.setSockets(stats.sockets); + } + return builder; + } + + + private static SocketOptions.Builder toBuilder(SocketOptions options) { + SocketOptions.Builder builder = new SocketOptions.Builder() + .setSocketOptionTimeoutMillis(options.soTimeoutMillis) + .setSocketOptionLingerSeconds(options.lingerSeconds); + for (Map.Entry entry : options.others.entrySet()) { + builder.addOption(entry.getKey(), entry.getValue()); + } + return builder; + } + + private static ServerStats.Builder toBuilder(ServerStats stats) { + return new ServerStats.Builder() + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedNanos(stats.lastCallStartedNanos) + .addListenSockets(stats.listenSockets); + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzServiceTest.java b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java new file mode 100644 index 00000000000..0849f8611ad --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java @@ -0,0 +1,276 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.InternalChannelz; +import io.grpc.Status; +import io.grpc.channelz.v1.GetChannelRequest; +import io.grpc.channelz.v1.GetChannelResponse; +import io.grpc.channelz.v1.GetServerRequest; +import io.grpc.channelz.v1.GetServerResponse; +import io.grpc.channelz.v1.GetServersRequest; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetSocketRequest; +import io.grpc.channelz.v1.GetSocketResponse; +import io.grpc.channelz.v1.GetSubchannelRequest; +import io.grpc.channelz.v1.GetSubchannelResponse; +import io.grpc.channelz.v1.GetTopChannelsRequest; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.services.ChannelzTestHelper.TestChannel; +import io.grpc.services.ChannelzTestHelper.TestServer; +import io.grpc.services.ChannelzTestHelper.TestSocket; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings("deprecation") +@RunWith(JUnit4.class) +public class ChannelzServiceTest { + // small value to force pagination + private static final int MAX_PAGE_SIZE = 1; + + private final InternalChannelz channelz = new InternalChannelz(); + private ChannelzService service = new ChannelzService(channelz, MAX_PAGE_SIZE); + + @Test + public void getTopChannels_empty() { + assertEquals( + GetTopChannelsResponse.newBuilder().setEnd(true).build(), + getTopChannelHelper(0)); + } + + @Test + public void getTopChannels_onePage() throws Exception { + TestChannel root = new TestChannel(); + channelz.addRootChannel(root); + + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(ChannelzProtoUtil.toChannel(root)) + .setEnd(true) + .build(), + getTopChannelHelper(0)); + } + + @Test + public void getChannel() throws ExecutionException, InterruptedException { + TestChannel root = new TestChannel(); + assertChannelNotFound(root.getLogId().getId()); + + channelz.addRootChannel(root); + assertEquals( + GetChannelResponse + .newBuilder() + .setChannel(ChannelzProtoUtil.toChannel(root)) + .build(), + getChannelHelper(root.getLogId().getId())); + + channelz.removeRootChannel(root); + assertChannelNotFound(root.getLogId().getId()); + } + + @Test + public void getSubchannel() throws Exception { + TestChannel subchannel = new TestChannel(); + assertSubchannelNotFound(subchannel.getLogId().getId()); + + channelz.addSubchannel(subchannel); + assertEquals( + GetSubchannelResponse + .newBuilder() + .setSubchannel(ChannelzProtoUtil.toSubchannel(subchannel)) + .build(), + getSubchannelHelper(subchannel.getLogId().getId())); + + channelz.removeSubchannel(subchannel); + assertSubchannelNotFound(subchannel.getLogId().getId()); + } + + @Test + public void getServers_empty() { + assertEquals( + GetServersResponse.newBuilder().setEnd(true).build(), + getServersHelper(0)); + } + + @Test + public void getServers_onePage() throws Exception { + TestServer server = new TestServer(); + channelz.addServer(server); + + assertEquals( + GetServersResponse + .newBuilder() + .addServer(ChannelzProtoUtil.toServer(server)) + .setEnd(true) + .build(), + getServersHelper(0)); + } + + @Test + public void getServer() throws ExecutionException, InterruptedException { + TestServer server = new TestServer(); + assertServerNotFound(server.getLogId().getId()); + + channelz.addServer(server); + assertEquals( + GetServerResponse + .newBuilder() + .setServer(ChannelzProtoUtil.toServer(server)) + .build(), + getServerHelper(server.getLogId().getId())); + + channelz.removeServer(server); + assertServerNotFound(server.getLogId().getId()); + } + + + @Test + public void getSocket() throws Exception { + TestSocket socket = new TestSocket(); + assertSocketNotFound(socket.getLogId().getId()); + + channelz.addClientSocket(socket); + assertEquals( + GetSocketResponse + .newBuilder() + .setSocket(ChannelzProtoUtil.toSocket(socket)) + .build(), + getSocketHelper(socket.getLogId().getId())); + + channelz.removeClientSocket(socket); + assertSocketNotFound(socket.getLogId().getId()); + } + + private GetTopChannelsResponse getTopChannelHelper(long startId) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor responseCaptor + = ArgumentCaptor.forClass(GetTopChannelsResponse.class); + service.getTopChannels( + GetTopChannelsRequest.newBuilder().setStartChannelId(startId).build(), + observer); + verify(observer).onNext(responseCaptor.capture()); + verify(observer).onCompleted(); + return responseCaptor.getValue(); + } + + private GetChannelResponse getChannelHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetChannelResponse.class); + service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } + + private void assertChannelNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Status s = Status.fromThrowable(exceptionCaptor.getValue()); + assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + + private GetSubchannelResponse getSubchannelHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetSubchannelResponse.class); + service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } + + private void assertSubchannelNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Status s = Status.fromThrowable(exceptionCaptor.getValue()); + assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + + private GetServersResponse getServersHelper(long startId) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor responseCaptor + = ArgumentCaptor.forClass(GetServersResponse.class); + service.getServers( + GetServersRequest.newBuilder().setStartServerId(startId).build(), + observer); + verify(observer).onNext(responseCaptor.capture()); + verify(observer).onCompleted(); + return responseCaptor.getValue(); + } + + private void assertServerNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getServer(GetServerRequest.newBuilder().setServerId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Status s = Status.fromThrowable(exceptionCaptor.getValue()); + assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + + private GetServerResponse getServerHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response = ArgumentCaptor.forClass(GetServerResponse.class); + service.getServer(GetServerRequest.newBuilder().setServerId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } + + private void assertSocketNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Status s = Status.fromThrowable(exceptionCaptor.getValue()); + assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + + private GetSocketResponse getSocketHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetSocketResponse.class); + service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java new file mode 100644 index 00000000000..6bd8e8bceb5 --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java @@ -0,0 +1,183 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.ConnectivityState; +import io.grpc.InternalChannelz; +import io.grpc.InternalChannelz.ChannelStats; +import io.grpc.InternalChannelz.Security; +import io.grpc.InternalChannelz.ServerStats; +import io.grpc.InternalChannelz.SocketOptions; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalChannelz.TransportStats; +import io.grpc.InternalInstrumented; +import io.grpc.InternalLogId; +import io.grpc.InternalWithLogId; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; + +/** + * Test class definitions that will be used in the proto utils test as well as + * channelz service test. + */ +final class ChannelzTestHelper { + + static final class TestSocket implements InternalInstrumented { + private final InternalLogId id = InternalLogId.allocate("socket", /*details=*/ null); + TransportStats transportStats = new TransportStats( + /*streamsStarted=*/ 1, + /*lastLocalStreamCreatedTimeNanos=*/ 2, + /*lastRemoteStreamCreatedTimeNanos=*/ 3, + /*streamsSucceeded=*/ 4, + /*streamsFailed=*/ 5, + /*messagesSent=*/ 6, + /*messagesReceived=*/ 7, + /*keepAlivesSent=*/ 8, + /*lastMessageSentTimeNanos=*/ 9, + /*lastMessageReceivedTimeNanos=*/ 10, + /*localFlowControlWindow=*/ 11, + /*remoteFlowControlWindow=*/ 12); + SocketAddress local = new InetSocketAddress("10.0.0.1", 1000); + SocketAddress remote = new InetSocketAddress("10.0.0.2", 1000); + InternalChannelz.SocketOptions socketOptions + = new InternalChannelz.SocketOptions.Builder().build(); + Security security = null; + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set( + new SocketStats( + transportStats, + local, + remote, + socketOptions, + security)); + return ret; + } + + @Override + public InternalLogId getLogId() { + return id; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("logId", getLogId()) + .toString(); + } + } + + static final class TestListenSocket implements InternalInstrumented { + private final InternalLogId id = InternalLogId.allocate("listensocket", /*details=*/ null); + SocketAddress listenAddress = new InetSocketAddress("10.0.0.1", 1234); + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set( + new SocketStats( + /*data=*/ null, + listenAddress, + /*remote=*/ null, + new SocketOptions.Builder().addOption("listen_option", "listen_option_value").build(), + /*security=*/ null)); + return ret; + } + + @Override + public InternalLogId getLogId() { + return id; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("logId", getLogId()) + .toString(); + } + } + + static final class TestServer implements InternalInstrumented { + private final InternalLogId id = InternalLogId.allocate("server", /*details=*/ null); + ServerStats serverStats = new ServerStats( + /*callsStarted=*/ 1, + /*callsSucceeded=*/ 2, + /*callsFailed=*/ 3, + /*lastCallStartedNanos=*/ 4, + Collections.>emptyList()); + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set(serverStats); + return ret; + } + + @Override + public InternalLogId getLogId() { + return id; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("logId", getLogId()) + .toString(); + } + } + + static final class TestChannel implements InternalInstrumented { + private final InternalLogId id = + InternalLogId.allocate("channel-or-subchannel", /*details=*/ null); + + ChannelStats stats = new ChannelStats.Builder() + .setTarget("sometarget") + .setState(ConnectivityState.READY) + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedNanos(4) + .setSubchannels(Collections.emptyList()) + .setSockets(Collections.emptyList()) + .build(); + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set(stats); + return ret; + } + + @Override + public InternalLogId getLogId() { + return id; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("logId", getLogId()) + .toString(); + } + } +} diff --git a/services/src/test/java/io/grpc/services/TempFileSinkTest.java b/services/src/test/java/io/grpc/services/TempFileSinkTest.java new file mode 100644 index 00000000000..e0c5e200c58 --- /dev/null +++ b/services/src/test/java/io/grpc/services/TempFileSinkTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static org.junit.Assert.assertEquals; + +import io.grpc.binarylog.v1.GrpcLogEntry; +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link TempFileSink}. + */ +@RunWith(JUnit4.class) +public class TempFileSinkTest { + @Test + public void readMyWrite() throws Exception { + TempFileSink sink = new TempFileSink(); + GrpcLogEntry e1 = GrpcLogEntry.newBuilder() + .setCallId(1234) + .build(); + GrpcLogEntry e2 = GrpcLogEntry.newBuilder() + .setCallId(5678) + .build(); + sink.write(e1); + sink.write(e2); + sink.close(); + + DataInputStream input = new DataInputStream(new FileInputStream(sink.getPath())); + try { + GrpcLogEntry read1 = GrpcLogEntry.parseDelimitedFrom(input); + GrpcLogEntry read2 = GrpcLogEntry.parseDelimitedFrom(input); + + assertEquals(e1, read1); + assertEquals(e2, read2); + assertEquals(-1, input.read()); + } finally { + input.close(); + } + } + + @Test + public void writeAfterCloseIsSilent() throws IOException { + TempFileSink sink = new TempFileSink(); + sink.close(); + sink.write(GrpcLogEntry.newBuilder() + .setCallId(1234) + .build()); + } +}