Skip to content

Commit

Permalink
fakeserver: add v3 support to the xDS fakeserver implementation (#5698)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Oct 17, 2022
1 parent 912765f commit bb3d739
Showing 1 changed file with 146 additions and 23 deletions.
169 changes: 146 additions & 23 deletions xds/internal/testutils/fakeserver/server.go
Expand Up @@ -17,6 +17,10 @@
*/

// Package fakeserver provides a fake implementation of the management server.
//
// This package is recommended only for scenarios which cannot be tested using
// the xDS management server (which uses envoy-go-control-plane) provided by the
// `internal/testutils/xds/e2e` package.
package fakeserver

import (
Expand All @@ -31,10 +35,14 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/status"

discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
v2discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v2lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
v2lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
v3lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
)

const (
Expand All @@ -61,6 +69,10 @@ type Response struct {
// Server is a fake implementation of xDS and LRS protocols. It listens on the
// same port for both services and exposes a bunch of channels to send/receive
// messages.
//
// This server is recommended only for scenarios which cannot be tested using
// the xDS management server (which uses envoy-go-control-plane) provided by the
// `internal/testutils/xds/e2e` package.
type Server struct {
// XDSRequestChan is a channel on which received xDS requests are made
// available to the users of this Server.
Expand All @@ -74,6 +86,12 @@ type Server struct {
// LRSResponseChan is a channel on which the Server accepts the LRS
// response to be sent to the client.
LRSResponseChan chan *Response
// LRSStreamOpenChan is a channel on which the Server sends notifications
// when a new LRS stream is created.
LRSStreamOpenChan *testutils.Channel
// LRSStreamCloseChan is a channel on which the Server sends notifications
// when an existing LRS stream is closed.
LRSStreamCloseChan *testutils.Channel
// NewConnChan is a channel on which the fake server notifies receipt of new
// connection attempts. Tests can gate on this event before proceeding to
// other actions which depend on a connection to the fake server being up.
Expand All @@ -82,8 +100,10 @@ type Server struct {
Address string

// The underlying fake implementation of xDS and LRS.
xdsS *xdsServer
lrsS *lrsServer
xdsV2 *xdsServerV2
xdsV3 *xdsServerV3
lrsV2 *lrsServerV2
lrsV3 *lrsServerV3
}

type wrappedListener struct {
Expand All @@ -110,34 +130,40 @@ func StartServer() (*Server, func(), error) {
}

s := &Server{
XDSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
LRSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
NewConnChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
XDSResponseChan: make(chan *Response, defaultChannelBufferSize),
LRSResponseChan: make(chan *Response, 1), // The server only ever sends one response.
Address: lis.Addr().String(),
XDSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
LRSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
NewConnChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
XDSResponseChan: make(chan *Response, defaultChannelBufferSize),
LRSResponseChan: make(chan *Response, 1), // The server only ever sends one response.
LRSStreamOpenChan: testutils.NewChannel(),
LRSStreamCloseChan: testutils.NewChannel(),
Address: lis.Addr().String(),
}
s.xdsS = &xdsServer{reqChan: s.XDSRequestChan, respChan: s.XDSResponseChan}
s.lrsS = &lrsServer{reqChan: s.LRSRequestChan, respChan: s.LRSResponseChan}
s.xdsV2 = &xdsServerV2{reqChan: s.XDSRequestChan, respChan: s.XDSResponseChan}
s.xdsV3 = &xdsServerV3{reqChan: s.XDSRequestChan, respChan: s.XDSResponseChan}
s.lrsV2 = &lrsServerV2{reqChan: s.LRSRequestChan, respChan: s.LRSResponseChan, streamOpenChan: s.LRSStreamOpenChan, streamCloseChan: s.LRSStreamCloseChan}
s.lrsV3 = &lrsServerV3{reqChan: s.LRSRequestChan, respChan: s.LRSResponseChan, streamOpenChan: s.LRSStreamOpenChan, streamCloseChan: s.LRSStreamCloseChan}
wp := &wrappedListener{
Listener: lis,
server: s,
}

server := grpc.NewServer()
lrsgrpc.RegisterLoadReportingServiceServer(server, s.lrsS)
adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, s.xdsS)
v2lrsgrpc.RegisterLoadReportingServiceServer(server, s.lrsV2)
v2discoverygrpc.RegisterAggregatedDiscoveryServiceServer(server, s.xdsV2)
v3lrsgrpc.RegisterLoadReportingServiceServer(server, s.lrsV3)
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(server, s.xdsV3)
go server.Serve(wp)

return s, func() { server.Stop() }, nil
}

type xdsServer struct {
type xdsServerV2 struct {
reqChan *testutils.Channel
respChan chan *Response
}

func (xdsS *xdsServer) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
func (xdsS *xdsServerV2) StreamAggregatedResources(s v2discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
Expand All @@ -162,7 +188,7 @@ func (xdsS *xdsServer) StreamAggregatedResources(s adsgrpc.AggregatedDiscoverySe
retErr = r.Err
return
}
if err := s.Send(r.Resp.(*discoverypb.DiscoveryResponse)); err != nil {
if err := s.Send(r.Resp.(*v2discoverypb.DiscoveryResponse)); err != nil {
retErr = err
return
}
Expand All @@ -179,16 +205,113 @@ func (xdsS *xdsServer) StreamAggregatedResources(s adsgrpc.AggregatedDiscoverySe
return nil
}

func (xdsS *xdsServer) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
func (xdsS *xdsServerV2) DeltaAggregatedResources(v2discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}

type lrsServer struct {
type xdsServerV3 struct {
reqChan *testutils.Channel
respChan chan *Response
}

func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
func (xdsS *xdsServerV3) StreamAggregatedResources(s v3discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
req, err := s.Recv()
if err != nil {
errCh <- err
return
}
xdsS.reqChan.Send(&Request{req, err})
}
}()
go func() {
var retErr error
defer func() {
errCh <- retErr
}()

for {
select {
case r := <-xdsS.respChan:
if r.Err != nil {
retErr = r.Err
return
}
if err := s.Send(r.Resp.(*v3discoverypb.DiscoveryResponse)); err != nil {
retErr = err
return
}
case <-s.Context().Done():
retErr = s.Context().Err()
return
}
}
}()

if err := <-errCh; err != nil {
return err
}
return nil
}

func (xdsS *xdsServerV3) DeltaAggregatedResources(v3discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}

type lrsServerV2 struct {
reqChan *testutils.Channel
respChan chan *Response
streamOpenChan *testutils.Channel
streamCloseChan *testutils.Channel
}

func (lrsS *lrsServerV2) StreamLoadStats(s v2lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
lrsS.streamOpenChan.Send(nil)
defer lrsS.streamCloseChan.Send(nil)

req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
return err
}

select {
case r := <-lrsS.respChan:
if r.Err != nil {
return r.Err
}
if err := s.Send(r.Resp.(*v2lrspb.LoadStatsResponse)); err != nil {
return err
}
case <-s.Context().Done():
return s.Context().Err()
}

for {
req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}

type lrsServerV3 struct {
reqChan *testutils.Channel
respChan chan *Response
streamOpenChan *testutils.Channel
streamCloseChan *testutils.Channel
}

func (lrsS *lrsServerV3) StreamLoadStats(s v3lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
lrsS.streamOpenChan.Send(nil)
defer lrsS.streamCloseChan.Send(nil)

req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
Expand All @@ -200,7 +323,7 @@ func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoad
if r.Err != nil {
return r.Err
}
if err := s.Send(r.Resp.(*lrspb.LoadStatsResponse)); err != nil {
if err := s.Send(r.Resp.(*v3lrspb.LoadStatsResponse)); err != nil {
return err
}
case <-s.Context().Done():
Expand Down

0 comments on commit bb3d739

Please sign in to comment.