Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(pubsublite): testing support for operations #4361

Merged
merged 2 commits into from Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 46 additions & 7 deletions pubsublite/internal/test/mock.go
Expand Up @@ -29,6 +29,7 @@ import (

emptypb "github.com/golang/protobuf/ptypes/empty"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
lrpb "google.golang.org/genproto/googleapis/longrunning"
)

// MockServer is an in-memory mock implementation of a Pub/Sub Lite service,
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewServer() (*Server, error) {
pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer)
pb.RegisterCursorServiceServer(srv.Gsrv, liteServer)
pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer)
lrpb.RegisterOperationsServer(srv.Gsrv, liteServer)
srv.Start()
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}
Expand All @@ -87,6 +89,7 @@ type mockLiteServer struct {
pb.SubscriberServiceServer
pb.CursorServiceServer
pb.PartitionAssignmentServiceServer
lrpb.OperationsServer

mu sync.Mutex

Expand Down Expand Up @@ -277,7 +280,7 @@ func (s *mockLiteServer) doTopicResponse(ctx context.Context, req interface{}) (
}
resp, ok := retResponse.(*pb.Topic)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -289,7 +292,19 @@ func (s *mockLiteServer) doSubscriptionResponse(ctx context.Context, req interfa
}
resp, ok := retResponse.(*pb.Subscription)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}

func (s *mockLiteServer) doOperationResponse(ctx context.Context, req interface{}) (*lrpb.Operation, error) {
retResponse, retErr := s.popGlobalVerifiers(req)
if retErr != nil {
return nil, retErr
}
resp, ok := retResponse.(*lrpb.Operation)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -301,7 +316,7 @@ func (s *mockLiteServer) doEmptyResponse(ctx context.Context, req interface{}) (
}
resp, ok := retResponse.(*emptypb.Empty)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -325,7 +340,7 @@ func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopi
}
resp, ok := retResponse.(*pb.TopicPartitions)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -346,6 +361,10 @@ func (s *mockLiteServer) UpdateSubscription(ctx context.Context, req *pb.UpdateS
return s.doSubscriptionResponse(ctx, req)
}

func (s *mockLiteServer) SeekSubscription(ctx context.Context, req *pb.SeekSubscriptionRequest) (*lrpb.Operation, error) {
return s.doOperationResponse(ctx, req)
}

func (s *mockLiteServer) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
return s.doEmptyResponse(ctx, req)
}
Expand All @@ -357,7 +376,7 @@ func (s *mockLiteServer) ListTopics(ctx context.Context, req *pb.ListTopicsReque
}
resp, ok := retResponse.(*pb.ListTopicsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -369,7 +388,7 @@ func (s *mockLiteServer) ListTopicSubscriptions(ctx context.Context, req *pb.Lis
}
resp, ok := retResponse.(*pb.ListTopicSubscriptionsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}
Expand All @@ -381,7 +400,27 @@ func (s *mockLiteServer) ListSubscriptions(ctx context.Context, req *pb.ListSubs
}
resp, ok := retResponse.(*pb.ListSubscriptionsResponse)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %v", reflect.TypeOf(retResponse))
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: invalid response type %T", retResponse)
}
return resp, nil
}

func (s *mockLiteServer) GetOperation(ctx context.Context, req *lrpb.GetOperationRequest) (*lrpb.Operation, error) {
return s.doOperationResponse(ctx, req)
}

func (s *mockLiteServer) ListOperations(ctx context.Context, req *lrpb.ListOperationsRequest) (*lrpb.ListOperationsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected ListOperations call")
}

func (s *mockLiteServer) DeleteOperation(ctx context.Context, req *lrpb.DeleteOperationRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected DeleteOperation call")
}

func (s *mockLiteServer) CancelOperation(ctx context.Context, req *lrpb.CancelOperationRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected CancelOperation call")
}

func (s *mockLiteServer) WaitOperation(ctx context.Context, req *lrpb.WaitOperationRequest) (*lrpb.Operation, error) {
return nil, status.Errorf(codes.Unimplemented, "Unexpected WaitOperation call")
}
12 changes: 12 additions & 0 deletions pubsublite/internal/test/util.go
Expand Up @@ -14,13 +14,16 @@
package test

import (
"log"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// ErrorEqual compares two errors for equivalence.
Expand Down Expand Up @@ -54,3 +57,12 @@ func (f *FakeSource) Int63() int64 { return f.Ret }

// Seed is unimplemented.
func (f *FakeSource) Seed(seed int64) {}

// MakeAny packs a message into an Any proto.
func MakeAny(msg proto.Message) *anypb.Any {
any, err := anypb.New(msg)
if err != nil {
log.Fatalf("Failed to make Any: %v", err)
}
return any
}
18 changes: 5 additions & 13 deletions pubsublite/internal/wire/rpc_test.go
Expand Up @@ -16,10 +16,10 @@ package wire
import (
"encoding/base64"
"errors"
"log"
"testing"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -30,18 +30,10 @@ import (
spb "google.golang.org/genproto/googleapis/rpc/status"
)

func makeAny(msg proto.Message) *anypb.Any {
any, err := anypb.New(msg)
if err != nil {
log.Fatalf("Failed to make Any: %v", err)
}
return any
}

func makeStreamResetSignal() error {
statuspb := &spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{
Reason: "RESET", Domain: "pubsublite.googleapis.com",
})},
}
Expand All @@ -63,23 +55,23 @@ func TestIsStreamResetSignal(t *testing.T) {
desc: "non-retryable code",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.FailedPrecondition),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET", Domain: "pubsublite.googleapis.com"})},
}),
want: false,
},
{
desc: "wrong domain",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Reason: "RESET"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Reason: "RESET"})},
}),
want: false,
},
{
desc: "wrong reason",
err: status.ErrorProto(&spb.Status{
Code: int32(codes.Aborted),
Details: []*anypb.Any{makeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})},
Details: []*anypb.Any{test.MakeAny(&errdetails.ErrorInfo{Domain: "pubsublite.googleapis.com"})},
}),
want: false,
},
Expand Down