Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: create an example for enabling and configuring retry #3028

Merged
merged 17 commits into from Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
154 changes: 154 additions & 0 deletions examples/features/retry/README.md
@@ -0,0 +1,154 @@
# Reflection
abserari marked this conversation as resolved.
Show resolved Hide resolved

This example shows how enabling and configuring retry on gRPC client.
abserari marked this conversation as resolved.
Show resolved Hide resolved

# Read

[Document about this example](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy)


# Try it

```go
abserari marked this conversation as resolved.
Show resolved Hide resolved
go run server/main.go
```

```go
go run client/main.go
abserari marked this conversation as resolved.
Show resolved Hide resolved
```

abserari marked this conversation as resolved.
Show resolved Hide resolved
# Usage

{
abserari marked this conversation as resolved.
Show resolved Hide resolved
"methodConfig": [{
// config per method or all methods, this is for all methods under grpc.example.echo.Echo
// service
"name": [{"service": "grpc.examples.echo.Echo"}],
"waitForReady": true,

"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
// this value is grpc code
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]
}

# Know all retry policies Integration with Service Config
From [grpc/proposal](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config)
abserari marked this conversation as resolved.
Show resolved Hide resolved

The retry policy is transmitted to the client through the service config mechanism. The following is what the JSON configuration file would look like:

{
"loadBalancingPolicy": string,

"methodConfig": [
{
"name": [
{
"service": string,
"method": string,
}
],

// Only one of retryPolicy or hedgingPolicy may be set. If neither is set,
// RPCs will not be retried or hedged.

"retryPolicy": {
// The maximum number of RPC attempts, including the original RPC.
//
// This field is required and must be two or greater.
"maxAttempts": number,

// Exponential backoff parameters. The initial retry attempt will occur at
// random(0, initialBackoff). In general, the nth attempt since the last
// server pushback response (if any), will occur at random(0,
// min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)).
// The following two fields take their form from:
// https://developers.google.com/protocol-buffers/docs/proto3#json
// They are representations of the proto3 Duration type. Note that the
// numeric portion of the string must be a valid JSON number.
// They both must be greater than zero.
"initialBackoff": string, // Required. Long decimal with "s" appended
"maxBackoff": string, // Required. Long decimal with "s" appended
"backoffMultiplier": number // Required. Must be greater than zero.

// The set of status codes which may be retried.
//
// Status codes are specified in the integer form or the case-insensitive
// string form (eg. [14], ["UNAVAILABLE"] or ["unavailable"])
//
// This field is required and must be non-empty.
"retryableStatusCodes": []
}

"hedgingPolicy": {
// The hedging policy will send up to maxAttempts RPCs.
// This number represents the all RPC attempts, including the
// original and all the hedged RPCs.
//
// This field is required and must be two or greater.
"maxAttempts": number,

// The original RPC will be sent immediately, but the maxAttempts-1
// subsequent hedged RPCs will be sent at intervals of every hedgingDelay.
// Set this to "0s", or leave unset, to immediately send all maxAttempts RPCs.
// hedgingDelay takes its form from:
// https://developers.google.com/protocol-buffers/docs/proto3#json
// It is a representation of the proto3 Duration type. Note that the
// numeric portion of the string must be a valid JSON number.
"hedgingDelay": string,

// The set of status codes which indicate other hedged RPCs may still
// succeed. If a non-fatal status code is returned by the server, hedged
// RPCs will continue. Otherwise, outstanding requests will be canceled and
// the error returned to the client application layer.
//
// Status codes are specified in the integer form or the case-insensitive
// string form (eg. [14], ["UNAVAILABLE"] or ["unavailable"])
//
// This field is optional.
"nonFatalStatusCodes": []
}

"waitForReady": bool,
"timeout": string,
"maxRequestMessageBytes": number,
"maxResponseMessageBytes": number
}
]

// If a RetryThrottlingPolicy is provided, gRPC will automatically throttle
// retry attempts and hedged RPCs when the client’s ratio of failures to
// successes exceeds a threshold.
//
// For each server name, the gRPC client will maintain a token_count which is
// initially set to maxTokens, and can take values between 0 and maxTokens.
//
// Every outgoing RPC (regardless of service or method invoked) will change
// token_count as follows:
//
// - Every failed RPC will decrement the token_count by 1.
// - Every successful RPC will increment the token_count by tokenRatio.
//
// If token_count is less than or equal to maxTokens / 2, then RPCs will not
// be retried and hedged RPCs will not be sent.
"retryThrottling": {
// The number of tokens starts at maxTokens. The token_count will always be
// between 0 and maxTokens.
//
// This field is required and must be in the range (0, 1000]. Up to 3
// decimal places are supported
"maxTokens": number,

// The amount of tokens to add on each successful RPC. Typically this will
// be some number between 0 and 1, e.g., 0.1.
//
// This field is required and must be greater than zero. Up to 3 decimal
// places are supported.
"tokenRatio": number
}
}
79 changes: 79 additions & 0 deletions examples/features/retry/client/main.go
@@ -0,0 +1,79 @@
/*
*
* Copyright 2018 gRPC authors.
abserari marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"log"
"time"

"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/features/proto/echo"
)

var (
addr = flag.String("addr", "localhost:50052", "the address to connect to")
// see https://github.com/grpc/grpc/blob/master/doc/service_config.md to know more about service config
retryPolicy = `{
"methodConfig": [{
"name": [{"service": "grpc.examples.echo.Echo"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
)

// use grpc.WithDefaultServiceConfig() to set service config
func retryDial() (*grpc.ClientConn, error) {
return grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
}

func newCtx(timeout time.Duration) context.Context {
ctx, _ := context.WithTimeout(context.TODO(), timeout)
abserari marked this conversation as resolved.
Show resolved Hide resolved
return ctx
}

func main() {
flag.Parse()

// Set up a connection to the server.
conn, err := retryDial()
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func() {
if e := conn.Close(); e != nil {
log.Printf("failed to close connection: %s", e)
}
}()

c := pb.NewEchoClient(conn)
reply, err := c.UnaryEcho(newCtx(1*time.Second), &pb.EchoRequest{Message: "Try and Success"})
if err != nil {
log.Println(err)
abserari marked this conversation as resolved.
Show resolved Hide resolved
}
log.Println(reply)
abserari marked this conversation as resolved.
Show resolved Hide resolved
}
105 changes: 105 additions & 0 deletions examples/features/retry/server/main.go
@@ -0,0 +1,105 @@
/*
*
* Copyright 2018 gRPC authors.
abserari marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/status"
)

var port = flag.Int("port", 50052, "port number")

type failingServer struct {
mu sync.Mutex

reqCounter uint
reqModulo uint
reqSleep time.Duration
abserari marked this conversation as resolved.
Show resolved Hide resolved
reqError codes.Code
}

func (s *failingServer) maybeFailRequest() error {
s.mu.Lock()
defer s.mu.Unlock()
s.reqCounter++
if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) {
return nil
}
time.Sleep(s.reqSleep)
return status.Errorf(s.reqError, "maybeFailRequest: failing it")
}

func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
if err := s.maybeFailRequest(); err != nil {
log.Println("request failed count:", s.reqCounter)
return nil, err
}

log.Println("request succeeded count:", s.reqCounter)
return &pb.EchoResponse{Message: req.Message}, nil
}

func (s *failingServer) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
return status.Error(codes.Unimplemented, "RPC unimplemented")
}

func (s *failingServer) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
return status.Error(codes.Unimplemented, "RPC unimplemented")
}

func (s *failingServer) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
return status.Error(codes.Unimplemented, "RPC unimplemented")
}

func main() {
flag.Parse()

address := fmt.Sprintf(":%v", *port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Println("listen on address", address)

s := grpc.NewServer()

// a retry configuration
abserari marked this conversation as resolved.
Show resolved Hide resolved
failingservice := &failingServer{
reqCounter: 0,
reqModulo: 4,
reqError: codes.Unavailable, /* uint = 14 */
reqSleep: 0,
}

pb.RegisterEchoServer(s, failingservice)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}