Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add an example to illustrate the usage of stats handler #5657

Merged
merged 17 commits into from Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions examples/features/stats_monitoring/README.md
@@ -0,0 +1,24 @@
# Stats Monitoring Handler

This example demonstrates the use of [stats package](https://pkg.go.dev/google.golang.org/grpc/stats) for reporting various network and RPC stats.
_Note that all fields are READ-ONLY and the APIs of `stats package` are experimental_.
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

gRPC provides a mechanism to hook on to various events (phases) of the request-response network cycle through [`stats.Handler`](https://pkg.go.dev/google.golang.org/grpc/stats#Handler) interface. To access these events, a concrete type that implements `stats.Handler` should be passed to `grpc.WithStatsHandler()` on the client side and `grpc.StatsHandler()` on the server side.
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

The `HandleRPC(context.Context, RPCStats)` method on `stats.Handler` is called multiple times during a request-response cycle, and various event stats are passed to its `RPCStats` parameter (an interface). The concrete types that implement this interface are: `*stats.Begin`, `*stats.InHeader`, `*stats.InPayload`, `*stats.InTrailer`, `*stats.OutHeader`, `*stats.OutPayload`, `*stats.OutTrailer`, and `*stats.End`. The order of these events differs on client and server.
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

Similarly, the `HandleConn(context.Context, ConnStats)` method on `stats.Handler` is called twice, once at the beginning of the connection with `*stats.ConnBegin` and once at the end with `*stats.ConnEnd`.

NOTE: The [stats package](https://pkg.go.dev/google.golang.org/grpc/stats) should only be used for network monitoring purposes, and not as an alternative to [interceptors](https://github.com/grpc/grpc-go/blob/master/examples/features/metadata).
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
97 changes: 97 additions & 0 deletions examples/features/stats_monitoring/client/main.go
@@ -0,0 +1,97 @@
// Binary client is an example client.
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
package main

import (
"context"
"flag"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/stats"

pb "google.golang.org/grpc/examples/features/proto/echo"
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")

// *statsHandler implements [stats.Handler](https://pkg.go.dev/google.golang.org/grpc/stats#Handler) interface
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
type statsHandler struct{}

// TagRPC can attach some information to the given context.
// The context used for the rest lifetime of the RPC will be derived from the returned context.
func (st *statsHandler) TagRPC(ctx context.Context, stat *stats.RPCTagInfo) context.Context {
log.Printf("[TagRPC] RPC request send to: %s", stat.FullMethodName)
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
return ctx
}

// Note: All stat fields are read-only
func (st *statsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) {
switch stat := stat.(type) {
case *stats.Begin:
log.Printf("[HandleRPC] [%T] Request sending process started at: %s", stat, stat.BeginTime.Format(time.StampMicro))
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

case *stats.OutHeader:
log.Printf("[HandleRPC] [%T] headers: %v", stat, stat.Header)

case *stats.OutPayload:
log.Printf("[HandleRPC] [%T] payload (%d bytes on wire): %s", stat, stat.WireLength, stat.Payload.(*pb.EchoRequest))
log.Printf("[HandleRPC] [%T] Request sending process completed at: %s", stat, stat.SentTime.Format(time.StampMicro))

easwars marked this conversation as resolved.
Show resolved Hide resolved
case *stats.InHeader:
log.Printf("[HandleRPC] [%T] headers (%d bytes on wire): %v", stat, stat.WireLength, stat.Header)

case *stats.InTrailer:
log.Printf("[HandleRPC] [%T] trailers (%d bytes on wire): %v", stat, stat.WireLength, stat.Trailer)

case *stats.InPayload:
log.Printf("[HandleRPC] [%T] payload (%d bytes on wire): %s", stat, stat.WireLength, stat.Payload.(*pb.EchoResponse))
log.Printf("[HandleRPC] [%T] payload received at: %s", stat, stat.RecvTime.Format(time.StampMicro))

case *stats.End:
log.Printf("[HandleRPC] [%T] response completed at: %s", stat, stat.EndTime.Format(time.StampMicro))
log.Printf("[HandleRPC] [%T] request-response cycle duration (incl. 2 sec server sleep): %s", stat, stat.EndTime.Sub(stat.BeginTime))
if stat.Error != nil {
log.Printf("[HandleRPC] [%T] request-response cycle errored: %v", stat, stat.Error)
}
}
}

// TagConn can attach some information to the given context.
// The context used in HandleConn for this connection will be derived from the context returned.
// In gRPC client:
// The context used in HandleRPC for RPCs on this connection will NOT be derived from the context returned.
func (st *statsHandler) TagConn(ctx context.Context, stat *stats.ConnTagInfo) context.Context {
log.Printf("[TagConn] %s --> %s", stat.LocalAddr, stat.RemoteAddr)
return ctx
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
}

func (st *statsHandler) HandleConn(ctx context.Context, stat stats.ConnStats) {
// NOP
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
}

func main() {
flag.Parse()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(&statsHandler{}),
}
conn, err := grpc.Dial(*addr, opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
}
defer conn.Close()

c := pb.NewEchoClient(conn)
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "stats handler demo"})
if err != nil {
log.Fatalf("unexpected error from UnaryEcho: %v", err)
}
log.Println("RPC response:", resp.Message)

Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
}
102 changes: 102 additions & 0 deletions examples/features/stats_monitoring/server/main.go
@@ -0,0 +1,102 @@
// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"net"
"os"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/stats"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stderr))
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved
}

