Skip to content

Commit

Permalink
feat: implement grpc-observability logging via binarylog
Browse files Browse the repository at this point in the history
  • Loading branch information
lidizheng committed Feb 28, 2022
1 parent ec717ca commit 072c5b2
Show file tree
Hide file tree
Showing 11 changed files with 3,793 additions and 0 deletions.
95 changes: 95 additions & 0 deletions observability/config.go
@@ -0,0 +1,95 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package observability

import (
"context"
"encoding/json"
"os"

gcplogging "cloud.google.com/go/logging"
"golang.org/x/oauth2/google"
configpb "google.golang.org/grpc/observability/internal/config"
"google.golang.org/protobuf/encoding/protojson"
)

const (
envKeyObservabilityConfig = "GRPC_OBSERVABILITY_CONFIG"
)

// gcpDefaultCredentials is the JSON loading struct used to get project id.
type gcpDefaultCredentials struct {
QuotaProjectID string `json:"quota_project_id"`
}

// fetchDefaultProjectID fetches the default GCP project id from environment.
func fetchDefaultProjectID(ctx context.Context) string {
// Step 1: Check ENV var
if s := os.Getenv("GCLOUD_PROJECT_ID"); s != "" {
return s
}
// Step 2: Check default credential
if credentials, err := google.FindDefaultCredentials(ctx, gcplogging.WriteScope); err == nil {
logger.Infof("found Google Default Credential")
// Step 2.1: Check if the ProjectID is in the plain view
if credentials.ProjectID != "" {
return credentials.ProjectID
} else if len(credentials.JSON) > 0 {
// Step 2.2: Check if the JSON form of the credentials has it
var d gcpDefaultCredentials
if err := json.Unmarshal(credentials.JSON, &d); err != nil {
logger.Infof("failed to parse default credentials JSON")
} else if d.QuotaProjectID != "" {
return d.QuotaProjectID
}
}
} else {
logger.Info("failed to locate Google Default Credential: %v", err)
}
// No default project ID found
return ""
}

// parseObservabilityConfig parses and processes the config for observability,
// currently, we only support loading config from static ENV var. But we might
// support dynamic configuration with control plane in future.
func parseObservabilityConfig(ctx context.Context) *configpb.ObservabilityConfig {
// Parse the config from ENV var
var config configpb.ObservabilityConfig
content := os.Getenv(envKeyObservabilityConfig)
if content != "" {
if err := protojson.Unmarshal([]byte(content), &config); err != nil {
logger.Warningf("failed to load observability config from env GRPC_OBSERVABILITY_CONFIG: %s", err)
}
}
// Fill in GCP project id if not present
if config.ExporterConfig == nil {
config.ExporterConfig = &configpb.ObservabilityConfig_ExporterConfig{
ProjectId: fetchDefaultProjectID(ctx),
}
} else {
// If any default exporter is required, fill the default project id
if config.ExporterConfig.ProjectId == "" {
config.ExporterConfig.ProjectId = fetchDefaultProjectID(ctx)
}
}
configJSON, _ := protojson.Marshal(&config)
logger.Infof("Using ObservabilityConfig: %v", string(configJSON))
return &config
}
130 changes: 130 additions & 0 deletions observability/exporting.go
@@ -0,0 +1,130 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package observability

import (
"context"
"encoding/json"
"fmt"

gcplogging "cloud.google.com/go/logging"
grpclogrecordpb "google.golang.org/grpc/observability/internal/logging"
)

// genericLoggingExporter is the interface of logging exporter for gRPC
// Observability. Ideally, we should use what OTEL provides, but their Golang
// implementation is in "frozen" state. So, this plugin provides a minimum
// interface to satisfy testing purposes.
type genericLoggingExporter interface {
// EmitGrpcLogRecord writes a gRPC LogRecord to cache without blocking.
EmitGrpcLogRecord(*grpclogrecordpb.GrpcLogRecord)
// Close flushes all pending data and closes the exporter.
Close() error
}

// loggingExporter is the global logging exporter, may be nil.
var loggingExporter genericLoggingExporter

type cloudLoggingExporter struct {
projectID string
client *gcplogging.Client
logger *gcplogging.Logger
}

func newCloudLoggingExporter(ctx context.Context, projectID string) *cloudLoggingExporter {
c, err := gcplogging.NewClient(ctx, fmt.Sprintf("projects/%v", projectID))
if err != nil {
logger.Errorf("failed to create cloudLoggingExporter: %v", err)
return nil
}
logger.Infof("successfully created cloudLoggingExporter")
return &cloudLoggingExporter{
projectID: projectID,
client: c,
logger: c.Logger("grpc"),
}
}

// mapLogLevelToSeverity maps the gRPC defined log level to Cloud Logging's
// Severity. The canonical definition can be found at
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity.
var logLevelToSeverity = map[grpclogrecordpb.GrpcLogRecord_LogLevel]gcplogging.Severity{
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_UNKNOWN: 0,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_TRACE: 100, // Cloud Logging doesn't have a trace level, treated as DEBUG.
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_DEBUG: 100,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_INFO: 200,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_WARN: 400,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_ERROR: 500,
grpclogrecordpb.GrpcLogRecord_LOG_LEVEL_CRITICAL: 600,
}

func (cle *cloudLoggingExporter) EmitGrpcLogRecord(l *grpclogrecordpb.GrpcLogRecord) {
var severity, ok = logLevelToSeverity[l.LogLevel]
if !ok {
logger.Errorf("invalid log level: %v", l.LogLevel)
severity = 0
}
entry := gcplogging.Entry{
Timestamp: l.Timestamp.AsTime(),
Severity: severity,
Payload: l,
}
cle.logger.Log(entry)
if logger.V(2) {
eventJSON, _ := json.Marshal(&entry)
logger.Infof("uploading event to CloudLogging: %s", eventJSON)
}
}

func (cle *cloudLoggingExporter) Close() error {
if cle.logger != nil {
if err := cle.logger.Flush(); err != nil {
return err
}
}
if cle.client != nil {
if err := cle.client.Close(); err != nil {
return err
}
}
logger.Infof("closed CloudLogging exporter")
return nil
}

func createDefaultLoggingExporter(ctx context.Context, projectID string) {
loggingExporter = newCloudLoggingExporter(ctx, projectID)
}

func closeLoggingExporter() {
if loggingExporter != nil {
if err := loggingExporter.Close(); err != nil {
logger.Infof("failed to close logging exporter: %v", err)
}
loggingExporter = nil
}
}

// emit is the wrapper for producing a log entry, hiding all the abstraction details.
func emit(l *grpclogrecordpb.GrpcLogRecord) error {
if loggingExporter == nil {
return fmt.Errorf("default logging exporter is nil")
}
loggingExporter.EmitGrpcLogRecord(l)
return nil
}
15 changes: 15 additions & 0 deletions observability/go.mod
@@ -0,0 +1,15 @@
module google.golang.org/grpc/observability

go 1.14

require (
cloud.google.com/go/logging v1.4.2
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
)

replace google.golang.org/grpc => ../

0 comments on commit 072c5b2

Please sign in to comment.