Skip to content

Commit

Permalink
gcp/observability: implement public preview config syntax, logging sc…
Browse files Browse the repository at this point in the history
…hema, and exposed metrics (#5704)
  • Loading branch information
zasweq committed Oct 12, 2022
1 parent 8062981 commit 8b3b10b
Show file tree
Hide file tree
Showing 16 changed files with 1,904 additions and 2,099 deletions.
246 changes: 163 additions & 83 deletions gcp/observability/config.go
Expand Up @@ -21,86 +21,23 @@ package observability
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"

gcplogging "cloud.google.com/go/logging"
"golang.org/x/oauth2/google"
"google.golang.org/grpc/internal/envconfig"
)

const (
envObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY"
envObservabilityConfigJSON = "GRPC_CONFIG_OBSERVABILITY_JSON"
envProjectID = "GOOGLE_CLOUD_PROJECT"
logFilterPatternRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
envProjectID = "GOOGLE_CLOUD_PROJECT"
methodStringRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
)

var logFilterPatternRegexp = regexp.MustCompile(logFilterPatternRegexpStr)

// logFilter represents a method logging configuration.
type logFilter struct {
// Pattern is a string which can select a group of method names. By
// default, the Pattern is an empty string, matching no methods.
//
// Only "*" Wildcard is accepted for Pattern. A Pattern is in the form
// of <service>/<method> or just a character "*" .
//
// If the Pattern is "*", it specifies the defaults for all the
// services; If the Pattern is <service>/*, it specifies the defaults
// for all methods in the specified service <service>; If the Pattern is
// */<method>, this is not supported.
//
// Examples:
// - "Foo/Bar" selects only the method "Bar" from service "Foo"
// - "Foo/*" selects all methods from service "Foo"
// - "*" selects all methods from all services.
Pattern string `json:"pattern,omitempty"`
// HeaderBytes is the number of bytes of each header to log. If the size of
// the header is greater than the defined limit, content past the limit will
// be truncated. The default value is 0.
HeaderBytes int32 `json:"header_bytes,omitempty"`
// MessageBytes is the number of bytes of each message to log. If the size
// of the message is greater than the defined limit, content pass the limit
// will be truncated. The default value is 0.
MessageBytes int32 `json:"message_bytes,omitempty"`
}

// config is configuration for observability behaviors. By default, no
// configuration is required for tracing/metrics/logging to function. This
// config captures the most common knobs for gRPC users. It's always possible to
// override with explicit config in code.
type config struct {
// EnableCloudTrace represents whether the tracing data upload to
// CloudTrace should be enabled or not.
EnableCloudTrace bool `json:"enable_cloud_trace,omitempty"`
// EnableCloudMonitoring represents whether the metrics data upload to
// CloudMonitoring should be enabled or not.
EnableCloudMonitoring bool `json:"enable_cloud_monitoring,omitempty"`
// EnableCloudLogging represents Whether the logging data upload to
// CloudLogging should be enabled or not.
EnableCloudLogging bool `json:"enable_cloud_logging,omitempty"`
// DestinationProjectID is the destination GCP project identifier for the
// uploading log entries. If empty, the gRPC Observability plugin will
// attempt to fetch the project_id from the GCP environment variables, or
// from the default credentials.
DestinationProjectID string `json:"destination_project_id,omitempty"`
// LogFilters is a list of method config. The order matters here - the first
// Pattern which matches the current method will apply the associated config
// options in the logFilter. Any other logFilter that also matches that
// comes later will be ignored. So a logFilter of "*/*" should appear last
// in this list.
LogFilters []logFilter `json:"log_filters,omitempty"`
// GlobalTraceSamplingRate is the global setting that controls the
// probability of a RPC being traced. For example, 0.05 means there is a 5%
// chance for a RPC to be traced, 1.0 means trace every call, 0 means don’t
// start new traces.
GlobalTraceSamplingRate float64 `json:"global_trace_sampling_rate,omitempty"`
// CustomTags a list of custom tags that will be attached to every log
// entry.
CustomTags map[string]string `json:"custom_tags,omitempty"`
}
var methodStringRegexp = regexp.MustCompile(methodStringRegexpStr)