// *statsHandler implements [stats.Handler](https://pkg.go.dev/google.golang.org/grpc/stats#Handler) interface
type statsHandler struct{}
Yash-Handa marked this conversation as resolved.
Show resolved Hide resolved

// TagRPC can attach some information to the given context.
// The context used for the rest lifetime of the RPC will be derived from the returned context.
func (st *statsHandler) TagRPC(ctx context.Context, stat *stats.RPCTagInfo) context.Context {
grpclog.Infof("[%s] [TagRPC] RPC request received for: %s", ctx.Value("remoteAddr"), stat.FullMethodName)
return ctx
}

// Note: All stat fields are read-only
func (st *statsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) {
prompt := fmt.Sprintf("[%s] [HandleRPC] [%T]", ctx.Value("remoteAddr"), stat)

switch stat := stat.(type) {
case *stats.InHeader:
grpclog.Infof("%s headers (%d bytes on wire): %v", prompt, stat.WireLength, stat.Header)

case *stats.Begin:
grpclog.Infof("%s request processing started at: %s", prompt, stat.BeginTime.Format(time.StampMicro))

case *stats.InPayload:
grpclog.Infof("%s payload (%d bytes on wire): %s", prompt, stat.WireLength, stat.Payload.(*pb.EchoRequest))
grpclog.Infof("%s payload received from client at: %s", prompt, stat.RecvTime.Format(time.StampMicro))

case *stats.OutHeader:
grpclog.Infof("%s headers: %v", prompt, stat.Header)

case *stats.OutPayload:
grpclog.Infof("%s payload (%d bytes on wire): %s", prompt, stat.WireLength, stat.Payload.(*pb.EchoResponse))
grpclog.Infof("%s payload sent to client at: %s", prompt, stat.SentTime.Format(time.StampMicro))

case *stats.OutTrailer:
grpclog.Infof("%s trailers: %v", prompt, stat.Trailer)

case *stats.End:
grpclog.Infof("%s response completed at: %s", prompt, stat.EndTime.Format(time.StampMicro))
grpclog.Infof("%s request-response cycle duration (incl. 2 sec server sleep): %s", prompt, stat.EndTime.Sub(stat.BeginTime))
if stat.Error != nil {
grpclog.Infof("%s request-response cycle errored: %v", prompt, stat.Error)
}
}
}

// TagConn can attach some information to the given context.
// The context used in HandleConn for this connection will be derived from the context returned.
// In gRPC server:
// The context used in HandleRPC for RPCs on this connection will be derived from the context returned.
func (st *statsHandler) TagConn(ctx context.Context, stat *stats.ConnTagInfo) context.Context {
grpclog.Infof("[TagConn] %s --> %s", stat.RemoteAddr, stat.LocalAddr)
return context.WithValue(ctx, "remoteAddr", stat.RemoteAddr)
}

func (st *statsHandler) HandleConn(ctx context.Context, stat stats.ConnStats) {
// NOP
}

type server struct {
pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
time.Sleep(2 * time.Second)
return &pb.EchoResponse{Message: req.Message}, nil
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
grpclog.Infof("server listening at %v\n", lis.Addr())

s := grpc.NewServer(grpc.StatsHandler(&statsHandler{}))
pb.RegisterEchoServer(s, &server{})
grpclog.Fatalf("failed to serve: %v", s.Serve(lis))
}