From e5e85831f791ebb155380c5352b4636c45c886d3 Mon Sep 17 00:00:00 2001 From: Toni Pokki Date: Sun, 7 Apr 2024 12:19:28 +0300 Subject: [PATCH] feat(pubsub/pstest): add support to register other servers into grpc.Server --- pubsub/pstest/fake.go | 11 ++++++++++ pubsub/pstest/fake_test.go | 43 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 9448c88ca21..42d6ac82a07 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -37,6 +37,7 @@ import ( "cloud.google.com/go/internal/testutil" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "go.einride.tech/aip/filtering" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" durpb "google.golang.org/protobuf/types/known/durationpb" @@ -111,6 +112,13 @@ func NewServer(opts ...ServerReactorOption) *Server { // NewServerWithPort creates a new fake server running in the current process at the specified port. func NewServerWithPort(port int, opts ...ServerReactorOption) *Server { + return NewServerWithCallback(port, func(*grpc.Server) { /* empty */ }, opts...) +} + +// NewServerWithCallback creates new fake server running in the current process at the specified port. +// Before starting the server, the provided callback is called to allow caller to register additional fakes +// into grpc server. +func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server { srv, err := testutil.NewServerWithPort(port) if err != nil { panic(fmt.Sprintf("pstest.NewServerWithPort: %v", err)) @@ -136,6 +144,9 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server { pb.RegisterPublisherServer(srv.Gsrv, &s.GServer) pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer) pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer) + + callback(srv.Gsrv) + srv.Start() return s } diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index c259fab832c..98504606a6e 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -27,10 +27,12 @@ import ( "testing" "time" + iampb "cloud.google.com/go/iam/apiv1/iampb" "cloud.google.com/go/internal/testutil" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" field_mask "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -62,6 +64,47 @@ func TestNewServerWithPort(t *testing.T) { defer conn.Close() } +func TestNewServerWithCallback(t *testing.T) { + // Allocate an available port to use with NewServerWithPort and then close it so it's available. + // Note: There is no guarantee that the port does not become used between closing + // the listener and creating the new server with NewServerWithPort, but the chances are + // very small. + l, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + port := l.Addr().(*net.TCPAddr).Port + l.Close() + + additionalFake := struct { + iampb.UnimplementedIAMPolicyServer + }{} + + verifyCallback := false + callback := func(grpc *grpc.Server) { + // register something + iampb.RegisterIAMPolicyServer(grpc, &additionalFake) + verifyCallback = true + } + + // Pass a non 0 port to demonstrate we can pass a hardcoded port for the server to listen on + srv := NewServerWithCallback(port, callback) + if err != nil { + t.Fatal(err) + } + defer srv.Close() + + conn, err := grpc.NewClient(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if !verifyCallback { + t.Fatal("callback was not invoked") + } +} + func TestTopics(t *testing.T) { pclient, sclient, server, cleanup := newFake(context.TODO(), t) defer cleanup()