Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add new example to show updating metadata in interceptors #5788

Merged
merged 16 commits into from Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 46 additions & 0 deletions Documentation/grpc-metadata.md
Expand Up @@ -223,3 +223,49 @@ func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) erro
stream.SetTrailer(trailer)
}
```

## Updating metadata in the interceptor - server side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about replacing this with:

## Updating metadata from a server interceptor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I will fix it


Server side metadata updating in the interceptor examples are available [here](../examples/features/metadata/server/main.go).

#### Unary interceptor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move these sections to the new README instead, to avoid clutter here and also to put the documentation near the example that illustrates it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley Thank you very much for your response. Do you mean moving the Updating metadata from a server interceptor part of grpc-metadata.md to the README.md under the folder examples/features/metadata_interceptor? Please correct me if anything I am misunderstanding or missing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean move everything from #### Unary interceptor down into examples/features/metadata_interceptor/README.md.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley , thank you very much for your response. I have updated the document per your review comment. Could you please review it again?


To read and update metadata in the interceptor under unary call, the server needs to retrieve it from RPC context and update it through this RPC context.

```go
func SomeInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// get the metadata from context
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe replace with:

// Get the incoming metadata from the RPC context, and add a new 
// key-value pair to it.
md, ok := metadata.FromIncomingContext(ctx)
md.Append("key1", "value1")

// Create a context with the new metadata and pass it to handler.
ctx = metadata.NewIncomingContext(ctx, md)
return handler(ctx, req)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, sorry for my bad statement in the comment

md, ok := metadata.FromIncomingContext(ctx)

// update metadata
md.Append("key1", "value1")

// set the metadata to context
ctx = metadata.NewIncomingContext(ctx, md)
```

#### Streaming interceptor

Since there is no direct way to set RPC context in the stream interceptor, the `wrappedStream` could be created and override `Context()` method with owned `context`. Then pass this `wrappedStream` to the stream interceptor.

```go
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (s *wrappedStream) Context() context.Context {
return s.ctx
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// get the metadata from context
md, ok := metadata.FromIncomingContext(ss.Context())
// update metadata
md.Append("key1", "value1")
// set the metadata to context
ctx := metadata.NewIncomingContext(ss.Context(), md)

return handler(srv, &wrappedStream{ss, ctx})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comments here as the one above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I will fix it

}
```
56 changes: 53 additions & 3 deletions examples/features/metadata/server/main.go
Expand Up @@ -31,14 +31,15 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

pb "google.golang.org/grpc/examples/features/proto/echo"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this change?

Although we don't do it at all places, we try (as much as possible) to group the proto imports separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the typo, it was caused by the Goland. I will fix it.

)

var port = flag.Int("port", 50051, "the port to serve on")

var errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")

const (
timestampFormat = time.StampNano
streamingCount = 10
Expand Down Expand Up @@ -68,6 +69,13 @@ func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoRes
}
}

if v, ok := md["key1"]; ok {
fmt.Printf("key1 from metadata: \n")
for i, e := range v {
fmt.Printf(" %d. %s\n", i, e)
}
}

// Create and send header.
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
grpc.SendHeader(ctx, header)
Expand Down Expand Up @@ -154,6 +162,39 @@ func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) e
}
}

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}

md.Append("key1", "value1")
ctx = metadata.NewIncomingContext(ctx, md)

return handler(ctx, req)
}

type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (s *wrappedStream) Context() context.Context {
return s.ctx
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return errMissingMetadata
}

md.Append("key1", "value1")
ctx := metadata.NewIncomingContext(ss.Context(), md)

return handler(srv, &wrappedStream{ss, ctx})
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
fmt.Printf("--- BidirectionalStreamingEcho ---\n")
// Create trailer in defer to record function return time.
Expand All @@ -175,6 +216,13 @@ func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamin
}
}

if v, ok := md["key1"]; ok {
fmt.Printf("key1 from metadata: \n")
for i, e := range v {
fmt.Printf(" %d. %s\n", i, e)
}
}

// Create and send header.
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
stream.SendHeader(header)
Expand Down Expand Up @@ -204,7 +252,9 @@ func main() {
}
fmt.Printf("server listening at %v\n", lis.Addr())

s := grpc.NewServer()
s := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor))
pb.RegisterEchoServer(s, &server{})
s.Serve(lis)
}