Skip to content

Commit

Permalink
test(pubsublite): testing support for operations (#4361)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Jul 1, 2021
1 parent 0ef2a3b commit ef01940
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
53 changes: 46 additions & 7 deletions pubsublite/internal/test/mock.go
Expand Up @@ -29,6 +29,7 @@ import (

emptypb "github.com/golang/protobuf/ptypes/empty"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
lrpb "google.golang.org/genproto/googleapis/longrunning"
)

// MockServer is an in-memory mock implementation of a Pub/Sub Lite service,
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewServer() (*Server, error) {
pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer)
pb.RegisterCursorServiceServer(srv.Gsrv, liteServer)
pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer)
lrpb.RegisterOperationsServer(srv.Gsrv, liteServer)
srv.Start()
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}
Expand All @@ -87,6 +89,7 @@ type mockLiteServer struct {
pb.SubscriberServiceServer
pb.CursorServiceServer
pb.PartitionAssignmentServiceServer
lrpb.OperationsServer

mu sync.Mutex

Expand Down Expand Up @@ -277,7 +280,7 @@ func (s *mockLiteServer) doTopicResponse(ctx context.Context, req interface{}) (
}
resp, ok := retResponse.(*pb.Topic)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -289,7 +292,19 @@ func (s *mockLiteServer) doSubscriptionResponse(ctx context.Context, req interfa
}
resp, ok := retResponse.(*pb.Subscription)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}

func (s *mockLiteServer) doOperationResponse(ctx context.Context, req interface{}) (*lrpb.Operation, error) {
retResponse, retErr := s.popGlobalVerifiers(req)
if retErr != nil {
return nil, retErr
}
resp, ok := retResponse.(*lrpb.Operation)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -301,7 +316,7 @@ func (s *mockLiteServer) doEmptyResponse(ctx context.Context, req interface{}) (
}
resp, ok := retResponse.(*emptypb.Empty)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -325,7 +340,7 @@ func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopi
}
resp, ok := retResponse.(*pb.TopicPartitions)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -346,6 +361,10 @@ func (s *mockLiteServer) UpdateSubscription(ctx context.Context, req *pb.UpdateS
return s.doSubscriptionResponse(ctx, req)
}

func (s *mockLiteServer) SeekSubscription(ctx context.Context, req *pb.SeekSubscriptionRequest) (*lrpb.Operation, error) {
return s.doOperationResponse(ctx, req)
}

func (s *mockLiteServer) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
return s.doEmptyResponse(ctx, req)
}
Expand All @@ -357,7 +376,7 @@ func (s *mockLiteServer) ListTopics(ctx context.Context, req *pb.ListTopicsReque
}
resp, ok := retResponse.(*pb.ListTopicsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -369,7 +388,7 @@ func (s *mockLiteServer) ListTopicSubscriptions(ctx context.Context, req *pb.Lis
}
resp, ok := retResponse.(*pb.ListTopicSubscriptionsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -381,7 +400,27 @@ func (s *mockLiteServer) ListSubscriptions(ctx context.Context, req *pb.ListSubs
}
resp, ok := retResponse.(*pb.ListSubscriptionsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}

func (s *mockLiteServer) GetOperation(ctx context.Context, req *lrpb.GetOperationRequest) (*lrpb.Operation, error) {
return s.doOperationResponse(ctx, req)
}

func (s *mockLiteServer) ListOperations(ctx context.Context, req *lrpb.ListOperationsRequest) (*lrpb.ListOperationsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected ListOperations call")
}

func (s *mockLiteServer) DeleteOperation(ctx context.Context, req *lrpb.DeleteOperationRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected DeleteOperation call")
}

func (s *mockLiteServer) CancelOperation(ctx context.Context, req *lrpb.CancelOperationRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected CancelOperation call")
}

func (s *mockLiteServer) WaitOperation(ctx context.Context, req *lrpb.WaitOperationRequest) (*lrpb.Operation, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected WaitOperation call")
}
12 changes: 12 additions & 0 deletions pubsublite/internal/test/util.go
Expand Up @@ -14,13 +14,16 @@
package test

import (
"log"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// ErrorEqual compares two errors for equivalence.
Expand Down Expand Up @@ -54,3 +57,12 @@ func (f *FakeSource) Int63() int64 { return f.Ret }

// Seed is unimplemented.
func (f *FakeSource) Seed(seed int64) {}

// MakeAny packs a message into an Any proto.
func MakeAny(msg proto.Message) *anypb.Any {
any, err := anypb.New(msg)
if err != nil {
log.Fatalf("Failed to make Any: %v", err)
}
return any
}
18 changes: 5 additions & 13 deletions pubsublite/internal/wire/rpc_test.go
Expand Up @@ -16,10 +16,10 @@ package wire
import (
"encoding/base64"
"errors"
"log"
"testing"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -30,18 +30,10 @@ import (
spb "google.golang.org/genproto/googleapis/rpc/status"
)

func makeAny(msg proto.Message) *anypb.Any {
any, err := anypb.New(msg)
if err != nil {
log.Fatalf("Failed to make Any: %v", err)
}
return any
}

func makeStreamResetSignal() error {
statuspb := &spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{
Reason: "RESET", Domain: "pubsublite.googleapis.com",
})},
}
Expand All @@ -63,23 +55,23 @@ func TestIsStreamResetSignal(t *testing.T) {
desc: "non-retryable code",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.FailedPrecondition),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})},
}),
want: false,
},
{
desc: "wrong domain",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET"})},
}),
want: false,
},
{
desc: "wrong reason",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})},
}),
want: false,
},
Expand Down

0 comments on commit ef01940

Please sign in to comment.