From bf52a77e111e3e5a068ccd4f45be6ec45d52c9d3 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 16 Mar 2022 16:07:07 -0700 Subject: [PATCH] Adopt the new design of the plugin --- observability/config.go | 10 +- observability/exporting.go | 9 +- observability/logging.go | 292 ++++++++++++++++------------ observability/observability.go | 9 +- observability/observability_test.go | 42 +++- 5 files changed, 223 insertions(+), 139 deletions(-) diff --git a/observability/config.go b/observability/config.go index b7598e824844..bb861713fbb8 100644 --- a/observability/config.go +++ b/observability/config.go @@ -29,15 +29,15 @@ import ( ) const ( - envKeyObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY" - envProjectID = "GOOGLE_CLOUD_PROJECT" + envObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY" + envProjectID = "GOOGLE_CLOUD_PROJECT" ) // fetchDefaultProjectID fetches the default GCP project id from environment. func fetchDefaultProjectID(ctx context.Context) string { // Step 1: Check ENV var if s := os.Getenv(envProjectID); s != "" { - logger.Infof("Found project ID from env GOOGLE_CLOUD_PROJECT: %v", s) + logger.Infof("Found project ID from env %v: %v", envProjectID, s) return s } // Step 2: Check default credential @@ -56,10 +56,10 @@ func fetchDefaultProjectID(ctx context.Context) string { func parseObservabilityConfig() *configpb.ObservabilityConfig { // Parse the config from ENV var - if content := os.Getenv(envKeyObservabilityConfig); content != "" { + if content := os.Getenv(envObservabilityConfig); content != "" { var config configpb.ObservabilityConfig if err := protojson.Unmarshal([]byte(content), &config); err != nil { - logger.Warningf("Error parsing observability config from env GRPC_CONFIG_OBSERVABILITY: %v", err) + logger.Warningf("Error parsing observability config from env %v: %v", envObservabilityConfig, err) return nil } logger.Infof("Parsed ObservabilityConfig: %+v", &config) diff --git a/observability/exporting.go b/observability/exporting.go index 
29555639ab91..46eebcc262f7 100644 --- a/observability/exporting.go +++ b/observability/exporting.go @@ -28,10 +28,9 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -// loggingExporter is the interface of logging exporter for gRPC -// Observability. Ideally, we should use what OTEL provides, but their Golang -// implementation is in "frozen" state. So, this plugin provides a minimum -// interface to satisfy testing purposes. +// loggingExporter is the interface of logging exporter for gRPC Observability. +// In future, we might expose this to allow users provide custom exporters. But +// now, it exists for testing purposes. type loggingExporter interface { // EmitGrpcLogRecord writes a gRPC LogRecord to cache without blocking. EmitGrpcLogRecord(*grpclogrecordpb.GrpcLogRecord) @@ -114,11 +113,13 @@ func (cle *cloudLoggingExporter) Close() error { if err := cle.logger.Flush(); err != nil { return err } + cle.logger = nil } if cle.client != nil { if err := cle.client.Close(); err != nil { return err } + cle.client = nil } logger.Infof("Closed CloudLogging exporter") return nil diff --git a/observability/logging.go b/observability/logging.go index bbbaa90e96d7..6baf1e49ac0b 100644 --- a/observability/logging.go +++ b/observability/logging.go @@ -20,176 +20,218 @@ package observability import ( "fmt" + "net" "strings" - "sync" + "time" + "github.com/golang/protobuf/ptypes" "github.com/google/uuid" - "google.golang.org/grpc/binarylog" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" iblog "google.golang.org/grpc/internal/binarylog" + "google.golang.org/grpc/metadata" configpb "google.golang.org/grpc/observability/internal/config" grpclogrecordpb "google.golang.org/grpc/observability/internal/logging" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) -// translateMetadata translates the metadata from Binary Logging format to -// its GrpcLogRecord equivalent. 
-func translateMetadata(m *binlogpb.Metadata) *grpclogrecordpb.GrpcLogRecord_Metadata { - var res grpclogrecordpb.GrpcLogRecord_Metadata - res.Entry = make([]*grpclogrecordpb.GrpcLogRecord_MetadataEntry, len(m.Entry)) - for i, e := range m.Entry { - res.Entry[i] = &grpclogrecordpb.GrpcLogRecord_MetadataEntry{ +func setMetadataToGrpcLogRecord(lr *grpclogrecordpb.GrpcLogRecord, m metadata.MD, mlc *iblog.MethodLoggerConfig) { + mdPb := iblog.MdToMetadataProto(m) + lr.PayloadTruncated = iblog.TruncateMetadata(mlc, mdPb) + lr.Metadata = &grpclogrecordpb.GrpcLogRecord_Metadata{ + Entry: make([]*grpclogrecordpb.GrpcLogRecord_MetadataEntry, len(mdPb.GetEntry())), + } + for i, e := range mdPb.GetEntry() { + lr.Metadata.Entry[i] = &grpclogrecordpb.GrpcLogRecord_MetadataEntry{ Key: e.Key, Value: e.Value, } } - return &res } -type cloudLoggingSink struct { - callIDToUUID sync.Map - exporter loggingExporter - lock sync.RWMutex +func setPeerToGrpcLogRecord(lr *grpclogrecordpb.GrpcLogRecord, peer net.Addr) { + if peer != nil { + peerPb := iblog.AddrToProto(peer) + lr.PeerAddress = &grpclogrecordpb.GrpcLogRecord_Address{ + Type: grpclogrecordpb.GrpcLogRecord_Address_Type(peerPb.GetType()), + Address: peerPb.GetAddress(), + IpPort: peerPb.GetIpPort(), + } + } } -var defaultCloudLoggingSink *cloudLoggingSink - -func (c *cloudLoggingSink) getUUID(callID uint64) string { - u, ok := c.callIDToUUID.Load(callID) - if !ok { - value := uuid.NewString() - c.callIDToUUID.Store(callID, value) - return value +func setMessageToGrpcLogRecord(lr *grpclogrecordpb.GrpcLogRecord, msg interface{}, mlc *iblog.MethodLoggerConfig) { + var ( + data []byte + err error + ) + if m, ok := msg.(proto.Message); ok { + data, err = proto.Marshal(m) + if err != nil { + logger.Infof("failed to marshal proto message: %v", err) + } + } else if b, ok := msg.([]byte); ok { + data = b + } else { + logger.Infof("message to log is neither proto.message nor []byte") + } + msgPb := &binlogpb.Message{ + Length: 
uint32(len(data)), + Data: data, } - return u.(string) + lr.PayloadTruncated = iblog.TruncateMessage(mlc, msgPb) + lr.Message = msgPb.Data + lr.PayloadSize = msgPb.Length } -func (c *cloudLoggingSink) removeEntry(callID uint64) { - c.callIDToUUID.Delete(callID) +func getEventLogger(isClient bool) grpclogrecordpb.GrpcLogRecord_EventLogger { + if isClient { + return grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT + } + return grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER } -func (c *cloudLoggingSink) SetExporter(exporter loggingExporter) { - c.lock.Lock() - c.exporter = exporter - c.lock.Unlock() +type observabilityBinaryMethodLogger struct { + rpcID, serviceName, methodName string + sequenceIDGen iblog.CallIDGenerator + originalMethodLogger iblog.MethodLogger + methodLoggerConfig *iblog.MethodLoggerConfig } -// Write translates a Binary Logging log entry to a GrpcLogEntry used by the gRPC -// Observability project and emits it. -func (c *cloudLoggingSink) Write(binlogEntry *binlogpb.GrpcLogEntry) error { - c.lock.RLock() - exporter := c.exporter - if exporter == nil { - c.lock.RUnlock() - return nil +func (ml *observabilityBinaryMethodLogger) Log(c iblog.LogEntryConfig) { + // Invoke the original MethodLogger to maintain backward compatibility + if ml.originalMethodLogger != nil { + ml.originalMethodLogger.Log(c) } - c.lock.RUnlock() + timestamp, err := ptypes.TimestampProto(time.Now()) + if err != nil { + logger.Errorf("Failed to convert time.Now() to TimestampProto: %v", err) + } grpcLogRecord := &grpclogrecordpb.GrpcLogRecord{ - Timestamp: binlogEntry.GetTimestamp(), - RpcId: c.getUUID(binlogEntry.GetCallId()), - SequenceId: binlogEntry.GetSequenceIdWithinCall(), + Timestamp: timestamp, + RpcId: ml.rpcID, + SequenceId: ml.sequenceIDGen.Next(), // Making DEBUG the default LogLevel LogLevel: grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_DEBUG, } - callEnded := false - switch binlogEntry.GetType() { - case binlogpb.GrpcLogEntry_EVENT_TYPE_UNKNOWN: - grpcLogRecord.EventType = 
grpclogrecordpb.GrpcLogRecord_GRPC_CALL_UNKNOWN - case binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER: + switch v := c.(type) { + case *iblog.ClientHeader: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_REQUEST_HEADER - if binlogEntry.GetClientHeader() != nil { - methodName := binlogEntry.GetClientHeader().MethodName - if strings.Contains(methodName, "/") { - tokens := strings.Split(methodName, "/") - if len(tokens) == 3 { - // Example method name: /grpc.testing.TestService/UnaryCall - grpcLogRecord.ServiceName = tokens[1] - grpcLogRecord.MethodName = tokens[2] - } else if len(tokens) == 2 { - // Example method name: grpc.testing.TestService/UnaryCall - grpcLogRecord.ServiceName = tokens[0] - grpcLogRecord.MethodName = tokens[1] - } else { - logger.Errorf("Malformed method name: %v", methodName) - } - } - grpcLogRecord.Timeout = binlogEntry.GetClientHeader().Timeout - grpcLogRecord.Authority = binlogEntry.GetClientHeader().Authority - grpcLogRecord.Metadata = translateMetadata(binlogEntry.GetClientHeader().Metadata) - } - if binlogEntry.GetPeer() != nil { - grpcLogRecord.PeerAddress = &grpclogrecordpb.GrpcLogRecord_Address{ - Type: grpclogrecordpb.GrpcLogRecord_Address_Type(binlogEntry.Peer.Type), - Address: binlogEntry.Peer.Address, - IpPort: binlogEntry.Peer.IpPort, + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + methodName := v.MethodName + if strings.Contains(methodName, "/") { + tokens := strings.Split(methodName, "/") + if len(tokens) == 3 { + // Example method name: /grpc.testing.TestService/UnaryCall + ml.serviceName = tokens[1] + ml.methodName = tokens[2] + } else if len(tokens) == 2 { + // Example method name: grpc.testing.TestService/UnaryCall + ml.serviceName = tokens[0] + ml.methodName = tokens[1] + } else { + logger.Errorf("Malformed method name: %v", methodName) } } - grpcLogRecord.PayloadTruncated = binlogEntry.GetPayloadTruncated() - case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER: + grpcLogRecord.Timeout = 
ptypes.DurationProto(v.Timeout) + grpcLogRecord.Authority = v.Authority + setMetadataToGrpcLogRecord(grpcLogRecord, v.Header, ml.methodLoggerConfig) + setPeerToGrpcLogRecord(grpcLogRecord, v.PeerAddr) + case *iblog.ServerHeader: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_RESPONSE_HEADER - if binlogEntry.GetPeer() != nil { - grpcLogRecord.PeerAddress = &grpclogrecordpb.GrpcLogRecord_Address{ - Type: grpclogrecordpb.GrpcLogRecord_Address_Type(binlogEntry.Peer.Type), - Address: binlogEntry.Peer.Address, - IpPort: binlogEntry.Peer.IpPort, - } - } - grpcLogRecord.Metadata = translateMetadata(binlogEntry.GetServerHeader().Metadata) - grpcLogRecord.PayloadTruncated = binlogEntry.GetPayloadTruncated() - case binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE: + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + setMetadataToGrpcLogRecord(grpcLogRecord, v.Header, ml.methodLoggerConfig) + setPeerToGrpcLogRecord(grpcLogRecord, v.PeerAddr) + case *iblog.ClientMessage: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_REQUEST_MESSAGE - grpcLogRecord.Message = binlogEntry.GetMessage().GetData() - grpcLogRecord.PayloadSize = binlogEntry.GetMessage().GetLength() - grpcLogRecord.PayloadTruncated = binlogEntry.GetPayloadTruncated() - case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE: + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + setMessageToGrpcLogRecord(grpcLogRecord, v.Message, ml.methodLoggerConfig) + case *iblog.ServerMessage: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_RESPONSE_MESSAGE - grpcLogRecord.Message = binlogEntry.GetMessage().GetData() - grpcLogRecord.PayloadSize = binlogEntry.GetMessage().GetLength() - grpcLogRecord.PayloadTruncated = binlogEntry.GetPayloadTruncated() - case binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE: + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + setMessageToGrpcLogRecord(grpcLogRecord, v.Message, ml.methodLoggerConfig) + case 
*iblog.ClientHalfClose: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_HALF_CLOSE - case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER: + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + case *iblog.ServerTrailer: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_TRAILER - grpcLogRecord.Metadata = translateMetadata(binlogEntry.GetTrailer().Metadata) - grpcLogRecord.StatusCode = binlogEntry.GetTrailer().GetStatusCode() - grpcLogRecord.StatusMessage = binlogEntry.GetTrailer().GetStatusMessage() - grpcLogRecord.StatusDetails = binlogEntry.GetTrailer().GetStatusDetails() - grpcLogRecord.PayloadTruncated = binlogEntry.GetPayloadTruncated() - callEnded = true - case binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL: + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) + setMetadataToGrpcLogRecord(grpcLogRecord, v.Trailer, ml.methodLoggerConfig) + setPeerToGrpcLogRecord(grpcLogRecord, v.PeerAddr) + st, ok := status.FromError(v.Err) + if !ok { + logger.Info("Error in trailer is not a status error") + } + stProto := st.Proto() + if stProto != nil && len(stProto.Details) != 0 { + detailsBytes, err := proto.Marshal(stProto) + if err != nil { + logger.Infof("Failed to marshal status proto: %v", err) + } else { + grpcLogRecord.StatusDetails = detailsBytes + } + } + grpcLogRecord.StatusCode = uint32(st.Code()) + grpcLogRecord.StatusMessage = st.Message() + case *iblog.Cancel: grpcLogRecord.EventType = grpclogrecordpb.GrpcLogRecord_GRPC_CALL_CANCEL - callEnded = true + grpcLogRecord.EventLogger = getEventLogger(v.OnClientSide) default: - return fmt.Errorf("unknown event type: %v", binlogEntry.Type) + logger.Errorf("Unexpected LogEntryConfig: %+v", c) + return } - switch binlogEntry.Logger { - case binlogpb.GrpcLogEntry_LOGGER_CLIENT: - grpcLogRecord.EventLogger = grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT - case binlogpb.GrpcLogEntry_LOGGER_SERVER: - grpcLogRecord.EventLogger = grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER - default: 
- grpcLogRecord.EventLogger = grpclogrecordpb.GrpcLogRecord_LOGGER_UNKNOWN + + if ml.serviceName != "" { + grpcLogRecord.ServiceName = ml.serviceName } - if callEnded { - c.removeEntry(binlogEntry.CallId) + if ml.methodName != "" { + grpcLogRecord.MethodName = ml.methodName } + // CloudLogging client doesn't return error on entry write. Entry writes // don't mean the data will be uploaded immediately. - exporter.EmitGrpcLogRecord(grpcLogRecord) - return nil + globalLoggingExporter.EmitGrpcLogRecord(grpcLogRecord) +} + +type observabilityBinaryLogger struct { + // originalLogger is needed to ensure binary logging users won't be impacted + // by this plugin. Users are allowed to subscribe to a completely different + // set of methods. + originalLogger iblog.Logger + // wrappedLogger is needed to reuse the control string parsing, logger + // config managing logic + wrappedLogger iblog.Logger +} + +func (l *observabilityBinaryLogger) GetMethodLoggerConfig(methodName string) *iblog.MethodLoggerConfig { + if l.wrappedLogger == nil { + return nil + } + return l.wrappedLogger.GetMethodLoggerConfig(methodName) } -// Close closes the cloudLoggingSink. This call cleans exporter field, so -// following writes will be noop. 
-func (c *cloudLoggingSink) Close() error { - c.lock.Lock() - defer c.lock.Unlock() - c.exporter = nil - return nil +func (l *observabilityBinaryLogger) GetMethodLogger(methodName string) iblog.MethodLogger { + var ol iblog.MethodLogger + if l.originalLogger != nil { + ol = l.originalLogger.GetMethodLogger(methodName) + } + + mlc := l.GetMethodLoggerConfig(methodName) + if mlc == nil { + return ol + } + return &observabilityBinaryMethodLogger{ + originalMethodLogger: ol, + methodLoggerConfig: mlc, + rpcID: uuid.NewString(), + } } -func newCloudLoggingSink() *cloudLoggingSink { - return &cloudLoggingSink{} +func newObservabilityBinaryLogger(iblogger iblog.Logger) *observabilityBinaryLogger { + return &observabilityBinaryLogger{ + originalLogger: iblogger, + } } func compileBinaryLogControlString(config *configpb.ObservabilityConfig) string { @@ -212,9 +254,11 @@ func compileBinaryLogControlString(config *configpb.ObservabilityConfig) string return strings.Join(entries, ",") } +var defaultLogger *observabilityBinaryLogger + func prepareLogging() { - defaultCloudLoggingSink = newCloudLoggingSink() - binarylog.SetSink(defaultCloudLoggingSink) + defaultLogger = newObservabilityBinaryLogger(iblog.GetLogger()) + iblog.SetLogger(defaultLogger) } func startLogging(config *configpb.ObservabilityConfig) { @@ -222,6 +266,6 @@ func startLogging(config *configpb.ObservabilityConfig) { return } binlogConfig := compileBinaryLogControlString(config) - iblog.SetLogger(iblog.NewLoggerFromConfigString(binlogConfig)) + defaultLogger.wrappedLogger = iblog.NewLoggerFromConfigString(binlogConfig) logger.Infof("Start logging with config [%v]", binlogConfig) } diff --git a/observability/observability.go b/observability/observability.go index b2184030014e..bd7141eba18c 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -47,6 +47,7 @@ func init() { // - it registers default exporters if not disabled by the config; // - it sets up binary logging sink against 
the logging exporter. // +// Note: this method should only be invoked once, it's not thread-safe. // Note: currently, the binarylog module only supports one sink, so using the // "observability" module will conflict with existing binarylog usage. // Note: handle the error @@ -59,19 +60,15 @@ func Start(ctx context.Context) error { // Set the project ID if it isn't configured manually. maybeUpdateProjectIDInObservabilityConfig(ctx, config) - // Logging is controlled by the config at methods level. Users might bring - // their in-house exporter. If logging is disabled, binary logging also - // won't start. The overhead should be minimum. + // Logging is controlled by the config at methods level. startLogging(config) - // If the cloud logging exporter is not disabled, register one. if config.GetDestinationProjectId() == "" || !config.GetEnableCloudLogging() { return nil } if err := createDefaultLoggingExporter(ctx, config.DestinationProjectId); err != nil { return err } - defaultCloudLoggingSink.SetExporter(globalLoggingExporter) return nil } @@ -79,6 +76,8 @@ func Start(ctx context.Context) error { // invoked in the main function of the application. The suggested usage is // "defer observability.End()". This function also flushes data to upstream, and // cleanup resources. +// +// Note: this method should only be invoked once, it's not thread-safe. 
func End() { closeLoggingExporter() } diff --git a/observability/observability_test.go b/observability/observability_test.go index d84b7f7c2305..43f621d48c77 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -213,7 +213,14 @@ func (te *test) enablePluginWithFakeExporters() { Start(context.Background()) // Injects the fake exporter for testing purposes globalLoggingExporter = te.fle - defaultCloudLoggingSink.SetExporter(te.fle) +} + +func (te *test) enablePluginWithEmptyConfig() { + os.Setenv(envObservabilityConfig, "{}") + // Explicitly re-parse the ObservabilityConfig + Start(context.Background()) + // Injects the fake exporter for testing purposes + globalLoggingExporter = te.fle } func checkEventCommon(t *testing.T, seen *grpclogrecordpb.GrpcLogRecord) { @@ -463,3 +470,36 @@ func (s) TestLoggingForErrorCall(t *testing.T) { StatusMessage: testErrorMessage, }) } + +func (s) TestNoConfig(t *testing.T) { + te := newTest(t) + defer te.tearDown() + te.enablePluginWithEmptyConfig() + te.startServer(&testServer{}) + tc := testgrpc.NewTestServiceClient(te.clientConn()) + + var ( + resp *testpb.SimpleResponse + req *testpb.SimpleRequest + err error + ) + req = &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}} + tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + resp, err = tc.UnaryCall(metadata.NewOutgoingContext(tCtx, testHeaderMetadata), req) + if err != nil { + t.Fatalf("unary call failed: %v", err) + } + t.Logf("unary call passed: %v", resp) + + // Wait for the gRPC transport to gracefully close to ensure no lost event. + te.cc.Close() + te.srv.GracefulStop() + // Check size of events + if len(te.fle.clientEvents) != 0 { + t.Fatalf("expects 0 client events, got %d", len(te.fle.clientEvents)) + } + if len(te.fle.serverEvents) != 0 { + t.Fatalf("expects 0 server events, got %d", len(te.fle.serverEvents)) + } +}