// fetchDefaultProjectID fetches the default GCP project id from environment.
func fetchDefaultProjectID(ctx context.Context) string {
Expand All @@ -123,14 +60,34 @@ func fetchDefaultProjectID(ctx context.Context) string {
return credentials.ProjectID
}

func validateFilters(config *config) error {
for _, filter := range config.LogFilters {
if filter.Pattern == "*" {
func validateLogEventMethod(methods []string, exclude bool) error {
for _, method := range methods {
if method == "*" {
if exclude {
return errors.New("cannot have exclude and a '*' wildcard")
}
continue
}
match := logFilterPatternRegexp.FindStringSubmatch(filter.Pattern)
match := methodStringRegexp.FindStringSubmatch(method)
if match == nil {
return fmt.Errorf("invalid log filter Pattern: %v", filter.Pattern)
return fmt.Errorf("invalid method string: %v", method)
}
}
return nil
}

func validateLoggingEvents(config *config) error {
if config.CloudLogging == nil {
return nil
}
for _, clientRPCEvent := range config.CloudLogging.ClientRPCEvents {
if err := validateLogEventMethod(clientRPCEvent.Methods, clientRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in clientRPCEvent method: %v", err)
}
}
for _, serverRPCEvent := range config.CloudLogging.ServerRPCEvents {
if err := validateLogEventMethod(serverRPCEvent.Methods, serverRPCEvent.Exclude); err != nil {
return fmt.Errorf("error in serverRPCEvent method: %v", err)
}
}
return nil
Expand All @@ -144,38 +101,161 @@ func unmarshalAndVerifyConfig(rawJSON json.RawMessage) (*config, error) {
if err := json.Unmarshal(rawJSON, &config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if err := validateFilters(&config); err != nil {
if err := validateLoggingEvents(&config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
if config.GlobalTraceSamplingRate > 1 || config.GlobalTraceSamplingRate < 0 {
return nil, fmt.Errorf("error parsing observability config: invalid global trace sampling rate %v", config.GlobalTraceSamplingRate)
if config.CloudTrace != nil && (config.CloudTrace.SamplingRate > 1 || config.CloudTrace.SamplingRate < 0) {
return nil, fmt.Errorf("error parsing observability config: invalid cloud trace sampling rate %v", config.CloudTrace.SamplingRate)
}
logger.Infof("Parsed ObservabilityConfig: %+v", &config)
return &config, nil
}

func parseObservabilityConfig() (*config, error) {
if fileSystemPath := os.Getenv(envObservabilityConfigJSON); fileSystemPath != "" {
content, err := ioutil.ReadFile(fileSystemPath) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if f := envconfig.ObservabilityConfigFile; f != "" {
if envconfig.ObservabilityConfig != "" {
logger.Warning("Ignoring GRPC_GCP_OBSERVABILITY_CONFIG and using GRPC_GCP_OBSERVABILITY_CONFIG_FILE contents.")
}
content, err := ioutil.ReadFile(f) // TODO: Switch to os.ReadFile once dropped support for go 1.15
if err != nil {
return nil, fmt.Errorf("error reading observability configuration file %q: %v", fileSystemPath, err)
return nil, fmt.Errorf("error reading observability configuration file %q: %v", f, err)
}
return unmarshalAndVerifyConfig(content)
} else if content := os.Getenv(envObservabilityConfig); content != "" {
return unmarshalAndVerifyConfig([]byte(content))
} else if envconfig.ObservabilityConfig != "" {
return unmarshalAndVerifyConfig([]byte(envconfig.ObservabilityConfig))
}
// If the ENV var doesn't exist, do nothing
return nil, nil
}

func ensureProjectIDInObservabilityConfig(ctx context.Context, config *config) error {
if config.DestinationProjectID == "" {
if config.ProjectID == "" {
// Try to fetch the GCP project id
projectID := fetchDefaultProjectID(ctx)
if projectID == "" {
return fmt.Errorf("empty destination project ID")
}
config.DestinationProjectID = projectID
config.ProjectID = projectID
}
return nil
}

type clientRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"method,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type serverRPCEvents struct {
// Methods is a list of strings which can select a group of methods. By
// default, the list is empty, matching no methods.
//
// The value of the method is in the form of <service>/<method>.
//
// "*" is accepted as a wildcard for:
// 1. The method name. If the value is <service>/*, it matches all
// methods in the specified service.
// 2. The whole value of the field which matches any <service>/<method>.
// It’s not supported when Exclude is true.
// 3. The * wildcard cannot be used on the service name independently,
// */<method> is not supported.
//
// The service name, when specified, must be the fully qualified service
// name, including the package name.
//
// Examples:
// 1."goo.Foo/Bar" selects only the method "Bar" from service "goo.Foo",
// here “goo” is the package name.
// 2."goo.Foo/*" selects all methods from service "goo.Foo"
// 3. "*" selects all methods from all services.
Methods []string `json:"method,omitempty"`
// Exclude represents whether the methods denoted by Methods should be
// excluded from logging. The default value is false, meaning the methods
// denoted by Methods are included in the logging. If Exclude is true, the
// wildcard `*` cannot be used as value of an entry in Methods.
Exclude bool `json:"exclude,omitempty"`
// MaxMetadataBytes is the maximum number of bytes of each header to log. If
// the size of the metadata is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMetadataBytes int `json:"max_metadata_bytes"`
// MaxMessageBytes is the maximum number of bytes of each message to log. If
// the size of the message is greater than the defined limit, content past
// the limit will be truncated. The default value is 0.
MaxMessageBytes int `json:"max_message_bytes"`
}

type cloudLogging struct {
// ClientRPCEvents represents the configuration for outgoing RPC's from the
// binary. The client_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ClientRPCEvents []clientRPCEvents `json:"client_rpc_events,omitempty"`

// ServerRPCEvents represents the configuration for incoming RPC's to the
// binary. The server_rpc_events configs are evaluated in text order, the
// first one matched is used. If an RPC doesn't match an entry, it will
// continue on to the next entry in the list.
ServerRPCEvents []serverRPCEvents `json:"server_rpc_events,omitempty"`
}

type cloudMonitoring struct{}

type cloudTrace struct {
// SamplingRate is the global setting that controls the probability of a RPC
// being traced. For example, 0.05 means there is a 5% chance for a RPC to
// be traced, 1.0 means trace every call, 0 means don’t start new traces. By
// default, the sampling_rate is 0.
SamplingRate float64 `json:"sampling_rate,omitempty"`
}

type config struct {
// ProjectID is the destination GCP project identifier for uploading log
// entries. If empty, the gRPC Observability plugin will attempt to fetch
// the project_id from the GCP environment variables, or from the default
// credentials. If not found, the observability init functions will return
// an error.
ProjectID string `json:"project_id,omitempty"`
// CloudLogging defines the logging options. If not present, logging is disabled.
CloudLogging *cloudLogging `json:"cloud_logging,omitempty"`
// CloudMonitoring determines whether or not metrics are enabled based on
// whether it is present or not. If present, monitoring will be enabled, if
// not present, monitoring is disabled.
CloudMonitoring *cloudMonitoring `json:"cloud_monitoring,omitempty"`
// CloudTrace defines the tracing options. When present, tracing is enabled
// with default configurations. When absent, the tracing is disabled.
CloudTrace *cloudTrace `json:"cloud_trace,omitempty"`
// Labels are applied to cloud logging, monitoring, and trace.
Labels map[string]string `json:"labels,omitempty"`
}
54 changes: 8 additions & 46 deletions gcp/observability/exporting.go
Expand Up @@ -20,20 +20,17 @@ package observability

import (
"context"
"encoding/json"
"fmt"

gcplogging "cloud.google.com/go/logging"
grpclogrecordpb "google.golang.org/grpc/gcp/observability/internal/logging"
"google.golang.org/protobuf/encoding/protojson"
)

// loggingExporter is the interface of logging exporter for gRPC Observability.
// In future, we might expose this to allow users provide custom exporters. But
// now, it exists for testing purposes.
type loggingExporter interface {
// EmitGrpcLogRecord writes a gRPC LogRecord to cache without blocking.
EmitGrpcLogRecord(*grpclogrecordpb.GrpcLogRecord)
EmitGcpLoggingEntry(entry gcplogging.Entry)
// Close flushes all pending data and closes the exporter.
Close() error
}
Expand All @@ -44,58 +41,23 @@ type cloudLoggingExporter struct {
logger *gcplogging.Logger
}

func newCloudLoggingExporter(ctx context.Context, config *config) (*cloudLoggingExporter, error) {
c, err := gcplogging.NewClient(ctx, fmt.Sprintf("projects/%v", config.DestinationProjectID))
func newCloudLoggingExporter(ctx context.Context, config *config) (loggingExporter, error) {
c, err := gcplogging.NewClient(ctx, fmt.Sprintf("projects/%v", config.ProjectID))
if err != nil {
return nil, fmt.Errorf("failed to create cloudLoggingExporter: %v", err)
}
defer logger.Infof("Successfully created cloudLoggingExporter")
if len(config.CustomTags) != 0 {
logger.Infof("Adding custom tags: %+v", config.CustomTags)
if len(config.Labels) != 0 {
logger.Infof("Adding labels: %+v", config.Labels)
}
return &cloudLoggingExporter{
projectID: config.DestinationProjectID,
projectID: config.ProjectID,
client: c,
logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.CustomTags)),
logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.Labels)),
}, nil
}

// mapLogLevelToSeverity maps the gRPC defined log level to Cloud Logging's
// Severity. The canonical definition can be found at
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity.
var logLevelToSeverity = map[grpclogrecordpb.GrpcLogRecord_LogLevel]gcplogging.Severity{
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_UNKNOWN: 0,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_TRACE: 100, // Cloud Logging doesn't have a trace level, treated as DEBUG.
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_DEBUG: 100,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_INFO: 200,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_WARN: 400,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_ERROR: 500,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_CRITICAL: 600,
}

var protoToJSONOptions = &protojson.MarshalOptions{
UseProtoNames: true,
UseEnumNumbers: false,
}

func (cle *cloudLoggingExporter) EmitGrpcLogRecord(l *grpclogrecordpb.GrpcLogRecord) {
// Converts the log record content to a more readable format via protojson.
jsonBytes, err := protoToJSONOptions.Marshal(l)
if err != nil {
logger.Infof("Unable to marshal log record: %v", l)
return
}
var payload map[string]interface{}
err = json.Unmarshal(jsonBytes, &payload)
if err != nil {
logger.Infof("Unable to unmarshal bytes to JSON: %v", jsonBytes)
return
}
entry := gcplogging.Entry{
Timestamp: l.Timestamp.AsTime(),
Severity: logLevelToSeverity[l.LogLevel],
Payload: payload,
}
func (cle *cloudLoggingExporter) EmitGcpLoggingEntry(entry gcplogging.Entry) {
cle.logger.Log(entry)
if logger.V(2) {
logger.Infof("Uploading event to CloudLogging: %+v", entry)
Expand Down

0 comments on commit 8b3b10b

Please sign in to comment.