From d8412abbd131ee5446ceda3b643f20551b766ac5 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 13 Apr 2021 11:56:57 -0700 Subject: [PATCH] Keep channelz and binlog implementations in-place, while mark them as deprecated and suggest users to migrate new ones in io.grpc.protobuf.services --- .../grpc/services/BinaryLogProviderImpl.java | 93 ++ .../java/io/grpc/services/BinaryLogSink.java | 35 + .../java/io/grpc/services/BinaryLogs.java | 57 ++ .../java/io/grpc/services/BinlogHelper.java | 880 ++++++++++++++++++ .../io/grpc/services/ChannelzProtoUtil.java | 471 ++++++++++ .../io/grpc/services/ChannelzService.java | 237 +++++ .../io/grpc/services/InetAddressUtil.java | 94 ++ .../java/io/grpc/services/TempFileSink.java | 84 ++ 8 files changed, 1951 insertions(+) create mode 100644 services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java create mode 100644 services/src/main/java/io/grpc/services/BinaryLogSink.java create mode 100644 services/src/main/java/io/grpc/services/BinaryLogs.java create mode 100644 services/src/main/java/io/grpc/services/BinlogHelper.java create mode 100644 services/src/main/java/io/grpc/services/ChannelzProtoUtil.java create mode 100644 services/src/main/java/io/grpc/services/ChannelzService.java create mode 100644 services/src/main/java/io/grpc/services/InetAddressUtil.java create mode 100644 services/src/main/java/io/grpc/services/TempFileSink.java diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java new file mode 100644 index 00000000000..5c09f8a3ddd --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java @@ -0,0 +1,93 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.base.Preconditions; +import io.grpc.CallOptions; +import io.grpc.ClientInterceptor; +import io.grpc.ServerInterceptor; +import io.grpc.protobuf.services.BinaryLogProvider; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +/** + * The default implementation of a {@link BinaryLogProvider}. + */ +class BinaryLogProviderImpl extends BinaryLogProvider { + // avoid using 0 because proto3 long fields default to 0 when unset + private static final AtomicLong counter = new AtomicLong(1); + + private final BinlogHelper.Factory factory; + private final BinaryLogSink sink; + + public BinaryLogProviderImpl() throws IOException { + this(new TempFileSink(), System.getenv("GRPC_BINARY_LOG_CONFIG")); + } + + /** + * Deprecated and will be removed in a future version of gRPC. + */ + @Deprecated + public BinaryLogProviderImpl(BinaryLogSink sink) throws IOException { + this(sink, System.getenv("GRPC_BINARY_LOG_CONFIG")); + } + + /** + * Creates an instance. + * @param sink ownership is transferred to this class. + * @param configStr config string to parse to determine logged methods and msg size limits. + * @throws IOException if initialization failed. + */ + public BinaryLogProviderImpl(BinaryLogSink sink, String configStr) throws IOException { + this.sink = Preconditions.checkNotNull(sink); + try { + factory = new BinlogHelper.FactoryImpl(sink, configStr); + } catch (RuntimeException e) { + sink.close(); + // parsing the conf string may throw if it is blank or contains errors + throw new IOException( + "Can not initialize. The env variable GRPC_BINARY_LOG_CONFIG must be valid.", e); + } + } + + @Nullable + @Override + public ServerInterceptor getServerInterceptor(String fullMethodName) { + BinlogHelper helperForMethod = factory.getLog(fullMethodName); + if (helperForMethod == null) { + return null; + } + return helperForMethod.getServerInterceptor(counter.getAndIncrement()); + } + + @Nullable + @Override + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { + BinlogHelper helperForMethod = factory.getLog(fullMethodName); + if (helperForMethod == null) { + return null; + } + return helperForMethod.getClientInterceptor(counter.getAndIncrement()); + } + + @Override + public void close() throws IOException { + sink.close(); + } +} diff --git a/services/src/main/java/io/grpc/services/BinaryLogSink.java b/services/src/main/java/io/grpc/services/BinaryLogSink.java new file mode 100644 index 00000000000..af3204eddff --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogSink.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.protobuf.MessageLite; +import io.grpc.ExperimentalApi; +import java.io.Closeable; + +/** + * A class that accepts binary log messages. + * + * @deprecated Use {@link io.grpc.protobuf.services.BinaryLogSink} instead. + */ +@Deprecated +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017") +public interface BinaryLogSink extends Closeable { + /** + * Writes the {@code message} to the destination. + */ + void write(MessageLite message); +} diff --git a/services/src/main/java/io/grpc/services/BinaryLogs.java b/services/src/main/java/io/grpc/services/BinaryLogs.java new file mode 100644 index 00000000000..0c4f8b5c7dc --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogs.java @@ -0,0 +1,57 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import io.grpc.BinaryLog; +import io.grpc.ExperimentalApi; +import java.io.IOException; + +/** + * @deprecated Use {@link io.grpc.protobuf.services.BinaryLogs} instead. + */ +@Deprecated +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017") +public final class BinaryLogs { + /** + * Creates a binary log that writes to a temp file. Warning: this implementation is + * not performance optimized, and RPCs will experience back pressure if disk IO does not keep + * up. + */ + public static BinaryLog createBinaryLog() throws IOException { + return new BinaryLogProviderImpl(); + } + + /** + * Deprecated and will be removed in a future version of gRPC. + */ + @Deprecated + public static BinaryLog createBinaryLog(BinaryLogSink sink) throws IOException { + return new BinaryLogProviderImpl(sink); + } + + /** + * Creates a binary log with a custom {@link BinaryLogSink} for receiving the logged data, + * and a config string as defined by + * + * A16-binary-logging. + */ + public static BinaryLog createBinaryLog(BinaryLogSink sink, String configStr) throws IOException { + return new BinaryLogProviderImpl(sink, configStr); + } + + private BinaryLogs() {} +} diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java new file mode 100644 index 00000000000..ba3eef430c1 --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -0,0 +1,880 @@ +/* + * Copyright 2017 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.protobuf.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import io.grpc.Grpc; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.binarylog.v1.Address; +import io.grpc.binarylog.v1.GrpcLogEntry; +import io.grpc.binarylog.v1.GrpcLogEntry.EventType; +import io.grpc.binarylog.v1.Message; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A binary log class that is configured for a specific {@link MethodDescriptor}. + */ +@ThreadSafe +final class BinlogHelper { + private static final Logger logger = Logger.getLogger(BinlogHelper.class.getName()); + // Normally 'grpc-' metadata keys are set from within gRPC, and applications are not allowed + // to set them. This key is a special well known key that set from the application layer, but + // represents a com.google.rpc.Status and is given special first class treatment. + // See StatusProto.java + static final Metadata.Key STATUS_DETAILS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", + Metadata.BINARY_BYTE_MARSHALLER); + + @VisibleForTesting + final SinkWriter writer; + + @VisibleForTesting + BinlogHelper(SinkWriter writer) { + this.writer = writer; + } + + // TODO(zpencer): move proto related static helpers into this class + static final class SinkWriterImpl extends SinkWriter { + private final BinaryLogSink sink; + private TimeProvider timeProvider; + private final int maxHeaderBytes; + private final int maxMessageBytes; + + SinkWriterImpl( + BinaryLogSink sink, + TimeProvider timeProvider, + int maxHeaderBytes, + int maxMessageBytes) { + this.sink = sink; + this.timeProvider = timeProvider; + this.maxHeaderBytes = maxHeaderBytes; + this.maxMessageBytes = maxMessageBytes; + } + + GrpcLogEntry.Builder newTimestampedBuilder() { + long epochNanos = timeProvider.currentTimeNanos(); + return GrpcLogEntry.newBuilder().setTimestamp(Timestamps.fromNanos(epochNanos)); + } + + @Override + void logClientHeader( + long seq, + String methodName, + // not all transports have the concept of authority + @Nullable String authority, + @Nullable Duration timeout, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on client side + @Nullable SocketAddress peerAddress) { + Preconditions.checkArgument(methodName != null, "methodName can not be null"); + Preconditions.checkArgument( + !methodName.startsWith("/"), + "in grpc-java method names should not have a leading '/'. However this class will " + + "add one to be consistent with language agnostic conventions."); + Preconditions.checkArgument( + peerAddress == null || logger == GrpcLogEntry.Logger.LOGGER_SERVER, + "peerSocket can only be specified for server"); + + MaybeTruncated pair + = createMetadataProto(metadata, maxHeaderBytes); + io.grpc.binarylog.v1.ClientHeader.Builder clientHeaderBuilder + = io.grpc.binarylog.v1.ClientHeader.newBuilder() + .setMetadata(pair.proto) + .setMethodName("/" + methodName); + if (timeout != null) { + clientHeaderBuilder.setTimeout(timeout); + } + if (authority != null) { + clientHeaderBuilder.setAuthority(authority); + } + + GrpcLogEntry.Builder entryBuilder = newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(EventType.EVENT_TYPE_CLIENT_HEADER) + .setClientHeader(clientHeaderBuilder) + .setPayloadTruncated(pair.truncated) + .setLogger(logger) + .setCallId(callId); + if (peerAddress != null) { + entryBuilder.setPeer(socketToProto(peerAddress)); + } + sink.write(entryBuilder.build()); + } + + @Override + void logServerHeader( + long seq, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on server + @Nullable SocketAddress peerAddress) { + Preconditions.checkArgument( + peerAddress == null || logger == GrpcLogEntry.Logger.LOGGER_CLIENT, + "peerSocket can only be specified for client"); + MaybeTruncated pair + = createMetadataProto(metadata, maxHeaderBytes); + + GrpcLogEntry.Builder entryBuilder = newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(EventType.EVENT_TYPE_SERVER_HEADER) + .setServerHeader( + io.grpc.binarylog.v1.ServerHeader.newBuilder() + .setMetadata(pair.proto)) + .setPayloadTruncated(pair.truncated) + .setLogger(logger) + .setCallId(callId); + if (peerAddress != null) { + entryBuilder.setPeer(socketToProto(peerAddress)); + } + sink.write(entryBuilder.build()); + } + + @Override + void logTrailer( + long seq, + Status status, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on server, can be non null on client if this is a trailer-only response + @Nullable SocketAddress peerAddress) { + Preconditions.checkArgument( + peerAddress == null || logger == GrpcLogEntry.Logger.LOGGER_CLIENT, + "peerSocket can only be specified for client"); + MaybeTruncated pair + = createMetadataProto(metadata, maxHeaderBytes); + + io.grpc.binarylog.v1.Trailer.Builder trailerBuilder + = io.grpc.binarylog.v1.Trailer.newBuilder() + .setStatusCode(status.getCode().value()) + .setMetadata(pair.proto); + String statusDescription = status.getDescription(); + if (statusDescription != null) { + trailerBuilder.setStatusMessage(statusDescription); + } + byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY); + if (statusDetailBytes != null) { + trailerBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes)); + } + + GrpcLogEntry.Builder entryBuilder = newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(EventType.EVENT_TYPE_SERVER_TRAILER) + .setTrailer(trailerBuilder) + .setPayloadTruncated(pair.truncated) + .setLogger(logger) + .setCallId(callId); + if (peerAddress != null) { + entryBuilder.setPeer(socketToProto(peerAddress)); + } + sink.write(entryBuilder.build()); + } + + @Override + void logRpcMessage( + long seq, + EventType eventType, + Marshaller marshaller, + T message, + GrpcLogEntry.Logger logger, + long callId) { + Preconditions.checkArgument( + eventType == EventType.EVENT_TYPE_CLIENT_MESSAGE + || eventType == EventType.EVENT_TYPE_SERVER_MESSAGE, + "event type must correspond to client message or server message"); + if (marshaller != BYTEARRAY_MARSHALLER) { + throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); + } + MaybeTruncated pair = createMessageProto((byte[]) message, maxMessageBytes); + GrpcLogEntry.Builder entryBuilder = newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(eventType) + .setMessage(pair.proto) + .setPayloadTruncated(pair.truncated) + .setLogger(logger) + .setCallId(callId); + sink.write(entryBuilder.build()); + } + + @Override + void logHalfClose(long seq, GrpcLogEntry.Logger logger, long callId) { + sink.write( + newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(EventType.EVENT_TYPE_CLIENT_HALF_CLOSE) + .setLogger(logger) + .setCallId(callId) + .build()); + } + + @Override + void logCancel(long seq, GrpcLogEntry.Logger logger, long callId) { + sink.write( + newTimestampedBuilder() + .setSequenceIdWithinCall(seq) + .setType(EventType.EVENT_TYPE_CANCEL) + .setLogger(logger) + .setCallId(callId) + .build()); + } + + @Override + int getMaxHeaderBytes() { + return maxHeaderBytes; + } + + @Override + int getMaxMessageBytes() { + return maxMessageBytes; + } + } + + abstract static class SinkWriter { + /** + * Logs the client header. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logClientHeader( + long seq, + String methodName, + // not all transports have the concept of authority + @Nullable String authority, + @Nullable Duration timeout, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on client side + @Nullable SocketAddress peerAddress); + + /** + * Logs the server header. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logServerHeader( + long seq, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on server + @Nullable SocketAddress peerAddress); + + /** + * Logs the server trailer. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logTrailer( + long seq, + Status status, + Metadata metadata, + GrpcLogEntry.Logger logger, + long callId, + // null on server, can be non null on client if this is a trailer-only response + @Nullable SocketAddress peerAddress); + + /** + * Logs the message message. The number of bytes logged is determined by the binary + * logging configuration. + */ + abstract void logRpcMessage( + long seq, + EventType eventType, + Marshaller marshaller, + T message, + GrpcLogEntry.Logger logger, + long callId); + + abstract void logHalfClose(long seq, GrpcLogEntry.Logger logger, long callId); + + /** + * Logs the cancellation. + */ + abstract void logCancel(long seq, GrpcLogEntry.Logger logger, long callId); + + /** + * Returns the number bytes of the header this writer will log, according to configuration. + */ + abstract int getMaxHeaderBytes(); + + /** + * Returns the number bytes of the message this writer will log, according to configuration. + */ + abstract int getMaxMessageBytes(); + } + + static SocketAddress getPeerSocket(Attributes streamAttributes) { + return streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + } + + private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { + if (deadline0 == null) { + return deadline1; + } + if (deadline1 == null) { + return deadline0; + } + return deadline0.minimum(deadline1); + } + + interface TimeProvider { + /** Returns the current nano time. */ + long currentTimeNanos(); + + TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() { + @Override + public long currentTimeNanos() { + return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + } + }; + } + + + public ClientInterceptor getClientInterceptor(final long callId) { + return new ClientInterceptor() { + boolean trailersOnlyResponse = true; + @Override + public ClientCall interceptCall( + final MethodDescriptor method, CallOptions callOptions, Channel next) { + final AtomicLong seq = new AtomicLong(1); + final String methodName = method.getFullMethodName(); + final String authority = next.authority(); + // The timeout should reflect the time remaining when the call is started, so do not + // compute remaining time here. + final Deadline deadline = min(callOptions.getDeadline(), Context.current().getDeadline()); + + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(final ClientCall.Listener responseListener, Metadata headers) { + final Duration timeout = deadline == null ? null + : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); + writer.logClientHeader( + seq.getAndIncrement(), + methodName, + authority, + timeout, + headers, + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId, + /*peerAddress=*/ null); + ClientCall.Listener wListener = + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + writer.logRpcMessage( + seq.getAndIncrement(), + EventType.EVENT_TYPE_SERVER_MESSAGE, + method.getResponseMarshaller(), + message, + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId); + super.onMessage(message); + } + + @Override + public void onHeaders(Metadata headers) { + trailersOnlyResponse = false; + writer.logServerHeader( + seq.getAndIncrement(), + headers, + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId, + getPeerSocket(getAttributes())); + super.onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + SocketAddress peer = trailersOnlyResponse + ? getPeerSocket(getAttributes()) : null; + writer.logTrailer( + seq.getAndIncrement(), + status, + trailers, + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId, + peer); + super.onClose(status, trailers); + } + }; + super.start(wListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + writer.logRpcMessage( + seq.getAndIncrement(), + EventType.EVENT_TYPE_CLIENT_MESSAGE, + method.getRequestMarshaller(), + message, + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId); + super.sendMessage(message); + } + + @Override + public void halfClose() { + writer.logHalfClose( + seq.getAndIncrement(), + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId); + super.halfClose(); + } + + @Override + public void cancel(String message, Throwable cause) { + writer.logCancel( + seq.getAndIncrement(), + GrpcLogEntry.Logger.LOGGER_CLIENT, + callId); + super.cancel(message, cause); + } + }; + } + }; + } + + public ServerInterceptor getServerInterceptor(final long callId) { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + final ServerCall call, + Metadata headers, + ServerCallHandler next) { + final AtomicLong seq = new AtomicLong(1); + SocketAddress peer = getPeerSocket(call.getAttributes()); + String methodName = call.getMethodDescriptor().getFullMethodName(); + String authority = call.getAuthority(); + Deadline deadline = Context.current().getDeadline(); + final Duration timeout = deadline == null ? null + : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); + + writer.logClientHeader( + seq.getAndIncrement(), + methodName, + authority, + timeout, + headers, + GrpcLogEntry.Logger.LOGGER_SERVER, + callId, + peer); + ServerCall wCall = new SimpleForwardingServerCall(call) { + @Override + public void sendMessage(RespT message) { + writer.logRpcMessage( + seq.getAndIncrement(), + EventType.EVENT_TYPE_SERVER_MESSAGE, + call.getMethodDescriptor().getResponseMarshaller(), + message, + GrpcLogEntry.Logger.LOGGER_SERVER, + callId); + super.sendMessage(message); + } + + @Override + public void sendHeaders(Metadata headers) { + writer.logServerHeader( + seq.getAndIncrement(), + headers, + GrpcLogEntry.Logger.LOGGER_SERVER, + callId, + /*peerAddress=*/ null); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + writer.logTrailer( + seq.getAndIncrement(), + status, + trailers, + GrpcLogEntry.Logger.LOGGER_SERVER, + callId, + /*peerAddress=*/ null); + super.close(status, trailers); + } + }; + + return new SimpleForwardingServerCallListener(next.startCall(wCall, headers)) { + @Override + public void onMessage(ReqT message) { + writer.logRpcMessage( + seq.getAndIncrement(), + EventType.EVENT_TYPE_CLIENT_MESSAGE, + call.getMethodDescriptor().getRequestMarshaller(), + message, + GrpcLogEntry.Logger.LOGGER_SERVER, + callId); + super.onMessage(message); + } + + @Override + public void onHalfClose() { + writer.logHalfClose( + seq.getAndIncrement(), + GrpcLogEntry.Logger.LOGGER_SERVER, + callId); + super.onHalfClose(); + } + + @Override + public void onCancel() { + writer.logCancel( + seq.getAndIncrement(), + GrpcLogEntry.Logger.LOGGER_SERVER, + callId); + super.onCancel(); + } + }; + } + }; + } + + interface Factory { + @Nullable + BinlogHelper getLog(String fullMethodName); + } + + static final class FactoryImpl implements Factory { + private final BinlogHelper globalLog; + private final Map perServiceLogs; + private final Map perMethodLogs; + private final Set blacklistedMethods; + + /** + * Accepts a string in the format specified by the binary log spec. + */ + @VisibleForTesting + FactoryImpl(BinaryLogSink sink, String configurationString) { + checkNotNull(sink, "sink"); + BinlogHelper globalLog = null; + Map perServiceLogs = new HashMap<>(); + Map perMethodLogs = new HashMap<>(); + Set blacklistedMethods = new HashSet<>(); + if (configurationString != null && configurationString.length() > 0) { + for (String configuration : Splitter.on(',').split(configurationString)) { + int leftCurly = configuration.indexOf('{'); + // '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified + String methodOrSvc; + // An expression originally wrapped in curly braces; like {m:256,h:256}, {m:256}, {h:256} + String binlogOptionStr; + if (leftCurly == -1) { + methodOrSvc = configuration; + binlogOptionStr = null; + } else { + int rightCurly = configuration.indexOf('}', leftCurly); + if (rightCurly != configuration.length() - 1) { + throw new IllegalArgumentException("Illegal log config pattern: " + configuration); + } + methodOrSvc = configuration.substring(0, leftCurly); + // option without the curly braces + binlogOptionStr = configuration.substring(leftCurly + 1, configuration.length() - 1); + } + if (methodOrSvc.isEmpty()) { + throw new IllegalArgumentException("Illegal log config pattern: " + configuration); + } + if (methodOrSvc.equals("*")) { + // parse config for "*" + checkState( + globalLog == null, + "Duplicate entry, this is fatal: " + configuration); + globalLog = createBinaryLog(sink, binlogOptionStr); + logger.log(Level.INFO, "Global binlog: {0}", binlogOptionStr); + } else if (isServiceGlob(methodOrSvc)) { + // parse config for a service, e.g. "service/*" + String service = MethodDescriptor.extractFullServiceName(methodOrSvc); + checkState( + !perServiceLogs.containsKey(service), + "Duplicate entry, this is fatal: " + configuration); + perServiceLogs.put(service, createBinaryLog(sink, binlogOptionStr)); + logger.log( + Level.INFO, + "Service binlog: service={0} config={1}", + new Object[] {service, binlogOptionStr}); + } else if (methodOrSvc.startsWith("-")) { + // parse config for a method, e.g. "-service/method" + String blacklistedMethod = methodOrSvc.substring(1); + if (blacklistedMethod.length() == 0) { + continue; + } + checkState( + !blacklistedMethods.contains(blacklistedMethod), + "Duplicate entry, this is fatal: " + configuration); + checkState( + !perMethodLogs.containsKey(blacklistedMethod), + "Duplicate entry, this is fatal: " + configuration); + blacklistedMethods.add(blacklistedMethod); + } else { + // parse config for a fully qualified method, e.g "serice/method" + checkState( + !perMethodLogs.containsKey(methodOrSvc), + "Duplicate entry, this is fatal: " + configuration); + checkState( + !blacklistedMethods.contains(methodOrSvc), + "Duplicate entry, this method was blacklisted: " + configuration); + perMethodLogs.put(methodOrSvc, createBinaryLog(sink, binlogOptionStr)); + logger.log( + Level.INFO, + "Method binlog: method={0} config={1}", + new Object[] {methodOrSvc, binlogOptionStr}); + } + } + } + this.globalLog = globalLog; + this.perServiceLogs = Collections.unmodifiableMap(perServiceLogs); + this.perMethodLogs = Collections.unmodifiableMap(perMethodLogs); + this.blacklistedMethods = Collections.unmodifiableSet(blacklistedMethods); + } + + /** + * Accepts a full method name and returns the log that should be used. + */ + @Override + public BinlogHelper getLog(String fullMethodName) { + if (blacklistedMethods.contains(fullMethodName)) { + return null; + } + BinlogHelper methodLog = perMethodLogs.get(fullMethodName); + if (methodLog != null) { + return methodLog; + } + BinlogHelper serviceLog = perServiceLogs.get( + MethodDescriptor.extractFullServiceName(fullMethodName)); + if (serviceLog != null) { + return serviceLog; + } + return globalLog; + } + + /** + * Returns a binlog with the correct header and message limits or {@code null} if the input + * is malformed. The input should be a string that is in one of these forms: + * + *

{@code {h(:\d+)?}, {m(:\d+)?}, {h(:\d+)?,m(:\d+)?}} + * + *

If the {@code logConfig} is null, the returned binlog will have a limit of + * Integer.MAX_VALUE. + */ + @VisibleForTesting + @Nullable + static BinlogHelper createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) { + if (logConfig == null) { + return new BinlogHelper( + new SinkWriterImpl( + sink, TimeProvider.SYSTEM_TIME_PROVIDER, Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + try { + final int maxHeaderBytes; + final int maxMsgBytes; + String[] parts = logConfig.split(";", 2); + if (parts.length == 2) { + if (!(parts[0].startsWith("h") && parts[1].startsWith("m"))) { + throw new IllegalArgumentException("Illegal log config pattern"); + } + maxHeaderBytes = optionalInt(parts[0].substring(1)); + maxMsgBytes = optionalInt(parts[1].substring(1)); + } else if (parts[0].startsWith("h")) { + maxHeaderBytes = optionalInt(parts[0].substring(1)); + maxMsgBytes = 0; + } else if (parts[0].startsWith("m")) { + maxHeaderBytes = 0; + maxMsgBytes = optionalInt(parts[0].substring(1)); + } else { + throw new IllegalArgumentException("Illegal log config pattern"); + } + return new BinlogHelper( + new SinkWriterImpl( + sink, TimeProvider.SYSTEM_TIME_PROVIDER, maxHeaderBytes, maxMsgBytes)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Illegal log config pattern"); + } + } + + /** Returns {@code s}, after verifying it contains only digits. */ + static String checkDigits(String s) { + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || '9' < c) { + throw new IllegalArgumentException("Illegal log config pattern"); + } + } + return s; + } + + /** Parses the optional int of the form "" (max int) or ":123" (123). */ + static int optionalInt(String s) { + if (s.isEmpty()) { + return Integer.MAX_VALUE; + } + if (!s.startsWith(":")) { + throw new IllegalArgumentException("Illegal log config pattern"); + } + s = checkDigits(s.substring(1)); + return Integer.parseInt(s); + } + + /** + * Returns true if the input string is a glob of the form: {@code /*}. + */ + static boolean isServiceGlob(String input) { + return input.endsWith("/*"); + } + } + + @VisibleForTesting + static Address socketToProto(SocketAddress address) { + checkNotNull(address, "address"); + + Address.Builder builder = Address.newBuilder(); + if (address instanceof InetSocketAddress) { + InetAddress inetAddress = ((InetSocketAddress) address).getAddress(); + if (inetAddress instanceof Inet4Address) { + builder.setType(Address.Type.TYPE_IPV4) + .setAddress(InetAddressUtil.toAddrString(inetAddress)); + } else if (inetAddress instanceof Inet6Address) { + builder.setType(Address.Type.TYPE_IPV6) + .setAddress(InetAddressUtil.toAddrString(inetAddress)); + } else { + logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address); + builder.setAddress(address.toString()); + } + builder.setIpPort(((InetSocketAddress) address).getPort()); + } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { + // To avoid a compile time dependency on grpc-netty, we check against the runtime class name. + builder.setType(Address.Type.TYPE_UNIX) + .setAddress(address.toString()); + } else { + builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString()); + } + return builder.build(); + } + + private static final Set NEVER_INCLUDED_METADATA = new HashSet<>( + Collections.singletonList( + // grpc-status-details-bin is already logged in a field of the binlog proto + STATUS_DETAILS_KEY.name())); + private static final Set ALWAYS_INCLUDED_METADATA = new HashSet<>( + Collections.singletonList( + "grpc-trace-bin")); + + static final class MaybeTruncated { + T proto; + boolean truncated; + + private MaybeTruncated(T proto, boolean truncated) { + this.proto = proto; + this.truncated = truncated; + } + } + + @VisibleForTesting + static MaybeTruncated createMetadataProto( + Metadata metadata, int maxHeaderBytes) { + checkNotNull(metadata, "metadata"); + checkArgument(maxHeaderBytes >= 0, "maxHeaderBytes must be non negative"); + io.grpc.binarylog.v1.Metadata.Builder metaBuilder = io.grpc.binarylog.v1.Metadata.newBuilder(); + // This code is tightly coupled with Metadata's implementation + byte[][] serialized = InternalMetadata.serialize(metadata); + boolean truncated = false; + if (serialized != null) { + int curBytes = 0; + for (int i = 0; i < serialized.length; i += 2) { + String key = new String(serialized[i], Charsets.UTF_8); + byte[] value = serialized[i + 1]; + if (NEVER_INCLUDED_METADATA.contains(key)) { + continue; + } + boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key); + int bytesAfterAdd = curBytes + key.length() + value.length; + if (!forceInclude && bytesAfterAdd > maxHeaderBytes) { + truncated = true; + continue; + } + metaBuilder.addEntryBuilder() + .setKey(key) + .setValue(ByteString.copyFrom(value)); + if (!forceInclude) { + // force included keys do not count towards the size limit + curBytes = bytesAfterAdd; + } + } + } + return new MaybeTruncated<>(metaBuilder, truncated); + } + + @VisibleForTesting + static MaybeTruncated createMessageProto( + byte[] message, int maxMessageBytes) { + checkNotNull(message, "message"); + checkArgument(maxMessageBytes >= 0, "maxMessageBytes must be non negative"); + Message.Builder msgBuilder = Message + .newBuilder() + .setLength(message.length); + if (maxMessageBytes > 0) { + int desiredBytes = Math.min(maxMessageBytes, message.length); + msgBuilder.setData(ByteString.copyFrom(message, 0, desiredBytes)); + } + return new MaybeTruncated<>(msgBuilder, maxMessageBytes < message.length); + } +} diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java new file mode 100644 index 00000000000..349995d9f80 --- /dev/null +++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java @@ -0,0 +1,471 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Int64Value; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import io.grpc.ConnectivityState; +import io.grpc.InternalChannelz; +import io.grpc.InternalChannelz.ChannelStats; +import io.grpc.InternalChannelz.ChannelTrace.Event; +import io.grpc.InternalChannelz.RootChannelList; +import io.grpc.InternalChannelz.ServerList; +import io.grpc.InternalChannelz.ServerSocketsList; +import io.grpc.InternalChannelz.ServerStats; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalChannelz.TransportStats; +import io.grpc.InternalInstrumented; +import io.grpc.InternalWithLogId; +import io.grpc.Status; +import io.grpc.channelz.v1.Address; +import io.grpc.channelz.v1.Address.OtherAddress; +import io.grpc.channelz.v1.Address.TcpIpAddress; +import io.grpc.channelz.v1.Address.UdsAddress; +import io.grpc.channelz.v1.Channel; +import io.grpc.channelz.v1.ChannelConnectivityState; +import io.grpc.channelz.v1.ChannelConnectivityState.State; +import io.grpc.channelz.v1.ChannelData; +import io.grpc.channelz.v1.ChannelRef; +import io.grpc.channelz.v1.ChannelTrace; +import io.grpc.channelz.v1.ChannelTraceEvent; +import io.grpc.channelz.v1.ChannelTraceEvent.Severity; +import io.grpc.channelz.v1.GetServerSocketsResponse; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.channelz.v1.Security; +import io.grpc.channelz.v1.Security.OtherSecurity; +import io.grpc.channelz.v1.Security.Tls; +import io.grpc.channelz.v1.Server; +import io.grpc.channelz.v1.ServerData; +import io.grpc.channelz.v1.ServerRef; +import io.grpc.channelz.v1.Socket; +import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketOption; +import io.grpc.channelz.v1.SocketOptionLinger; +import io.grpc.channelz.v1.SocketOptionTcpInfo; +import io.grpc.channelz.v1.SocketOptionTimeout; +import io.grpc.channelz.v1.SocketRef; +import io.grpc.channelz.v1.Subchannel; +import io.grpc.channelz.v1.SubchannelRef; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.cert.CertificateEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A static utility class for turning internal data structures into protos. + */ +final class ChannelzProtoUtil { + private static final Logger logger = Logger.getLogger(ChannelzProtoUtil.class.getName()); + + private ChannelzProtoUtil() { + // do not instantiate. + } + + static ChannelRef toChannelRef(InternalWithLogId obj) { + return ChannelRef + .newBuilder() + .setChannelId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static SubchannelRef toSubchannelRef(InternalWithLogId obj) { + return SubchannelRef + .newBuilder() + .setSubchannelId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static ServerRef toServerRef(InternalWithLogId obj) { + return ServerRef + .newBuilder() + .setServerId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static SocketRef toSocketRef(InternalWithLogId obj) { + return SocketRef + .newBuilder() + .setSocketId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static Server toServer(InternalInstrumented obj) { + ServerStats stats = getFuture(obj.getStats()); + Server.Builder builder = Server + .newBuilder() + .setRef(toServerRef(obj)) + .setData(toServerData(stats)); + for (InternalInstrumented listenSocket : stats.listenSockets) { + builder.addListenSocket(toSocketRef(listenSocket)); + } + return builder.build(); + } + + static ServerData toServerData(ServerStats stats) { + return ServerData + .newBuilder() + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos)) + .build(); + } + + static Security toSecurity(InternalChannelz.Security security) { + Preconditions.checkNotNull(security); + Preconditions.checkState( + security.tls != null ^ security.other != null, + "one of tls or othersecurity must be non null"); + if (security.tls != null) { + Tls.Builder tlsBuilder + = Tls.newBuilder().setStandardName(security.tls.cipherSuiteStandardName); + try { + if (security.tls.localCert != null) { + tlsBuilder.setLocalCertificate(ByteString.copyFrom( + security.tls.localCert.getEncoded())); + } + if (security.tls.remoteCert != null) { + tlsBuilder.setRemoteCertificate(ByteString.copyFrom( + security.tls.remoteCert.getEncoded())); + } + } catch (CertificateEncodingException e) { + logger.log(Level.FINE, "Caught exception", e); + } + return Security.newBuilder().setTls(tlsBuilder).build(); + } else { + OtherSecurity.Builder builder = OtherSecurity.newBuilder().setName(security.other.name); + if (security.other.any != null) { + builder.setValue((Any) security.other.any); + } + return Security.newBuilder().setOther(builder).build(); + } + } + + static Socket toSocket(InternalInstrumented obj) { + SocketStats socketStats = getFuture(obj.getStats()); + Socket.Builder builder = Socket.newBuilder() + .setRef(toSocketRef(obj)) + .setLocal(toAddress(socketStats.local)); + if (socketStats.security != null) { + builder.setSecurity(toSecurity(socketStats.security)); + } + // listen sockets do not have remote nor data + if (socketStats.remote != null) { + builder.setRemote(toAddress(socketStats.remote)); + } + builder.setData(extractSocketData(socketStats)); + return builder.build(); + } + + static Address toAddress(SocketAddress address) { + Preconditions.checkNotNull(address); + Address.Builder builder = Address.newBuilder(); + if (address instanceof InetSocketAddress) { + InetSocketAddress inetAddress = (InetSocketAddress) address; + builder.setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress( + ByteString.copyFrom(inetAddress.getAddress().getAddress())) + .setPort(inetAddress.getPort()) + .build()); + } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) { + builder.setUdsAddress( + UdsAddress + .newBuilder() + .setFilename(address.toString()) // DomainSocketAddress.toString returns filename + .build()); + } else { + builder.setOtherAddress(OtherAddress.newBuilder().setName(address.toString()).build()); + } + return builder.build(); + } + + static SocketData extractSocketData(SocketStats socketStats) { + SocketData.Builder builder = SocketData.newBuilder(); + if (socketStats.data != null) { + TransportStats s = socketStats.data; + builder + .setStreamsStarted(s.streamsStarted) + .setStreamsSucceeded(s.streamsSucceeded) + .setStreamsFailed(s.streamsFailed) + .setMessagesSent(s.messagesSent) + .setMessagesReceived(s.messagesReceived) + .setKeepAlivesSent(s.keepAlivesSent) + .setLastLocalStreamCreatedTimestamp( + Timestamps.fromNanos(s.lastLocalStreamCreatedTimeNanos)) + .setLastRemoteStreamCreatedTimestamp( + Timestamps.fromNanos(s.lastRemoteStreamCreatedTimeNanos)) + .setLastMessageSentTimestamp( + Timestamps.fromNanos(s.lastMessageSentTimeNanos)) + .setLastMessageReceivedTimestamp( + Timestamps.fromNanos(s.lastMessageReceivedTimeNanos)) + .setLocalFlowControlWindow( + Int64Value.of(s.localFlowControlWindow)) + .setRemoteFlowControlWindow( + Int64Value.of(s.remoteFlowControlWindow)); + } + builder.addAllOption(toSocketOptionsList(socketStats.socketOptions)); + return builder.build(); + } + + public static final String SO_LINGER = "SO_LINGER"; + public static final String SO_TIMEOUT = "SO_TIMEOUT"; + public static final String TCP_INFO = "TCP_INFO"; + + static SocketOption toSocketOptionLinger(int lingerSeconds) { + final SocketOptionLinger lingerOpt; + if (lingerSeconds >= 0) { + lingerOpt = SocketOptionLinger + .newBuilder() + .setActive(true) + .setDuration(Durations.fromSeconds(lingerSeconds)) + .build(); + } else { + lingerOpt = SocketOptionLinger.getDefaultInstance(); + } + return SocketOption + .newBuilder() + .setName(SO_LINGER) + .setAdditional(Any.pack(lingerOpt)) + .build(); + } + + static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) { + Preconditions.checkNotNull(name); + return SocketOption + .newBuilder() + .setName(name) + .setAdditional( + Any.pack( + SocketOptionTimeout + .newBuilder() + .setDuration(Durations.fromMillis(timeoutMillis)) + .build())) + .build(); + } + + static SocketOption toSocketOptionTcpInfo(InternalChannelz.TcpInfo i) { + SocketOptionTcpInfo tcpInfo = SocketOptionTcpInfo.newBuilder() + .setTcpiState(i.state) + .setTcpiCaState(i.caState) + .setTcpiRetransmits(i.retransmits) + .setTcpiProbes(i.probes) + .setTcpiBackoff(i.backoff) + .setTcpiOptions(i.options) + .setTcpiSndWscale(i.sndWscale) + .setTcpiRcvWscale(i.rcvWscale) + .setTcpiRto(i.rto) + .setTcpiAto(i.ato) + .setTcpiSndMss(i.sndMss) + .setTcpiRcvMss(i.rcvMss) + .setTcpiUnacked(i.unacked) + .setTcpiSacked(i.sacked) + .setTcpiLost(i.lost) + .setTcpiRetrans(i.retrans) + .setTcpiFackets(i.fackets) + .setTcpiLastDataSent(i.lastDataSent) + .setTcpiLastAckSent(i.lastAckSent) + .setTcpiLastDataRecv(i.lastDataRecv) + .setTcpiLastAckRecv(i.lastAckRecv) + .setTcpiPmtu(i.pmtu) + .setTcpiRcvSsthresh(i.rcvSsthresh) + .setTcpiRtt(i.rtt) + .setTcpiRttvar(i.rttvar) + .setTcpiSndSsthresh(i.sndSsthresh) + .setTcpiSndCwnd(i.sndCwnd) + .setTcpiAdvmss(i.advmss) + .setTcpiReordering(i.reordering) + .build(); + return SocketOption + .newBuilder() + .setName(TCP_INFO) + .setAdditional(Any.pack(tcpInfo)) + .build(); + } + + static SocketOption toSocketOptionAdditional(String name, String value) { + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(value); + return SocketOption.newBuilder().setName(name).setValue(value).build(); + } + + static List toSocketOptionsList(InternalChannelz.SocketOptions options) { + Preconditions.checkNotNull(options); + List ret = new ArrayList<>(); + if (options.lingerSeconds != null) { + ret.add(toSocketOptionLinger(options.lingerSeconds)); + } + if (options.soTimeoutMillis != null) { + ret.add(toSocketOptionTimeout(SO_TIMEOUT, options.soTimeoutMillis)); + } + if (options.tcpInfo != null) { + ret.add(toSocketOptionTcpInfo(options.tcpInfo)); + } + for (Map.Entry entry : options.others.entrySet()) { + ret.add(toSocketOptionAdditional(entry.getKey(), entry.getValue())); + } + return ret; + } + + static Channel toChannel(InternalInstrumented channel) { + ChannelStats stats = getFuture(channel.getStats()); + Channel.Builder channelBuilder = Channel + .newBuilder() + .setRef(toChannelRef(channel)) + .setData(extractChannelData(stats)); + for (InternalWithLogId subchannel : stats.subchannels) { + channelBuilder.addSubchannelRef(toSubchannelRef(subchannel)); + } + + return channelBuilder.build(); + } + + static ChannelData extractChannelData(InternalChannelz.ChannelStats stats) { + ChannelData.Builder builder = ChannelData.newBuilder(); + builder.setTarget(stats.target) + .setState(toChannelConnectivityState(stats.state)) + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos)); + if (stats.channelTrace != null) { + builder.setTrace(toChannelTrace(stats.channelTrace)); + } + return builder.build(); + } + + static ChannelConnectivityState toChannelConnectivityState(ConnectivityState s) { + return ChannelConnectivityState.newBuilder().setState(toState(s)).build(); + } + + private static ChannelTrace toChannelTrace(InternalChannelz.ChannelTrace channelTrace) { + return ChannelTrace.newBuilder() + .setNumEventsLogged(channelTrace.numEventsLogged) + .setCreationTimestamp(Timestamps.fromNanos(channelTrace.creationTimeNanos)) + .addAllEvents(toChannelTraceEvents(channelTrace.events)) + .build(); + } + + private static List toChannelTraceEvents(List events) { + List channelTraceEvents = new ArrayList<>(); + for (Event event : events) { + ChannelTraceEvent.Builder builder = ChannelTraceEvent.newBuilder() + .setDescription(event.description) + .setSeverity(Severity.valueOf(event.severity.name())) + .setTimestamp(Timestamps.fromNanos(event.timestampNanos)); + if (event.channelRef != null) { + builder.setChannelRef(toChannelRef(event.channelRef)); + } + if (event.subchannelRef != null) { + builder.setSubchannelRef(toSubchannelRef(event.subchannelRef)); + } + channelTraceEvents.add(builder.build()); + } + return Collections.unmodifiableList(channelTraceEvents); + } + + static State toState(ConnectivityState state) { + if (state == null) { + return State.UNKNOWN; + } + try { + return Enum.valueOf(State.class, state.name()); + } catch (IllegalArgumentException e) { + return State.UNKNOWN; + } + } + + static Subchannel toSubchannel(InternalInstrumented subchannel) { + ChannelStats stats = getFuture(subchannel.getStats()); + Subchannel.Builder subchannelBuilder = Subchannel + .newBuilder() + .setRef(toSubchannelRef(subchannel)) + .setData(extractChannelData(stats)); + Preconditions.checkState(stats.sockets.isEmpty() || stats.subchannels.isEmpty()); + for (InternalWithLogId childSocket : stats.sockets) { + subchannelBuilder.addSocketRef(toSocketRef(childSocket)); + } + for (InternalWithLogId childSubchannel : stats.subchannels) { + subchannelBuilder.addSubchannelRef(toSubchannelRef(childSubchannel)); + } + return subchannelBuilder.build(); + } + + static GetTopChannelsResponse toGetTopChannelResponse(RootChannelList rootChannels) { + GetTopChannelsResponse.Builder responseBuilder = GetTopChannelsResponse + .newBuilder() + .setEnd(rootChannels.end); + for (InternalInstrumented c : rootChannels.channels) { + responseBuilder.addChannel(ChannelzProtoUtil.toChannel(c)); + } + return responseBuilder.build(); + } + + static GetServersResponse toGetServersResponse(ServerList servers) { + GetServersResponse.Builder responseBuilder = GetServersResponse + .newBuilder() + .setEnd(servers.end); + for (InternalInstrumented s : servers.servers) { + responseBuilder.addServer(ChannelzProtoUtil.toServer(s)); + } + return responseBuilder.build(); + } + + static GetServerSocketsResponse toGetServerSocketsResponse(ServerSocketsList serverSockets) { + GetServerSocketsResponse.Builder responseBuilder = GetServerSocketsResponse + .newBuilder() + .setEnd(serverSockets.end); + for (InternalWithLogId s : serverSockets.sockets) { + responseBuilder.addSocketRef(ChannelzProtoUtil.toSocketRef(s)); + } + return responseBuilder.build(); + } + + private static T getFuture(ListenableFuture future) { + try { + T ret = future.get(); + if (ret == null) { + throw Status.UNIMPLEMENTED + .withDescription("The entity's stats can not be retrieved. " + + "If this is an InProcessTransport this is expected.") + .asRuntimeException(); + } + return ret; + } catch (InterruptedException e) { + throw Status.INTERNAL.withCause(e).asRuntimeException(); + } catch (ExecutionException e) { + throw Status.INTERNAL.withCause(e).asRuntimeException(); + } + } +} diff --git a/services/src/main/java/io/grpc/services/ChannelzService.java b/services/src/main/java/io/grpc/services/ChannelzService.java new file mode 100644 index 00000000000..07e9e7ad035 --- /dev/null +++ b/services/src/main/java/io/grpc/services/ChannelzService.java @@ -0,0 +1,237 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ExperimentalApi; +import io.grpc.InternalChannelz; +import io.grpc.InternalChannelz.ChannelStats; +import io.grpc.InternalChannelz.ServerList; +import io.grpc.InternalChannelz.ServerSocketsList; +import io.grpc.InternalChannelz.ServerStats; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.channelz.v1.ChannelzGrpc; +import io.grpc.channelz.v1.GetChannelRequest; +import io.grpc.channelz.v1.GetChannelResponse; +import io.grpc.channelz.v1.GetServerRequest; +import io.grpc.channelz.v1.GetServerResponse; +import io.grpc.channelz.v1.GetServerSocketsRequest; +import io.grpc.channelz.v1.GetServerSocketsResponse; +import io.grpc.channelz.v1.GetServersRequest; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetSocketRequest; +import io.grpc.channelz.v1.GetSocketResponse; +import io.grpc.channelz.v1.GetSubchannelRequest; +import io.grpc.channelz.v1.GetSubchannelResponse; +import io.grpc.channelz.v1.GetTopChannelsRequest; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.stub.StreamObserver; + +/** + * The channelz service provides stats about a running gRPC process. + * + * @deprecated Use {@link io.grpc.protobuf.services.ChannelzService} instead. + */ +@Deprecated +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4206") +public final class ChannelzService extends ChannelzGrpc.ChannelzImplBase { + private final InternalChannelz channelz; + private final int maxPageSize; + + /** + * Creates an instance. + */ + public static ChannelzService newInstance(int maxPageSize) { + return new ChannelzService(InternalChannelz.instance(), maxPageSize); + } + + @VisibleForTesting + ChannelzService(InternalChannelz channelz, int maxPageSize) { + this.channelz = channelz; + this.maxPageSize = maxPageSize; + } + + /** Returns top level channel aka {@link io.grpc.ManagedChannel}. */ + @Override + public void getTopChannels( + GetTopChannelsRequest request, StreamObserver responseObserver) { + InternalChannelz.RootChannelList rootChannels + = channelz.getRootChannels(request.getStartChannelId(), maxPageSize); + + GetTopChannelsResponse resp; + try { + resp = ChannelzProtoUtil.toGetTopChannelResponse(rootChannels); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + /** Returns a top level channel aka {@link io.grpc.ManagedChannel}. */ + @Override + public void getChannel( + GetChannelRequest request, StreamObserver responseObserver) { + InternalInstrumented s = channelz.getRootChannel(request.getChannelId()); + if (s == null) { + responseObserver.onError( + Status.NOT_FOUND.withDescription("Can't find channel " + request.getChannelId()) + .asRuntimeException()); + return; + } + + GetChannelResponse resp; + try { + resp = GetChannelResponse + .newBuilder() + .setChannel(ChannelzProtoUtil.toChannel(s)) + .build(); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + /** Returns servers. */ + @Override + public void getServers( + GetServersRequest request, StreamObserver responseObserver) { + ServerList servers = channelz.getServers(request.getStartServerId(), maxPageSize); + + GetServersResponse resp; + try { + resp = ChannelzProtoUtil.toGetServersResponse(servers); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + /** Returns a server. */ + @Override + public void getServer( + GetServerRequest request, StreamObserver responseObserver) { + InternalInstrumented s = channelz.getServer(request.getServerId()); + if (s == null) { + responseObserver.onError( + Status.NOT_FOUND.withDescription("Can't find server " + request.getServerId()) + .asRuntimeException()); + return; + } + + GetServerResponse resp; + try { + resp = GetServerResponse + .newBuilder() + .setServer(ChannelzProtoUtil.toServer(s)) + .build(); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + /** Returns a subchannel. */ + @Override + public void getSubchannel( + GetSubchannelRequest request, StreamObserver responseObserver) { + InternalInstrumented s = channelz.getSubchannel(request.getSubchannelId()); + if (s == null) { + responseObserver.onError( + Status.NOT_FOUND.withDescription("Can't find subchannel " + request.getSubchannelId()) + .asRuntimeException()); + return; + } + + GetSubchannelResponse resp; + try { + resp = GetSubchannelResponse + .newBuilder() + .setSubchannel(ChannelzProtoUtil.toSubchannel(s)) + .build(); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + /** Returns a socket. */ + @Override + public void getSocket( + GetSocketRequest request, StreamObserver responseObserver) { + InternalInstrumented s = channelz.getSocket(request.getSocketId()); + if (s == null) { + responseObserver.onError( + Status.NOT_FOUND.withDescription("Can't find socket " + request.getSocketId()) + .asRuntimeException()); + return; + } + + GetSocketResponse resp; + try { + resp = + GetSocketResponse.newBuilder().setSocket(ChannelzProtoUtil.toSocket(s)).build(); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } + + @Override + public void getServerSockets( + GetServerSocketsRequest request, StreamObserver responseObserver) { + ServerSocketsList serverSockets + = channelz.getServerSockets(request.getServerId(), request.getStartSocketId(), maxPageSize); + if (serverSockets == null) { + responseObserver.onError( + Status.NOT_FOUND.withDescription("Can't find server " + request.getServerId()) + .asRuntimeException()); + return; + } + + GetServerSocketsResponse resp; + try { + resp = ChannelzProtoUtil.toGetServerSocketsResponse(serverSockets); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + return; + } + + responseObserver.onNext(resp); + responseObserver.onCompleted(); + } +} diff --git a/services/src/main/java/io/grpc/services/InetAddressUtil.java b/services/src/main/java/io/grpc/services/InetAddressUtil.java new file mode 100644 index 00000000000..057a8ccb5e6 --- /dev/null +++ b/services/src/main/java/io/grpc/services/InetAddressUtil.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.primitives.Ints; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.util.Arrays; + +// This is copied from guava 20.0 because it is a @Beta api +final class InetAddressUtil { + private static final int IPV6_PART_COUNT = 8; + + public static String toAddrString(InetAddress ip) { + checkNotNull(ip); + if (ip instanceof Inet4Address) { + // For IPv4, Java's formatting is good enough. + return ip.getHostAddress(); + } + checkArgument(ip instanceof Inet6Address); + byte[] bytes = ip.getAddress(); + int[] hextets = new int[IPV6_PART_COUNT]; + for (int i = 0; i < hextets.length; i++) { + hextets[i] = Ints.fromBytes((byte) 0, (byte) 0, bytes[2 * i], bytes[2 * i + 1]); + } + compressLongestRunOfZeroes(hextets); + return hextetsToIPv6String(hextets); + } + + private static void compressLongestRunOfZeroes(int[] hextets) { + int bestRunStart = -1; + int bestRunLength = -1; + int runStart = -1; + for (int i = 0; i < hextets.length + 1; i++) { + if (i < hextets.length && hextets[i] == 0) { + if (runStart < 0) { + runStart = i; + } + } else if (runStart >= 0) { + int runLength = i - runStart; + if (runLength > bestRunLength) { + bestRunStart = runStart; + bestRunLength = runLength; + } + runStart = -1; + } + } + if (bestRunLength >= 2) { + Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1); + } + } + + private static String hextetsToIPv6String(int[] hextets) { + // While scanning the array, handle these state transitions: + // start->num => "num" start->gap => "::" + // num->num => ":num" num->gap => "::" + // gap->num => "num" gap->gap => "" + StringBuilder buf = new StringBuilder(39); + boolean lastWasNumber = false; + for (int i = 0; i < hextets.length; i++) { + boolean thisIsNumber = hextets[i] >= 0; + if (thisIsNumber) { + if (lastWasNumber) { + buf.append(':'); + } + buf.append(Integer.toHexString(hextets[i])); + } else { + if (i == 0 || lastWasNumber) { + buf.append("::"); + } + } + lastWasNumber = thisIsNumber; + } + return buf.toString(); + } +} \ No newline at end of file diff --git a/services/src/main/java/io/grpc/services/TempFileSink.java b/services/src/main/java/io/grpc/services/TempFileSink.java new file mode 100644 index 00000000000..c28339d1bd6 --- /dev/null +++ b/services/src/main/java/io/grpc/services/TempFileSink.java @@ -0,0 +1,84 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.protobuf.MessageLite; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The output file goes to the JVM's temp dir with a prefix of BINARY_INFO. The proto messages + * are written serially using {@link MessageLite#writeDelimitedTo(OutputStream)}. + */ +class TempFileSink implements BinaryLogSink { + private static final Logger logger = Logger.getLogger(TempFileSink.class.getName()); + + private final String outPath; + private final OutputStream out; + private boolean closed; + + TempFileSink() throws IOException { + File outFile = File.createTempFile("BINARY_INFO.", ""); + outPath = outFile.getPath(); + logger.log(Level.INFO, "Writing binary logs to to {0}", outFile.getAbsolutePath()); + out = new BufferedOutputStream(new FileOutputStream(outFile)); + } + + String getPath() { + return this.outPath; + } + + @Override + public synchronized void write(MessageLite message) { + if (closed) { + logger.log(Level.FINEST, "Attempt to write after TempFileSink is closed."); + return; + } + try { + message.writeDelimitedTo(out); + } catch (IOException e) { + logger.log(Level.SEVERE, "Caught exception while writing", e); + closeQuietly(); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + try { + out.flush(); + } finally { + out.close(); + } + } + + private synchronized void closeQuietly() { + try { + close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Caught exception while closing", e); + } + } +}