Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub/pstest): add support to register other servers into grpc.Server #9722

Merged
merged 2 commits into from May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -37,6 +37,7 @@ import (
"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.einride.tech/aip/filtering"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -111,6 +112,13 @@ func NewServer(opts ...ServerReactorOption) *Server {

// NewServerWithPort creates a new fake server running in the current process at the specified port.
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
return NewServerWithCallback(port, func(*grpc.Server) { /* empty */ }, opts...)
}

// NewServerWithCallback creates new fake server running in the current process at the specified port.
// Before starting the server, the provided callback is called to allow caller to register additional fakes
// into grpc server.
func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server {
srv, err := testutil.NewServerWithPort(port)
if err != nil {
panic(fmt.Sprintf("pstest.NewServerWithPort: %v", err))
Expand All @@ -136,6 +144,9 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)

callback(srv.Gsrv)

srv.Start()
return s
}
Expand Down
43 changes: 43 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -27,10 +27,12 @@ import (
"testing"
"time"

iampb "cloud.google.com/go/iam/apiv1/iampb"
"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
field_mask "google.golang.org/protobuf/types/known/fieldmaskpb"
Expand Down Expand Up @@ -62,6 +64,47 @@ func TestNewServerWithPort(t *testing.T) {
defer conn.Close()
}

func TestNewServerWithCallback(t *testing.T) {
// Allocate an available port to use with NewServerWithPort and then close it so it's available.
// Note: There is no guarantee that the port does not become used between closing
// the listener and creating the new server with NewServerWithPort, but the chances are
// very small.
l, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
port := l.Addr().(*net.TCPAddr).Port
l.Close()

additionalFake := struct {
iampb.UnimplementedIAMPolicyServer
}{}

verifyCallback := false
callback := func(grpc *grpc.Server) {
// register something
iampb.RegisterIAMPolicyServer(grpc, &additionalFake)
verifyCallback = true
}

// Pass a non 0 port to demonstrate we can pass a hardcoded port for the server to listen on
srv := NewServerWithCallback(port, callback)
if err != nil {
t.Fatal(err)
}
defer srv.Close()

conn, err := grpc.NewClient(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()

if !verifyCallback {
t.Fatal("callback was not invoked")
}
}

func TestTopics(t *testing.T) {
pclient, sclient, server, cleanup := newFake(context.TODO(), t)
defer cleanup()
Expand Down