Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Add new test for round robin resolver #15577

Merged
merged 1 commit into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/grpc_testing/stub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync/atomic"

"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"
Expand Down Expand Up @@ -93,20 +95,22 @@ func (ss *StubServer) Addr() string {

type dummyStubServer struct {
testpb.UnimplementedTestServiceServer
body []byte
counter uint64
}

func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
func (d *dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
newCount := atomic.AddUint64(&d.counter, 1)

return &testpb.SimpleResponse{
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: d.body,
Body: []byte(strconv.FormatUint(newCount, 10)),
jmhbnz marked this conversation as resolved.
Show resolved Hide resolved
},
}, nil
}

// NewDummyStubServer creates a simple test server that serves Unary calls with
// responses with the given payload.
func NewDummyStubServer(body []byte) *StubServer {
return New(dummyStubServer{body: body})
return New(&dummyStubServer{})
}
100 changes: 67 additions & 33 deletions tests/integration/clientv3/naming/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package naming_test
import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints"
Expand All @@ -29,25 +30,23 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)

// This test mimics scenario described in grpc_naming.md doc.
func testEtcdGrpcResolver(t *testing.T, lbPolicy string) {

func TestEtcdGrpcResolver(t *testing.T) {
integration2.BeforeTest(t)

s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
// Setup two new dummy stub servers
payloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s1.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s1)", err)
}
defer s1.Stop()

s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
s2 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s2.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s2)", err)
}
defer s2.Stop()

// Create new cluster with endpoint manager with two endpoints
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand All @@ -64,55 +63,90 @@ func TestEtcdGrpcResolver(t *testing.T) {
t.Fatal("failed to add foo", err)
}

err = em.AddEndpoint(context.TODO(), "foo/e2", e2)
if err != nil {
t.Fatal("failed to add foo", err)
}

b, err := resolver.NewBuilder(clus.Client(1))
if err != nil {
t.Fatal("failed to new resolver builder", err)
}

conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
// Create connection with provided lb policy
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, lbPolicy)))
if err != nil {
t.Fatal("failed to connect to foo", err)
}
defer conn.Close()

// Send an initial request that should go to e1
c := testpb.NewTestServiceClient(conn)
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo (e1)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), payloadBody) {
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
}

em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// Send more requests
lastResponse := []byte{'1'}
totalRequests := 100
for i := 1; i < totalRequests; i++ {
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo", err)
}

// We use a loop with deadline of 30s to avoid test getting flake
// as it's asynchronous for gRPC Client to update underlying connections.
maxRetries := 300
retryPeriod := 100 * time.Millisecond
retries := 0
for {
time.Sleep(retryPeriod)
retries++
t.Logf("Response: %v", string(resp.GetPayload().GetBody()))

resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
if err != nil {
if retries < maxRetries {
continue
}
t.Fatal("failed to invoke rpc to foo (e2)", err)
if resp.GetPayload() == nil {
t.Fatalf("unexpected response from foo: %s", resp.GetPayload().GetBody())
}
lastResponse = resp.GetPayload().GetBody()
}

// If the load balancing policy is pick first then return payload should equal number of requests
t.Logf("Last response: %v", string(lastResponse))
if lbPolicy == "pick_first" {
if string(lastResponse) != "100" {
t.Fatalf("unexpected total responses from foo: %s", string(lastResponse))
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
if retries < maxRetries {
continue
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
}

// If the load balancing policy is round robin we should see roughly half total requests served by each server
if lbPolicy == "round_robin" {
responses, err := strconv.Atoi(string(lastResponse))
if err != nil {
t.Fatalf("couldn't convert to int: %s", string(lastResponse))
}
break

// Allow 10% tolerance as round robin is not perfect and we don't want the test to flake
expected := float64(totalRequests) * 0.5
assert.InEpsilon(t, float64(expected), float64(responses), 0.1, "unexpected total responses from foo: %s", string(lastResponse))
}
}

// TestEtcdGrpcResolverPickFirst mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverPickFirst(t *testing.T) {

integration2.BeforeTest(t)

// Pick first is the default load balancer policy for grpc-go
testEtcdGrpcResolver(t, "pick_first")
}

// TestEtcdGrpcResolverRoundRobin mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverRoundRobin(t *testing.T) {

integration2.BeforeTest(t)

// Round robin is a common alternative for more production oriented scenarios
testEtcdGrpcResolver(t, "round_robin")
}
jmhbnz marked this conversation as resolved.
Show resolved Hide resolved

func TestEtcdEndpointManager(t *testing.T) {
integration2.BeforeTest(t)

Expand Down