From ef019401630c089ae81400d6eefdafc914312c2b Mon Sep 17 00:00:00 2001 From: tmdiep Date: Fri, 2 Jul 2021 07:02:31 +1000 Subject: [PATCH] test(pubsublite): testing support for operations (#4361) --- pubsublite/internal/test/mock.go | 53 ++++++++++++++++++++++++---- pubsublite/internal/test/util.go | 12 +++++++ pubsublite/internal/wire/rpc_test.go | 18 +++------- 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pubsublite/internal/test/mock.go b/pubsublite/internal/test/mock.go index 425cda76c37..7ff8275d8ec 100644 --- a/pubsublite/internal/test/mock.go +++ b/pubsublite/internal/test/mock.go @@ -29,6 +29,7 @@ import ( emptypb "github.com/golang/protobuf/ptypes/empty" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + lrpb "google.golang.org/genproto/googleapis/longrunning" ) // MockServer is an in-memory mock implementation of a Pub/Sub Lite service, @@ -62,6 +63,7 @@ func NewServer() (*Server, error) { pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer) pb.RegisterCursorServiceServer(srv.Gsrv, liteServer) pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer) + lrpb.RegisterOperationsServer(srv.Gsrv, liteServer) srv.Start() return &Server{LiteServer: liteServer, gRPCServer: srv}, nil } @@ -87,6 +89,7 @@ type mockLiteServer struct { pb.SubscriberServiceServer pb.CursorServiceServer pb.PartitionAssignmentServiceServer + lrpb.OperationsServer mu sync.Mutex @@ -277,7 +280,7 @@ func (s *mockLiteServer) doTopicResponse(ctx context.Context, req interface{}) ( } resp, ok := retResponse.(*pb.Topic) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -289,7 +292,19 @@ func (s *mockLiteServer) doSubscriptionResponse(ctx context.Context, req interfa } resp, ok := retResponse.(*pb.Subscription) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) + } + return resp, nil +} + +func (s *mockLiteServer) doOperationResponse(ctx context.Context, req interface{}) (*lrpb.Operation, error) { + retResponse, retErr := s.popGlobalVerifiers(req) + if retErr != nil { + return nil, retErr + } + resp, ok := retResponse.(*lrpb.Operation) + if !ok { + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -301,7 +316,7 @@ func (s *mockLiteServer) doEmptyResponse(ctx context.Context, req interface{}) ( } resp, ok := retResponse.(*emptypb.Empty) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -325,7 +340,7 @@ func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopi } resp, ok := retResponse.(*pb.TopicPartitions) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -346,6 +361,10 @@ func (s *mockLiteServer) UpdateSubscription(ctx context.Context, req *pb.UpdateS return s.doSubscriptionResponse(ctx, req) } +func (s *mockLiteServer) SeekSubscription(ctx context.Context, req *pb.SeekSubscriptionRequest) (*lrpb.Operation, error) { + return s.doOperationResponse(ctx, req) +} + func (s *mockLiteServer) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) { return s.doEmptyResponse(ctx, req) } @@ -357,7 +376,7 @@ func (s *mockLiteServer) ListTopics(ctx context.Context, req *pb.ListTopicsReque } resp, ok := retResponse.(*pb.ListTopicsResponse) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -369,7 +388,7 @@ func (s *mockLiteServer) ListTopicSubscriptions(ctx context.Context, req *pb.Lis } resp, ok := retResponse.(*pb.ListTopicSubscriptionsResponse) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } @@ -381,7 +400,27 @@ func (s *mockLiteServer) ListSubscriptions(ctx context.Context, req *pb.ListSubs } resp, ok := retResponse.(*pb.ListSubscriptionsResponse) if !ok { - return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse)) + return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse) } return resp, nil } + +func (s *mockLiteServer) GetOperation(ctx context.Context, req *lrpb.GetOperationRequest) (*lrpb.Operation, error) { + return s.doOperationResponse(ctx, req) +} + +func (s *mockLiteServer) ListOperations(ctx context.Context, req *lrpb.ListOperationsRequest) (*lrpb.ListOperationsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "Unexpected ListOperations call") +} + +func (s *mockLiteServer) DeleteOperation(ctx context.Context, req *lrpb.DeleteOperationRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "Unexpected DeleteOperation call") +} + +func (s *mockLiteServer) CancelOperation(ctx context.Context, req *lrpb.CancelOperationRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "Unexpected CancelOperation call") +} + +func (s *mockLiteServer) WaitOperation(ctx context.Context, req *lrpb.WaitOperationRequest) (*lrpb.Operation, error) { + return nil, status.Errorf(codes.Unimplemented, "Unexpected WaitOperation call") +} diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go index 6a35095afdf..29afb9a35f8 100644 --- a/pubsublite/internal/test/util.go +++ b/pubsublite/internal/test/util.go @@ -14,6 +14,7 @@ package test import ( + "log" "strings" "github.com/google/go-cmp/cmp" @@ -21,6 +22,8 @@ import ( "golang.org/x/xerrors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) // ErrorEqual compares two errors for equivalence. @@ -54,3 +57,12 @@ func (f *FakeSource) Int63() int64 { return f.Ret } // Seed is unimplemented. func (f *FakeSource) Seed(seed int64) {} + +// MakeAny packs a message into an Any proto. +func MakeAny(msg proto.Message) *anypb.Any { + any, err := anypb.New(msg) + if err != nil { + log.Fatalf("Failed to make Any: %v", err) + } + return any +} diff --git a/pubsublite/internal/wire/rpc_test.go b/pubsublite/internal/wire/rpc_test.go index 850a946a76f..e624b9e70e2 100644 --- a/pubsublite/internal/wire/rpc_test.go +++ b/pubsublite/internal/wire/rpc_test.go @@ -16,10 +16,10 @@ package wire import ( "encoding/base64" "errors" - "log" "testing" "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/internal/test" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -30,18 +30,10 @@ import ( spb "google.golang.org/genproto/googleapis/rpc/status" ) -func makeAny(msg proto.Message) *anypb.Any { - any, err := anypb.New(msg) - if err != nil { - log.Fatalf("Failed to make Any: %v", err) - } - return any -} - func makeStreamResetSignal() error { statuspb := &spb.Status{ Code: int32(codes.Aborted), - Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{ + Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{ Reason: "RESET", Domain: "pubsublite.googleapis.com", })}, } @@ -63,7 +55,7 @@ func TestIsStreamResetSignal(t *testing.T) { desc: "non-retryable code", err: status.ErrorProto(&spb.Status{ Code: int32(codes.FailedPrecondition), - Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})}, + Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})}, }), want: false, }, @@ -71,7 +63,7 @@ func TestIsStreamResetSignal(t *testing.T) { desc: "wrong domain", err: status.ErrorProto(&spb.Status{ Code: int32(codes.Aborted), - Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET"})}, + Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET"})}, }), want: false, }, @@ -79,7 +71,7 @@ func TestIsStreamResetSignal(t *testing.T) { desc: "wrong reason", err: status.ErrorProto(&spb.Status{ Code: int32(codes.Aborted), - Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})}, + Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})}, }), want: false, },