Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp-observability: update observability logging proto #9608

Merged
merged 3 commits into from Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,7 +35,6 @@
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
Expand Down Expand Up @@ -79,8 +78,8 @@ public static synchronized GcpObservability grpcInit() throws IOException {
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
Expand Down
Expand Up @@ -36,9 +36,6 @@ public interface ObservabilityConfig {
/** Get destination project ID - where logs will go. */
String getDestinationProjectId();

/** Get message count threshold to flush - flush once message count is reached. */
Long getFlushMessageCount();

/** Get filters set for logging. */
List<LogFilter> getLogFilters();

Expand Down
Expand Up @@ -45,7 +45,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
private boolean enableCloudMonitoring = false;
private boolean enableCloudTracing = false;
private String destinationProjectId = null;
private Long flushMessageCount = null;
private List<LogFilter> logFilters;
private List<EventType> eventTypes;
private Sampler sampler;
Expand Down Expand Up @@ -87,7 +86,6 @@ private void parseConfig(Map<String, ?> config) {
enableCloudTracing = value;
}
destinationProjectId = JsonUtil.getString(config, "destination_project_id");
flushMessageCount = JsonUtil.getNumberAsLong(config, "flush_message_count");
List<?> rawList = JsonUtil.getList(config, "log_filters");
if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
Expand All @@ -102,7 +100,7 @@ private void parseConfig(Map<String, ?> config) {
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>();
for (String jsonEventType : jsonEventTypes) {
eventTypesBuilder.add(convertEventType(jsonEventType));
eventTypesBuilder.add(EventType.valueOf(jsonEventType));
}
this.eventTypes = eventTypesBuilder.build();
}
Expand Down Expand Up @@ -136,28 +134,6 @@ private void parseConfig(Map<String, ?> config) {
}
}

private EventType convertEventType(String val) {
switch (val) {
case "GRPC_CALL_UNKNOWN":
return EventType.GRPC_CALL_UNKNOWN;
case "GRPC_CALL_REQUEST_HEADER":
return EventType.GRPC_CALL_REQUEST_HEADER;
case "GRPC_CALL_RESPONSE_HEADER":
return EventType.GRPC_CALL_RESPONSE_HEADER;
case "GRPC_CALL_REQUEST_MESSAGE":
return EventType.GRPC_CALL_REQUEST_MESSAGE;
case "GRPC_CALL_RESPONSE_MESSAGE":
return EventType.GRPC_CALL_RESPONSE_MESSAGE;
case "GRPC_CALL_TRAILER":
return EventType.GRPC_CALL_TRAILER;
case "GRPC_CALL_HALF_CLOSE":
return EventType.GRPC_CALL_HALF_CLOSE;
case "GRPC_CALL_CANCEL":
return EventType.GRPC_CALL_CANCEL;
default:
throw new IllegalArgumentException("Unknown event type value:" + val);
}
}

private LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) {
return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"),
Expand Down Expand Up @@ -185,11 +161,6 @@ public String getDestinationProjectId() {
return destinationProjectId;
}

@Override
public Long getFlushMessageCount() {
return flushMessageCount;
}

@Override
public List<LogFilter> getLogFilters() {
return logFilters;
Expand Down
Expand Up @@ -85,7 +85,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
CallOptions callOptions, Channel next) {

final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString();
final String callId = UUID.randomUUID().toString();
final String authority = next.authority();
final String serviceName = method.getServiceName();
final String methodName = method.getBareMethodName();
Expand All @@ -105,24 +105,24 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Event: EventType.GRPC_CALL_REQUEST_HEADER
// Event: EventType.CLIENT_HEADER
// The timeout should reflect the time remaining when the call is started, so compute
// remaining time here.
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));

if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) {
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
try {
helper.logRequestHeader(
helper.logClientHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
EventLogger.CLIENT,
callId,
null);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
Expand All @@ -139,19 +139,20 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE;
// Event: EventType.SERVER_MESSAGE
EventType responseMessageType = EventType.SERVER_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
responseMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_CLIENT,
rpcId);
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
Expand All @@ -161,17 +162,18 @@ public void onMessage(RespT message) {

@Override
public void onHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) {
// Event: EventType.SERVER_HEADER
if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
try {
helper.logResponseHeader(
helper.logServerHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
headers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
Expand All @@ -182,18 +184,19 @@ public void onHeaders(Metadata headers) {

@Override
public void onClose(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) {
// Event: EventType.SERVER_TRAILER
if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
status,
trailers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
Expand All @@ -207,19 +210,20 @@ public void onClose(Status status, Metadata trailers) {

@Override
public void sendMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE;
// Event: EventType.CLIENT_MESSAGE
EventType requestMessageType = EventType.CLIENT_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
requestMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_CLIENT,
rpcId);
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
Expand All @@ -229,15 +233,16 @@ public void sendMessage(ReqT message) {

@Override
public void halfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) {
// Event: EventType.CLIENT_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
Expand All @@ -247,15 +252,16 @@ public void halfClose() {

@Override
public void cancel(String message, Throwable cause) {
// Event: EventType.GRPC_CALL_CANCEL
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) {
// Event: EventType.CANCEL
if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
Expand Down