Skip to content

Commit

Permalink
examples: add new example to show updating metadata in interceptors (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
richzw committed Dec 6, 2022
1 parent 001d234 commit a205447
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Documentation/grpc-metadata.md
Expand Up @@ -223,3 +223,8 @@ func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) erro
stream.SetTrailer(trailer)
}
```

## Updating metadata from a server interceptor

An example for updating metadata from a server interceptor is
available [here](../examples/features/metadata_interceptor/server/main.go).
3 changes: 3 additions & 0 deletions examples/examples_test.sh
Expand Up @@ -59,6 +59,7 @@ EXAMPLES=(
"features/interceptor"
"features/load_balancing"
"features/metadata"
"features/metadata_interceptor"
"features/multiplex"
"features/name_resolving"
"features/unix_abstract"
Expand Down Expand Up @@ -105,6 +106,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
["features/interceptor"]="unary echoing message \"hello world\""
["features/load_balancing"]="serving on :50051"
["features/metadata"]="message:\"this is examples/metadata\", sending echo"
["features/metadata_interceptor"]="key1 from metadata: "
["features/multiplex"]=":50051"
["features/name_resolving"]="serving on localhost:50051"
["features/unix_abstract"]="serving on @abstract-unix-socket"
Expand All @@ -121,6 +123,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
["features/interceptor"]="UnaryEcho: hello world"
["features/load_balancing"]="calling helloworld.Greeter/SayHello with pick_first"
["features/metadata"]="this is examples/metadata"
["features/metadata_interceptor"]="BidiStreaming Echo: hello world"
["features/multiplex"]="Greeting: Hello multiplex"
["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\""
["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket"
Expand Down
70 changes: 70 additions & 0 deletions examples/features/metadata_interceptor/README.md
@@ -0,0 +1,70 @@
# Metadata interceptor example

This example shows how to update metadata from unary and streaming interceptors on the server.
Please see
[grpc-metadata.md](https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md)
for more information.

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

#### Unary interceptor

The interceptor can read existing metadata from the RPC context passed to it.
Since Go contexts are immutable, the interceptor will have to create a new context
with updated metadata and pass it to the provided handler.

```go
func SomeInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Get the incoming metadata from the RPC context, and add a new
// key-value pair to it.
md, ok := metadata.FromIncomingContext(ctx)
md.Append("key1", "value1")

// Create a context with the new metadata and pass it to handler.
ctx = metadata.NewIncomingContext(ctx, md)
return handler(ctx, req)
}
```

#### Streaming interceptor

`grpc.ServerStream` does not provide a way to modify its RPC context. The streaming
interceptor therefore needs to implement the `grpc.ServerStream` interface and return
a context with updated metadata.

The easiest way to do this would be to create a type which embeds the `grpc.ServerStream`
interface and overrides only the `Context()` method to return a context with updated
metadata. The streaming interceptor would then pass this wrapped stream to the provided handler.

```go
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (s *wrappedStream) Context() context.Context {
return s.ctx
}

func SomeStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Get the incoming metadata from the RPC context, and add a new
// key-value pair to it.
md, ok := metadata.FromIncomingContext(ctx)
md.Append("key1", "value1")

// Create a context with the new metadata and pass it to handler.
ctx = metadata.NewIncomingContext(ctx, md)

return handler(srv, &wrappedStream{ss, ctx})
}
```
86 changes: 86 additions & 0 deletions examples/features/metadata_interceptor/client/main.go
@@ -0,0 +1,86 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")

func callUnaryEcho(ctx context.Context, client pb.EchoClient) {
resp, err := client.UnaryEcho(ctx, &pb.EchoRequest{Message: "hello world"})
if err != nil {
log.Fatalf("UnaryEcho %v", err)
}
fmt.Println("UnaryEcho: ", resp.Message)
}

func callBidiStreamingEcho(ctx context.Context, client pb.EchoClient) {
c, err := client.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("BidiStreamingEcho %v", err)
}

if err := c.Send(&pb.EchoRequest{Message: "hello world"}); err != nil {
log.Fatalf("Sending echo request: %v", err)
}
c.CloseSend()

for {
resp, err := c.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Receiving echo response: %v", err)
}
fmt.Println("BidiStreaming Echo: ", resp.Message)
}
}

func main() {
flag.Parse()

conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("grpc.Dial(%q): %v", *addr, err)
}
defer conn.Close()

ec := pb.NewEchoClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

callUnaryEcho(ctx, ec)

callBidiStreamingEcho(ctx, ec)
}
140 changes: 140 additions & 0 deletions examples/features/metadata_interceptor/server/main.go
@@ -0,0 +1,140 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

var errMissingMetadata = status.Errorf(codes.InvalidArgument, "no incoming metadata in rpc context")

type server struct {
pb.UnimplementedEchoServer
}

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}

md.Append("key1", "value1")
ctx = metadata.NewIncomingContext(ctx, md)

return handler(ctx, req)
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
fmt.Printf("--- UnaryEcho ---\n")

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.Internal, "UnaryEcho: missing incoming metadata in rpc context")
}

// Read and print metadata added by the interceptor.
if v, ok := md["key1"]; ok {
fmt.Printf("key1 from metadata: \n")
for i, e := range v {
fmt.Printf(" %d. %s\n", i, e)
}
}

return &pb.EchoResponse{Message: in.Message}, nil
}

type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (s *wrappedStream) Context() context.Context {
return s.ctx
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return errMissingMetadata
}

md.Append("key1", "value1")
ctx := metadata.NewIncomingContext(ss.Context(), md)

return handler(srv, &wrappedStream{ss, ctx})
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
fmt.Printf("--- BidirectionalStreamingEcho ---\n")

md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return status.Errorf(codes.Internal, "BidirectionalStreamingEcho: missing incoming metadata in rpc context")
}

// Read and print metadata added by the interceptor.
if v, ok := md["key1"]; ok {
fmt.Printf("key1 from metadata: \n")
for i, e := range v {
fmt.Printf(" %d. %s\n", i, e)
}
}

// Read requests and send responses.
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if err = stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
return err
}
}
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("net.Listen() failed: %v", err)
}
fmt.Printf("Server listening at %v\n", lis.Addr())

s := grpc.NewServer(grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor))
pb.RegisterEchoServer(s, &server{})
s.Serve(lis)
}

0 comments on commit a205447

Please sign in to comment.