Skip to content

Commit

Permalink
gcp/observability: implement logging via binarylog (#5196)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidizheng committed Apr 6, 2022
1 parent 18fdf54 commit 4467a29
Show file tree
Hide file tree
Showing 16 changed files with 3,425 additions and 58 deletions.
102 changes: 102 additions & 0 deletions gcp/observability/config.go
@@ -0,0 +1,102 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package observability

import (
"context"
"fmt"
"os"
"regexp"

gcplogging "cloud.google.com/go/logging"
"golang.org/x/oauth2/google"
configpb "google.golang.org/grpc/observability/internal/config"
"google.golang.org/protobuf/encoding/protojson"
)

const (
envObservabilityConfig = "GRPC_CONFIG_OBSERVABILITY"
envProjectID = "GOOGLE_CLOUD_PROJECT"
logFilterPatternRegexpStr = `^([\w./]+)/((?:\w+)|[*])$`
)

var logFilterPatternRegexp = regexp.MustCompile(logFilterPatternRegexpStr)

// fetchDefaultProjectID fetches the default GCP project id from environment.
func fetchDefaultProjectID(ctx context.Context) string {
// Step 1: Check ENV var
if s := os.Getenv(envProjectID); s != "" {
logger.Infof("Found project ID from env %v: %v", envProjectID, s)
return s
}
// Step 2: Check default credential
credentials, err := google.FindDefaultCredentials(ctx, gcplogging.WriteScope)
if err != nil {
logger.Infof("Failed to locate Google Default Credential: %v", err)
return ""
}
if credentials.ProjectID == "" {
logger.Infof("Failed to find project ID in default credential: %v", err)
return ""
}
logger.Infof("Found project ID from Google Default Credential: %v", credentials.ProjectID)
return credentials.ProjectID
}

func validateFilters(config *configpb.ObservabilityConfig) error {
for _, filter := range config.GetLogFilters() {
if filter.Pattern == "*" {
continue
}
match := logFilterPatternRegexp.FindStringSubmatch(filter.Pattern)
if match == nil {
return fmt.Errorf("invalid log filter pattern: %v", filter.Pattern)
}
}
return nil
}

func parseObservabilityConfig() (*configpb.ObservabilityConfig, error) {
// Parse the config from ENV var
if content := os.Getenv(envObservabilityConfig); content != "" {
var config configpb.ObservabilityConfig
if err := protojson.Unmarshal([]byte(content), &config); err != nil {
return nil, fmt.Errorf("error parsing observability config from env %v: %v", envObservabilityConfig, err)
}
if err := validateFilters(&config); err != nil {
return nil, fmt.Errorf("error parsing observability config: %v", err)
}
logger.Infof("Parsed ObservabilityConfig: %+v", &config)
return &config, nil
}
// If the ENV var doesn't exist, do nothing
return nil, nil
}

func ensureProjectIDInObservabilityConfig(ctx context.Context, config *configpb.ObservabilityConfig) error {
if config.GetDestinationProjectId() == "" {
// Try to fetch the GCP project id
projectID := fetchDefaultProjectID(ctx)
if projectID == "" {
return fmt.Errorf("empty destination project ID")
}
config.DestinationProjectId = projectID
}
return nil
}
128 changes: 128 additions & 0 deletions gcp/observability/exporting.go
@@ -0,0 +1,128 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package observability

import (
"context"
"encoding/json"
"fmt"
"os"

gcplogging "cloud.google.com/go/logging"
grpclogrecordpb "google.golang.org/grpc/observability/internal/logging"
"google.golang.org/protobuf/encoding/protojson"
)

// loggingExporter is the interface of logging exporter for gRPC Observability.
// In future, we might expose this to allow users provide custom exporters. But
// now, it exists for testing purposes.
type loggingExporter interface {
// EmitGrpcLogRecord writes a gRPC LogRecord to cache without blocking.
EmitGrpcLogRecord(*grpclogrecordpb.GrpcLogRecord)
// Close flushes all pending data and closes the exporter.
Close() error
}

type cloudLoggingExporter struct {
projectID string
client *gcplogging.Client
logger *gcplogging.Logger
}

func newCloudLoggingExporter(ctx context.Context, projectID string) (*cloudLoggingExporter, error) {
c, err := gcplogging.NewClient(ctx, fmt.Sprintf("projects/%v", projectID))
if err != nil {
return nil, fmt.Errorf("failed to create cloudLoggingExporter: %v", err)
}
defer logger.Infof("Successfully created cloudLoggingExporter")
customTags := getCustomTags(os.Environ())
if len(customTags) != 0 {
logger.Infof("Adding custom tags: %+v", customTags)
}
return &cloudLoggingExporter{
projectID: projectID,
client: c,
logger: c.Logger("grpc", gcplogging.CommonLabels(customTags)),
}, nil
}

// mapLogLevelToSeverity maps the gRPC defined log level to Cloud Logging's
// Severity. The canonical definition can be found at
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity.
var logLevelToSeverity = map[grpclogrecordpb.GrpcLogRecord_LogLevel]gcplogging.Severity{
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_UNKNOWN: 0,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_TRACE: 100, // Cloud Logging doesn't have a trace level, treated as DEBUG.
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_DEBUG: 100,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_INFO: 200,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_WARN: 400,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_ERROR: 500,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_CRITICAL: 600,
}

var protoToJSONOptions = &protojson.MarshalOptions{
UseProtoNames: true,
UseEnumNumbers: false,
}

func (cle *cloudLoggingExporter) EmitGrpcLogRecord(l *grpclogrecordpb.GrpcLogRecord) {
// Converts the log record content to a more readable format via protojson.
jsonBytes, err := protoToJSONOptions.Marshal(l)
if err != nil {
logger.Infof("Unable to marshal log record: %v", l)
return
}
var payload map[string]interface{}
err = json.Unmarshal(jsonBytes, &payload)
if err != nil {
logger.Infof("Unable to unmarshal bytes to JSON: %v", jsonBytes)
return
}
entry := gcplogging.Entry{
Timestamp: l.Timestamp.AsTime(),
Severity: logLevelToSeverity[l.LogLevel],
Payload: payload,
}
cle.logger.Log(entry)
if logger.V(2) {
logger.Infof("Uploading event to CloudLogging: %+v", entry)
}
}

func (cle *cloudLoggingExporter) Close() error {
var errFlush, errClose error
if cle.logger != nil {
errFlush = cle.logger.Flush()
}
if cle.client != nil {
errClose = cle.client.Close()
}
if errFlush != nil && errClose != nil {
return fmt.Errorf("failed to close exporter. Flush failed: %v; Close failed: %v", errFlush, errClose)
}
if errFlush != nil {
return errFlush
}
if errClose != nil {
return errClose
}
cle.logger = nil
cle.client = nil
logger.Infof("Closed CloudLogging exporter")
return nil
}
14 changes: 14 additions & 0 deletions gcp/observability/go.mod
@@ -0,0 +1,14 @@
module google.golang.org/grpc/observability

go 1.14

require (
cloud.google.com/go/logging v1.4.2
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
)

replace google.golang.org/grpc => ../../

0 comments on commit 4467a29

Please sign in to comment.