diff --git a/Documentation/grpc-metadata.md b/Documentation/grpc-metadata.md index ff4de6e71de..06b36f4ac17 100644 --- a/Documentation/grpc-metadata.md +++ b/Documentation/grpc-metadata.md @@ -223,3 +223,8 @@ func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) erro stream.SetTrailer(trailer) } ``` + +## Updating metadata from a server interceptor + +An example for updating metadata from a server interceptor is +available [here](../examples/features/metadata_interceptor/server/main.go). diff --git a/examples/examples_test.sh b/examples/examples_test.sh index bde2837f659..0c919a1a096 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -59,6 +59,7 @@ EXAMPLES=( "features/interceptor" "features/load_balancing" "features/metadata" + "features/metadata_interceptor" "features/multiplex" "features/name_resolving" "features/unix_abstract" @@ -105,6 +106,7 @@ declare -A EXPECTED_SERVER_OUTPUT=( ["features/interceptor"]="unary echoing message \"hello world\"" ["features/load_balancing"]="serving on :50051" ["features/metadata"]="message:\"this is examples/metadata\", sending echo" + ["features/metadata_interceptor"]="key1 from metadata: " ["features/multiplex"]=":50051" ["features/name_resolving"]="serving on localhost:50051" ["features/unix_abstract"]="serving on @abstract-unix-socket" @@ -121,6 +123,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=( ["features/interceptor"]="UnaryEcho: hello world" ["features/load_balancing"]="calling helloworld.Greeter/SayHello with pick_first" ["features/metadata"]="this is examples/metadata" + ["features/metadata_interceptor"]="BidiStreaming Echo: hello world" ["features/multiplex"]="Greeting: Hello multiplex" ["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\"" ["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket" diff --git a/examples/features/metadata_interceptor/README.md b/examples/features/metadata_interceptor/README.md new file mode 100644 index 00000000000..93a6925d79e --- /dev/null +++ b/examples/features/metadata_interceptor/README.md @@ -0,0 +1,70 @@ +# Metadata interceptor example + +This example shows how to update metadata from unary and streaming interceptors on the server. +Please see +[grpc-metadata.md](https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md) +for more information. + +## Try it + +``` +go run server/main.go +``` + +``` +go run client/main.go +``` + +## Explanation + +#### Unary interceptor + +The interceptor can read existing metadata from the RPC context passed to it. +Since Go contexts are immutable, the interceptor will have to create a new context +with updated metadata and pass it to the provided handler. + +```go +func SomeInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // Get the incoming metadata from the RPC context, and add a new + // key-value pair to it. + md, ok := metadata.FromIncomingContext(ctx) + md.Append("key1", "value1") + + // Create a context with the new metadata and pass it to handler. + ctx = metadata.NewIncomingContext(ctx, md) + return handler(ctx, req) +} +``` + +#### Streaming interceptor + +`grpc.ServerStream` does not provide a way to modify its RPC context. The streaming +interceptor therefore needs to implement the `grpc.ServerStream` interface and return +a context with updated metadata. + +The easiest way to do this would be to create a type which embeds the `grpc.ServerStream` +interface and overrides only the `Context()` method to return a context with updated +metadata. The streaming interceptor would then pass this wrapped stream to the provided handler. + +```go +type wrappedStream struct { + grpc.ServerStream + ctx context.Context +} + +func (s *wrappedStream) Context() context.Context { + return s.ctx +} + +func SomeStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Get the incoming metadata from the RPC context, and add a new + // key-value pair to it. + md, ok := metadata.FromIncomingContext(ctx) + md.Append("key1", "value1") + + // Create a context with the new metadata and pass it to handler. + ctx = metadata.NewIncomingContext(ctx, md) + + return handler(srv, &wrappedStream{ss, ctx}) +} +``` \ No newline at end of file diff --git a/examples/features/metadata_interceptor/client/main.go b/examples/features/metadata_interceptor/client/main.go new file mode 100644 index 00000000000..a6ad804d726 --- /dev/null +++ b/examples/features/metadata_interceptor/client/main.go @@ -0,0 +1,86 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary client is an example client. +package main + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var addr = flag.String("addr", "localhost:50051", "the address to connect to") + +func callUnaryEcho(ctx context.Context, client pb.EchoClient) { + resp, err := client.UnaryEcho(ctx, &pb.EchoRequest{Message: "hello world"}) + if err != nil { + log.Fatalf("UnaryEcho %v", err) + } + fmt.Println("UnaryEcho: ", resp.Message) +} + +func callBidiStreamingEcho(ctx context.Context, client pb.EchoClient) { + c, err := client.BidirectionalStreamingEcho(ctx) + if err != nil { + log.Fatalf("BidiStreamingEcho %v", err) + } + + if err := c.Send(&pb.EchoRequest{Message: "hello world"}); err != nil { + log.Fatalf("Sending echo request: %v", err) + } + c.CloseSend() + + for { + resp, err := c.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("Receiving echo response: %v", err) + } + fmt.Println("BidiStreaming Echo: ", resp.Message) + } +} + +func main() { + flag.Parse() + + conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("grpc.Dial(%q): %v", *addr, err) + } + defer conn.Close() + + ec := pb.NewEchoClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + callUnaryEcho(ctx, ec) + + callBidiStreamingEcho(ctx, ec) +} diff --git a/examples/features/metadata_interceptor/server/main.go b/examples/features/metadata_interceptor/server/main.go new file mode 100644 index 00000000000..8f0dc5bfe6d --- /dev/null +++ b/examples/features/metadata_interceptor/server/main.go @@ -0,0 +1,140 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary server is an example server. +package main + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var port = flag.Int("port", 50051, "the port to serve on") + +var errMissingMetadata = status.Errorf(codes.InvalidArgument, "no incoming metadata in rpc context") + +type server struct { + pb.UnimplementedEchoServer +} + +func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errMissingMetadata + } + + md.Append("key1", "value1") + ctx = metadata.NewIncomingContext(ctx, md) + + return handler(ctx, req) +} + +func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { + fmt.Printf("--- UnaryEcho ---\n") + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Errorf(codes.Internal, "UnaryEcho: missing incoming metadata in rpc context") + } + + // Read and print metadata added by the interceptor. + if v, ok := md["key1"]; ok { + fmt.Printf("key1 from metadata: \n") + for i, e := range v { + fmt.Printf(" %d. %s\n", i, e) + } + } + + return &pb.EchoResponse{Message: in.Message}, nil +} + +type wrappedStream struct { + grpc.ServerStream + ctx context.Context +} + +func (s *wrappedStream) Context() context.Context { + return s.ctx +} + +func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + md, ok := metadata.FromIncomingContext(ss.Context()) + if !ok { + return errMissingMetadata + } + + md.Append("key1", "value1") + ctx := metadata.NewIncomingContext(ss.Context(), md) + + return handler(srv, &wrappedStream{ss, ctx}) +} + +func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { + fmt.Printf("--- BidirectionalStreamingEcho ---\n") + + md, ok := metadata.FromIncomingContext(stream.Context()) + if !ok { + return status.Errorf(codes.Internal, "BidirectionalStreamingEcho: missing incoming metadata in rpc context") + } + + // Read and print metadata added by the interceptor. + if v, ok := md["key1"]; ok { + fmt.Printf("key1 from metadata: \n") + for i, e := range v { + fmt.Printf(" %d. %s\n", i, e) + } + } + + // Read requests and send responses. + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + if err = stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil { + return err + } + } +} + +func main() { + flag.Parse() + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) + if err != nil { + log.Fatalf("net.Listen() failed: %v", err) + } + fmt.Printf("Server listening at %v\n", lis.Addr()) + + s := grpc.NewServer(grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor)) + pb.RegisterEchoServer(s, &server{}) + s.Serve(lis) +}