Skip to content

Commit

Permalink
feat: implement grpc-observability logging via binarylog
Browse files Browse the repository at this point in the history
* Includes generalized MethodConfig
* Includes updated config definition
* Includes custom tags and location tags
  • Loading branch information
lidizheng committed Mar 16, 2022
1 parent 84793b5 commit 275c85a
Show file tree
Hide file tree
Showing 18 changed files with 3,208 additions and 62 deletions.
63 changes: 43 additions & 20 deletions internal/binarylog/binarylog.go
Expand Up @@ -31,7 +31,8 @@ import (
// Logger is the global binary logger. It can be used to get binary logger for
// each method.
type Logger interface {
getMethodLogger(methodName string) *MethodLogger
GetMethodLoggerConfig(methodName string) *MethodLoggerConfig
GetMethodLogger(methodName string) MethodLogger
}

// binLogger is the global binary logger for the binary. One of this should be
Expand All @@ -49,17 +50,24 @@ func SetLogger(l Logger) {
binLogger = l
}

// GetLogger gets the binarg logger.
//
// Only call this at init time.
func GetLogger() Logger {
return binLogger
}

// GetMethodLogger returns the methodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func GetMethodLogger(methodName string) *MethodLogger {
func GetMethodLogger(methodName string) MethodLogger {
if binLogger == nil {
return nil
}
return binLogger.getMethodLogger(methodName)
return binLogger.GetMethodLogger(methodName)
}

func init() {
Expand All @@ -68,15 +76,15 @@ func init() {
binLogger = NewLoggerFromConfigString(configStr)
}

type methodLoggerConfig struct {
type MethodLoggerConfig struct {
// Max length of header and message.
hdr, msg uint64
}

type logger struct {
all *methodLoggerConfig
services map[string]*methodLoggerConfig
methods map[string]*methodLoggerConfig
all *MethodLoggerConfig
services map[string]*MethodLoggerConfig
methods map[string]*MethodLoggerConfig

blacklist map[string]struct{}
}
Expand All @@ -88,7 +96,7 @@ func newEmptyLogger() *logger {
}

// Set method logger for "*".
func (l *logger) setDefaultMethodLogger(ml *methodLoggerConfig) error {
func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
if l.all != nil {
return fmt.Errorf("conflicting global rules found")
}
Expand All @@ -99,12 +107,12 @@ func (l *logger) setDefaultMethodLogger(ml *methodLoggerConfig) error {
// Set method logger for "service/*".
//
// New methodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *methodLoggerConfig) error {
func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
if _, ok := l.services[service]; ok {
return fmt.Errorf("conflicting service rules for service %v found", service)
}
if l.services == nil {
l.services = make(map[string]*methodLoggerConfig)
l.services = make(map[string]*MethodLoggerConfig)
}
l.services[service] = ml
return nil
Expand All @@ -113,15 +121,15 @@ func (l *logger) setServiceMethodLogger(service string, ml *methodLoggerConfig)
// Set method logger for "service/method".
//
// New methodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *methodLoggerConfig) error {
func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
if _, ok := l.blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
}
if _, ok := l.methods[method]; ok {
return fmt.Errorf("conflicting method rules for method %v found", method)
}
if l.methods == nil {
l.methods = make(map[string]*methodLoggerConfig)
l.methods = make(map[string]*MethodLoggerConfig)
}
l.methods[method] = ml
return nil
Expand All @@ -142,29 +150,44 @@ func (l *logger) setBlacklist(method string) error {
return nil
}

// getMethodLogger returns the methodLogger for the given methodName.
// GetMethodLoggerConfig returns the config to create Methodlogger.
//
// methodName should be in the format of "/service/method".
// This method is needed to allow reusing the configuration logic in Logger
// while replacing the MethodLogger creation.
//
// Each methodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func (l *logger) getMethodLogger(methodName string) *MethodLogger {
// The fields in MethodLoggerConfig stays private, so the caller won't be able
// to mutate its states.
func (l *logger) GetMethodLoggerConfig(methodName string) *MethodLoggerConfig {
s, m, err := grpcutil.ParseMethod(methodName)
if err != nil {
grpclogLogger.Infof("binarylogging: failed to parse %q: %v", methodName, err)
return nil
}
if ml, ok := l.methods[s+"/"+m]; ok {
return newMethodLogger(ml.hdr, ml.msg)
return ml
}
if _, ok := l.blacklist[s+"/"+m]; ok {
return nil
}
if ml, ok := l.services[s]; ok {
return newMethodLogger(ml.hdr, ml.msg)
return ml
}
if l.all == nil {
return nil
}
return newMethodLogger(l.all.hdr, l.all.msg)
return l.all
}

// GetMethodLogger returns the MethodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func (l *logger) GetMethodLogger(methodName string) MethodLogger {
mlc := l.GetMethodLoggerConfig(methodName)
if mlc == nil {
return nil
}
return newMethodLogger(mlc)
}
6 changes: 0 additions & 6 deletions internal/binarylog/binarylog_testutil.go
Expand Up @@ -33,10 +33,4 @@ var (
// AllLogger is a logger that logs all headers/messages for all RPCs. It's
// for testing only.
AllLogger = NewLoggerFromConfigString("*")
// MdToMetadataProto converts metadata to a binary logging proto message.
// It's for testing only.
MdToMetadataProto = mdToMetadataProto
// AddrToProto converts an address to a binary logging proto message. It's
// for testing only.
AddrToProto = addrToProto
)
6 changes: 3 additions & 3 deletions internal/binarylog/env_config.go
Expand Up @@ -89,7 +89,7 @@ func (l *logger) fillMethodLoggerWithConfigString(config string) error {
if err != nil {
return fmt.Errorf("invalid config: %q, %v", config, err)
}
if err := l.setDefaultMethodLogger(&methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
if err := l.setDefaultMethodLogger(&MethodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
return nil
Expand All @@ -104,11 +104,11 @@ func (l *logger) fillMethodLoggerWithConfigString(config string) error {
return fmt.Errorf("invalid header/message length config: %q, %v", suffix, err)
}
if m == "*" {
if err := l.setServiceMethodLogger(s, &methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
if err := l.setServiceMethodLogger(s, &MethodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
} else {
if err := l.setMethodMethodLogger(s+"/"+m, &methodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
if err := l.setMethodMethodLogger(s+"/"+m, &MethodLoggerConfig{hdr: hdr, msg: msg}); err != nil {
return fmt.Errorf("invalid config: %v", err)
}
}
Expand Down
73 changes: 42 additions & 31 deletions internal/binarylog/method_logger.go
Expand Up @@ -31,51 +31,54 @@ import (
"google.golang.org/grpc/status"
)

type callIDGenerator struct {
type CallIDGenerator struct {
id uint64
}

func (g *callIDGenerator) next() uint64 {
func (g *CallIDGenerator) Next() uint64 {
id := atomic.AddUint64(&g.id, 1)
return id
}

// reset is for testing only, and doesn't need to be thread safe.
func (g *callIDGenerator) reset() {
func (g *CallIDGenerator) reset() {
g.id = 0
}

var idGen callIDGenerator
var idGen CallIDGenerator

// MethodLogger is the sub-logger for each method.
type MethodLogger struct {
headerMaxLen, messageMaxLen uint64
type MethodLogger interface {
Log(LogEntryConfig)
}

type methodLogger struct {
methodLoggerConfig *MethodLoggerConfig

callID uint64
idWithinCallGen *callIDGenerator
idWithinCallGen *CallIDGenerator

sink Sink // TODO(blog): make this plugable.
}

func newMethodLogger(h, m uint64) *MethodLogger {
return &MethodLogger{
headerMaxLen: h,
messageMaxLen: m,
func newMethodLogger(mlc *MethodLoggerConfig) MethodLogger {
return &methodLogger{
methodLoggerConfig: mlc,

callID: idGen.next(),
idWithinCallGen: &callIDGenerator{},
callID: idGen.Next(),
idWithinCallGen: &CallIDGenerator{},

sink: DefaultSink, // TODO(blog): make it plugable.
}
}

// Log creates a proto binary log entry, and logs it to the sink.
func (ml *MethodLogger) Log(c LogEntryConfig) {
func (ml *methodLogger) Log(c LogEntryConfig) {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
m.CallId = ml.callID
m.SequenceIdWithinCall = ml.idWithinCallGen.next()
m.SequenceIdWithinCall = ml.idWithinCallGen.Next()

switch pay := m.Payload.(type) {
case *pb.GrpcLogEntry_ClientHeader:
Expand All @@ -89,18 +92,18 @@ func (ml *MethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(m)
}

func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
func TruncateMetadata(mlc *MethodLoggerConfig, mdPb *pb.Metadata) (truncated bool) {
if mlc.hdr == maxUInt {
return false
}
var (
bytesLimit = ml.headerMaxLen
bytesLimit = mlc.hdr
index int
)
// At the end of the loop, index will be the first entry where the total
// size is greater than the limit:
//
// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
// len(entry[:index]) <= mlc.hdr && len(entry[:index+1]) > mlc.hdr.
for ; index < len(mdPb.Entry); index++ {
entry := mdPb.Entry[index]
if entry.Key == "grpc-trace-bin" {
Expand All @@ -119,17 +122,25 @@ func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
return truncated
}

func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
func TruncateMessage(mlc *MethodLoggerConfig, msgPb *pb.Message) (truncated bool) {
if mlc.msg == maxUInt {
return false
}
if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
if mlc.msg >= uint64(len(msgPb.Data)) {
return false
}
msgPb.Data = msgPb.Data[:ml.messageMaxLen]
msgPb.Data = msgPb.Data[:mlc.msg]
return true
}

func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) bool {
return TruncateMetadata(ml.methodLoggerConfig, mdPb)
}

func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
return TruncateMessage(ml.methodLoggerConfig, msgPb)
}

// LogEntryConfig represents the configuration for binary log entry.
type LogEntryConfig interface {
toProto() *pb.GrpcLogEntry
Expand All @@ -150,7 +161,7 @@ func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
// This function doesn't need to set all the fields (e.g. seq ID). The Log
// function will set the fields when necessary.
clientHeader := &pb.ClientHeader{
Metadata: mdToMetadataProto(c.Header),
Metadata: MdToMetadataProto(c.Header),
MethodName: c.MethodName,
Authority: c.Authority,
}
Expand All @@ -169,7 +180,7 @@ func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
ret.Peer = AddrToProto(c.PeerAddr)
}
return ret
}
Expand All @@ -187,7 +198,7 @@ func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &pb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{
Metadata: mdToMetadataProto(c.Header),
Metadata: MdToMetadataProto(c.Header),
},
},
}
Expand All @@ -197,7 +208,7 @@ func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
ret.Peer = AddrToProto(c.PeerAddr)
}
return ret
}
Expand Down Expand Up @@ -331,7 +342,7 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &pb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{
Metadata: mdToMetadataProto(c.Trailer),
Metadata: MdToMetadataProto(c.Trailer),
StatusCode: uint32(st.Code()),
StatusMessage: st.Message(),
StatusDetails: detailsBytes,
Expand All @@ -344,7 +355,7 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
ret.Peer = AddrToProto(c.PeerAddr)
}
return ret
}
Expand Down Expand Up @@ -379,7 +390,7 @@ func metadataKeyOmit(key string) bool {
return strings.HasPrefix(key, "grpc-")
}

func mdToMetadataProto(md metadata.MD) *pb.Metadata {
func MdToMetadataProto(md metadata.MD) *pb.Metadata {
ret := &pb.Metadata{}
for k, vv := range md {
if metadataKeyOmit(k) {
Expand All @@ -397,7 +408,7 @@ func mdToMetadataProto(md metadata.MD) *pb.Metadata {
return ret
}

func addrToProto(addr net.Addr) *pb.Address {
func AddrToProto(addr net.Addr) *pb.Address {
ret := &pb.Address{}
switch a := addr.(type) {
case *net.TCPAddr:
Expand Down

0 comments on commit 275c85a

Please sign in to comment.