Skip to content

Commit

Permalink
Add unit test && remove debug lines
Browse files Browse the repository at this point in the history
  • Loading branch information
lidizheng committed May 20, 2022
1 parent 157aecb commit 1dd4544
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 90 deletions.
1 change: 0 additions & 1 deletion clientconn.go
Expand Up @@ -146,7 +146,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())

fmt.Printf("\n!!!defaultDialOption=%v \n\n", defaultDialOption)
for _, opt := range defaultDialOption {
opt.apply(&cc.dopts)
}
Expand Down
4 changes: 3 additions & 1 deletion gcp/observability/observability.go
Expand Up @@ -64,7 +64,9 @@ func Start(ctx context.Context) error {
if err := ensureProjectIDInObservabilityConfig(ctx, config); err != nil {
return err
}
startOpenCensus(config)

// Enabling tracing and metrics via OpenCensus
startOpenCensus(config, nil)

// Logging is controlled by the config at methods level.
return defaultLogger.Start(ctx, config)
Expand Down
156 changes: 95 additions & 61 deletions gcp/observability/observability_test.go
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
configpb "google.golang.org/grpc/gcp/observability/internal/config"
Expand Down Expand Up @@ -69,6 +71,7 @@ var (
testErrorPayload = []byte{77, 97, 114, 116, 104, 97}
testErrorMessage = "test case injected error"
infinitySizeBytes int32 = 1024 * 1024 * 1024
defaultRequestCount = 24
)

type testServer struct {
Expand Down Expand Up @@ -124,6 +127,7 @@ func (fle *fakeLoggingExporter) Close() error {
type test struct {
t *testing.T
fle *fakeLoggingExporter
fe *fakeOpenCensusExporter

testServer testgrpc.TestServiceServer // nil means none
// srv and srvAddr are set once startServer is called.
Expand All @@ -141,7 +145,7 @@ func (te *test) tearDown() {
te.srv.Stop()
End()

if !te.fle.isClosed {
if te.fle != nil && !te.fle.isClosed {
te.t.Fatalf("fakeLoggingExporter not closed!")
}
}
Expand All @@ -151,8 +155,7 @@ func (te *test) tearDown() {
// modify it before calling its startServer and clientConn methods.
func newTest(t *testing.T) *test {
return &test{
t: t,
fle: &fakeLoggingExporter{t: t},
t: t,
}
}

Expand Down Expand Up @@ -194,6 +197,7 @@ func (te *test) clientConn() *grpc.ClientConn {

func (te *test) enablePluginWithConfig(config *configpb.ObservabilityConfig) {
// Injects the fake exporter for testing purposes
te.fle = &fakeLoggingExporter{t: te.t}
defaultLogger = newBinaryLogger(nil)
iblog.SetLogger(defaultLogger)
if err := defaultLogger.start(config, te.fle); err != nil {
Expand All @@ -215,14 +219,16 @@ func (te *test) enablePluginWithCaptureAll() {
})
}

func (te *test) enableOC() {
func (te *test) enableOpenCensus() {
defaultMetricsReportingInterval = time.Millisecond * 100
config := &configpb.ObservabilityConfig{
EnableCloudLogging: true,
EnableCloudTrace: true,
EnableCloudMonitoring: true,
DestinationProjectId: "grpc-testing",
EnableCloudLogging: true,
EnableCloudTrace: true,
EnableCloudMonitoring: true,
GlobalTraceSamplingRate: 1.0,
}
startOpenCensus(config)
te.fe = &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: te.t}
startOpenCensus(config, te.fe)
}

func checkEventCommon(t *testing.T, seen *grpclogrecordpb.GrpcLogRecord) {
Expand Down Expand Up @@ -339,6 +345,48 @@ func checkEventTrailer(t *testing.T, seen *grpclogrecordpb.GrpcLogRecord, want *
}
}

const (
TypeOpenCensusViewDistribution string = "distribution"
TypeOpenCensusViewCount = "count"
TypeOpenCensusViewSum = "sum"
TypeOpenCensusViewLastValue = "last_value"
)

type fakeOpenCensusExporter struct {
// The map of the observed View name and type
SeenViews map[string]string
// Number of spans
SeenSpans int

t *testing.T
mu sync.RWMutex
}

func (fe *fakeOpenCensusExporter) ExportView(vd *view.Data) {
fe.mu.Lock()
defer fe.mu.Unlock()
for _, row := range vd.Rows {
fe.t.Logf("Metrics[%s]", vd.View.Name)
switch row.Data.(type) {
case *view.DistributionData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewDistribution
case *view.CountData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewCount
case *view.SumData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewSum
case *view.LastValueData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewLastValue
}
}
}

func (fe *fakeOpenCensusExporter) ExportSpan(vd *trace.SpanData) {
fe.mu.Lock()
defer fe.mu.Unlock()
fe.SeenSpans += 1
fe.t.Logf("Span[%v]", vd.Name)
}

func (s) TestLoggingForOkCall(t *testing.T) {
te := newTest(t)
defer te.tearDown()
Expand Down Expand Up @@ -654,71 +702,57 @@ func (s) TestNoEnvSet(t *testing.T) {
func (s) TestOpenCensusIntegration(t *testing.T) {
te := newTest(t)
defer te.tearDown()
te.enableOC()
te.enableOpenCensus()
te.startServer(&testServer{})
tc := testgrpc.NewTestServiceClient(te.clientConn())

var (
resp *testpb.SimpleResponse
req *testpb.SimpleRequest
err error
req *testpb.SimpleRequest
err error

validClientViews, validServerViews, validSpans bool
)
req = &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}
for {
tCtx, _ := context.WithTimeout(context.Background(), defaultTestTimeout)
resp, err = tc.UnaryCall(metadata.NewOutgoingContext(tCtx, testHeaderMetadata), req)
for i := 0; i < defaultRequestCount; i++ {
req = &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}
tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err = tc.UnaryCall(metadata.NewOutgoingContext(tCtx, testHeaderMetadata), req)
if err != nil {
t.Fatalf("unary call failed: %v", err)
}
t.Logf("unary call passed: %v", resp)
time.Sleep(time.Millisecond * 500)
}
t.Logf("unary call passed count=%v", defaultRequestCount)

// Wait for the gRPC transport to gracefully close to ensure no lost event.
te.cc.Close()
te.srv.GracefulStop()
// Check size of events
if len(te.fle.clientEvents) != 5 {
t.Fatalf("expects 5 client events, got %d", len(te.fle.clientEvents))

deadline := time.Now().Add(defaultTestTimeout)
for time.Now().Before(deadline) {
validClientViews = false
validServerViews = false
validSpans = false
te.fe.mu.RLock()
if value, ok := te.fe.SeenViews["grpc.io/client/completed_rpcs"]; ok {
if value == TypeOpenCensusViewCount {
validClientViews = true
}
}
if value, ok := te.fe.SeenViews["grpc.io/server/completed_rpcs"]; ok {
if value == TypeOpenCensusViewCount {
validServerViews = true
}
}
if te.fe.SeenSpans > 0 {
validSpans = true
}
te.fe.mu.RUnlock()
if validClientViews && validServerViews && validSpans {
break
}
time.Sleep(100 * time.Millisecond)
}
if len(te.fle.serverEvents) != 5 {
t.Fatalf("expects 5 server events, got %d", len(te.fle.serverEvents))
if !validClientViews || !validServerViews || !validSpans {
t.Fatalf("Invalid OpenCensus export data: validClientViews=%v validServerViews=%v validSpans=%v", validClientViews, validServerViews, validSpans)
}
// Client events
checkEventRequestHeader(te.t, te.fle.clientEvents[0], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT,
Authority: te.srvAddr,
ServiceName: "grpc.testing.TestService",
MethodName: "UnaryCall",
})
checkEventRequestMessage(te.t, te.fle.clientEvents[1], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT,
}, testOkPayload)
checkEventResponseHeader(te.t, te.fle.clientEvents[2], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT,
})
checkEventResponseMessage(te.t, te.fle.clientEvents[3], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT,
}, testOkPayload)
checkEventTrailer(te.t, te.fle.clientEvents[4], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_CLIENT,
StatusCode: 0,
})
// Server events
checkEventRequestHeader(te.t, te.fle.serverEvents[0], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER,
})
checkEventRequestMessage(te.t, te.fle.serverEvents[1], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER,
}, testOkPayload)
checkEventResponseHeader(te.t, te.fle.serverEvents[2], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER,
})
checkEventResponseMessage(te.t, te.fle.serverEvents[3], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER,
}, testOkPayload)
checkEventTrailer(te.t, te.fle.serverEvents[4], &grpclogrecordpb.GrpcLogRecord{
EventLogger: grpclogrecordpb.GrpcLogRecord_LOGGER_SERVER,
StatusCode: 0,
})
}
46 changes: 21 additions & 25 deletions gcp/observability/opencensus.go
Expand Up @@ -23,7 +23,8 @@ import (
"time"

"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/examples/exporter"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"

"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
Expand All @@ -32,41 +33,40 @@ import (
"google.golang.org/grpc/internal"
)

const (
var (
// It's a variable instead of const to speed up testing
defaultMetricsReportingInterval = time.Second * 30
)

func startOpenCensus(config *configpb.ObservabilityConfig) error {
// This method accepts config and exporter; the exporter argument is exposed to
// assist unit testing of the OpenCensus behavior.
func startOpenCensus(config *configpb.ObservabilityConfig, exporter interface{}) error {
// If both tracing and metrics are disabled, there's no point inject default
// StatsHandler.
if config == nil || (!config.EnableCloudTrace && !config.EnableCloudMonitoring) {
return nil
}

var (
se *stackdriver.Exporter
so trace.StartOptions
err error
)

// Create the Stackdriver exporter, which is shared between tracing and stats
// mr := monitoredresource.Autodetect()
// logger.Infof("Detected MonitoredResource:: %+v", mr)
if se, err = stackdriver.NewExporter(stackdriver.Options{
ProjectID: config.DestinationProjectId,
// MonitoredResource: mr,
}); err != nil {
return err
if exporter == nil {
// Create the Stackdriver exporter, which is shared between tracing and stats
mr := monitoredresource.Autodetect()
logger.Infof("Detected MonitoredResource:: %+v", mr)
if exporter, err = stackdriver.NewExporter(stackdriver.Options{
ProjectID: config.DestinationProjectId,
MonitoredResource: mr,
}); err != nil {
return fmt.Errorf("failed to create Stackdriver exporter: %v", err)
}
}

if config.EnableCloudTrace {
so.Sampler = trace.ProbabilitySampler(config.GlobalTraceSamplingRate)
trace.RegisterExporter(se)
logger.Infof("Start tracing collection and exporting with global_trace_sampling_rate=%.2f", config.GlobalTraceSamplingRate)
if logger.V(2) {
// Following exporter prints all collected metrics
trace.RegisterExporter(&exporter.PrintExporter{})
}
trace.RegisterExporter(exporter.(trace.Exporter))
logger.Infof("Start collecting and exporting trace spans with global_trace_sampling_rate=%.2f", config.GlobalTraceSamplingRate)
}

if config.EnableCloudMonitoring {
Expand All @@ -77,12 +77,8 @@ func startOpenCensus(config *configpb.ObservabilityConfig) error {
return fmt.Errorf("failed to register default server views: %v", err)
}
view.SetReportingPeriod(defaultMetricsReportingInterval)
if logger.V(2) {
// Following exporter prints all collected metrics
view.RegisterExporter(&exporter.PrintExporter{})
}
view.RegisterExporter(se)
logger.Infof("Start metrics collection and exporting")
view.RegisterExporter(exporter.(view.Exporter))
logger.Infof("Start collecting and exporting metrics")
}

// Only register default StatsHandlers if other things are setup correctly.
Expand Down
2 changes: 0 additions & 2 deletions stream.go
Expand Up @@ -21,7 +21,6 @@ package grpc
import (
"context"
"errors"
"fmt"
"io"
"math"
"strconv"
Expand Down Expand Up @@ -376,7 +375,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
var beginTime time.Time
fmt.Printf("!!!!!cs.cc.dopts.copts.StatsHandler=%v", cs.cc.dopts.copts.StatsHandler)
if len(cs.cc.dopts.copts.StatsHandler) != 0 {
for _, sh := range cs.cc.dopts.copts.StatsHandler {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
Expand Down

0 comments on commit 1dd4544

Please sign in to comment.