Skip to content

Commit

Permalink
feat(pubsub/pstest): add support to register other servers into grpc.…
Browse files Browse the repository at this point in the history
…Server
  • Loading branch information
tpokki committed Apr 18, 2024
1 parent 62d6ecf commit 0ac86b8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -37,6 +37,7 @@ import (
"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.einride.tech/aip/filtering"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -111,6 +112,13 @@ func NewServer(opts ...ServerReactorOption) *Server {

// NewServerWithPort creates a new fake server running in the current process at the specified port.
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
return NewServerWithCallback(port, func(*grpc.Server) { /* empty */ }, opts...)
}

// NewServerWithCallback creates new fake server running in the current process at the specified port.
// Before starting the server, the provided callback is called to allow caller to register additional fakes
// into grpc server.
func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server {
srv, err := testutil.NewServerWithPort(port)
if err != nil {
panic(fmt.Sprintf("pstest.NewServerWithPort: %v", err))
Expand All @@ -136,6 +144,9 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)

callback(srv.Gsrv)

srv.Start()
return s
}
Expand Down
43 changes: 43 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -27,10 +27,12 @@ import (
"testing"
"time"

iampb "cloud.google.com/go/iam/apiv1/iampb"
"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
field_mask "google.golang.org/protobuf/types/known/fieldmaskpb"
Expand Down Expand Up @@ -62,6 +64,47 @@ func TestNewServerWithPort(t *testing.T) {
defer conn.Close()
}

func TestNewServerWithCallback(t *testing.T) {
// Allocate an available port to use with NewServerWithPort and then close it so it's available.
// Note: There is no guarantee that the port does not become used between closing
// the listener and creating the new server with NewServerWithPort, but the chances are
// very small.
l, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
port := l.Addr().(*net.TCPAddr).Port
l.Close()

additionalFake := struct {
iampb.UnimplementedIAMPolicyServer
}{}

verifyCallback := false
callback := func(grpc *grpc.Server) {
// register something
iampb.RegisterIAMPolicyServer(grpc, &additionalFake)
verifyCallback = true
}

// Pass a non 0 port to demonstrate we can pass a hardcoded port for the server to listen on
srv := NewServerWithCallback(port, callback)
if err != nil {
t.Fatal(err)
}
defer srv.Close()

conn, err := grpc.NewClient(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()

if !verifyCallback {
t.Fatal("callback was not invoked")
}
}

func TestTopics(t *testing.T) {
pclient, sclient, server, cleanup := newFake(context.TODO(), t)
defer cleanup()
Expand Down

0 comments on commit 0ac86b8

Please sign in to comment.