Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls: control channel implementation #5046

Merged
merged 11 commits into from Dec 15, 2021
2 changes: 1 addition & 1 deletion balancer/rls/internal/builder.go
Expand Up @@ -23,7 +23,7 @@ import (
"google.golang.org/grpc/balancer"
)

const rlsBalancerName = "rls"
const rlsBalancerName = "rls_experimental"

func init() {
balancer.Register(&rlsBB{})
Expand Down
92 changes: 49 additions & 43 deletions balancer/rls/internal/control_channel.go
Expand Up @@ -47,75 +47,65 @@ type controlChannel struct {
// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
// policy receives this value in its service config.
rpcTimeout time.Duration
// buildOpts is the balancer.BuildOptions passed to the RLS LB policy. This
// will used to determine the dial options for the control channel.
buildOpts balancer.BuildOptions
// backToReadyCh is the channel on which an update is pushed when the
// connectivity state changes from READY --> TRANSIENT_FAILURE --> READY.
backToReadyCh chan struct{}
// throttler in an adaptive throttling implementation used to avoid
// hammering the RLS service while it is overloaded or down.
throttler adaptiveThrottler

ctx context.Context
ctxCancel context.CancelFunc
cc *grpc.ClientConn
stub rlsgrpc.RouteLookupServiceClient
logger *internalgrpclog.PrefixLogger
cc *grpc.ClientConn
stub rlsgrpc.RouteLookupServiceClient
logger *internalgrpclog.PrefixLogger
}

func newControlChannel(rlsServerName string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyCh chan struct{}) (*controlChannel, error) {
ctx, cancel := context.WithCancel(context.Background())
ctrlCh := &controlChannel{
ctx: ctx,
ctxCancel: cancel,
rpcTimeout: rpcTimeout,
buildOpts: bOpts,
backToReadyCh: backToReadyCh,
throttler: newAdaptiveThrottler(),
}
ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))

dopts, err := ctrlCh.dialOpts()
dopts, err := ctrlCh.dialOpts(bOpts)
if err != nil {
return nil, err
}
cc, err := grpc.Dial(rlsServerName, dopts...)
ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
if err != nil {
return nil, err
}
ctrlCh.cc = cc
ctrlCh.stub = rlsgrpc.NewRouteLookupServiceClient(cc)
ctrlCh.stub = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)

go ctrlCh.monitorConnectivityState()
return ctrlCh, nil
}

// dialOpts constructs the dial options for the control plane channel.
func (cc *controlChannel) dialOpts() ([]grpc.DialOption, error) {
func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions) ([]grpc.DialOption, error) {
// The control plane channel will use the same authority as the parent
// channel for server authorization. This ensures that the identity of the
// RLS server and the identity of the backends is the same, so if the RLS
// config is injected by an attacker, it cannot cause leakage of private
// information contained in headers set by the application.
dopts := []grpc.DialOption{grpc.WithAuthority(cc.buildOpts.Authority)}
if cc.buildOpts.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(cc.buildOpts.Dialer))
dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
if bOpts.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
}

// The control channel will use the channel credentials from the parent
// channel, including any call creds associated with the channel creds.
var credsOpt grpc.DialOption
switch {
case cc.buildOpts.DialCreds != nil:
credsOpt = grpc.WithTransportCredentials(cc.buildOpts.DialCreds.Clone())
case cc.buildOpts.CredsBundle != nil:
case bOpts.DialCreds != nil:
credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
case bOpts.CredsBundle != nil:
// The "fallback" mode in google default credentials (which is the only
// type of credentials we expect to be used with RLS) uses TLS/ALTS
// creds for transport and uses the same call creds as that on the
// parent bundle.
bundle, err := cc.buildOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,33 +140,34 @@ func (cc *controlChannel) monitorConnectivityState() {
// returning only one new picker, regardless of how many backoff timers are
// cancelled.

// Using the background context is fine here since we check for the ClientConn
// entering SHUTDOWN and return early in that case.
ctx := context.Background()

// Wait for the control channel to become READY.
for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
if !cc.cc.WaitForStateChange(cc.ctx, s) {
// cc.ctx has expired. Can happen only if close() was called.
return
}
if !cc.waitForReady(ctx) {
return
}
cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
cc.logger.Infof("Connectivity state is READY")

for {

// Wait for the control channel to enter anything other than READY.
state := cc.cc.GetState()
if !cc.cc.WaitForStateChange(cc.ctx, state) {
// There is a small chance that the control channel has moved out of READY
// since the time we last saw it enter READY. Wait for it to enter anything
// other than READY.
if !cc.cc.WaitForStateChange(ctx, connectivity.Ready) {
// cc.ctx has expired. Can happen only if close() was called.
return
}
if cc.cc.GetState() == connectivity.Shutdown {
return
}
cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())

// Wait for the control channel to become READY again.
for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
if !cc.cc.WaitForStateChange(cc.ctx, s) {
// cc.ctx has expired. Can happen only if close() was called.
return
}
if !cc.waitForReady(ctx) {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return
}
cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
cc.logger.Infof("Connectivity state is READY")

if cc.backToReadyCh != nil {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
cc.logger.Infof("Control channel back to READY")
Expand All @@ -185,9 +176,24 @@ func (cc *controlChannel) monitorConnectivityState() {
}
}

// waitForReady waits for the control channel to become READY. If the provided
// context expires or the control channel enters SHUTDOWN, it returns early and
// returns false.
func (cc *controlChannel) waitForReady(ctx context.Context) (ready bool) {
for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
if s == connectivity.Shutdown {
return false
}
if !cc.cc.WaitForStateChange(ctx, s) {
// cc.ctx has expired. Can happen only if close() was called.
return false
}
}
return true
}

func (cc *controlChannel) close() {
cc.logger.Infof("Closing control channel")
cc.ctxCancel()
cc.cc.Close()
}

Expand All @@ -213,10 +219,10 @@ func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLo
}
cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))

ctx, cancel := context.WithTimeout(cc.ctx, cc.rpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
defer cancel()
resp, err := cc.stub.RouteLookup(ctx, req)
cb(resp.GetTargets(), resp.GetHeaderData(), err)
cancel()
}()
return false
}
3 changes: 1 addition & 2 deletions balancer/rls/internal/helpers_test.go
Expand Up @@ -242,9 +242,8 @@ func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}
}
return &testpb.Empty{}, nil
},
SkipClient: true,
}
if err := backend.Start(sopts); err != nil {
if err := backend.StartServer(sopts); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
Expand Down
22 changes: 13 additions & 9 deletions internal/stubserver/stubserver.go
Expand Up @@ -51,10 +51,6 @@ type StubServer struct {
CC *grpc.ClientConn
S *grpc.Server

// SkipClient instructs the Start() method to not create a client connected
// to this service.
SkipClient bool

// Parameters for Listen and Dial. Defaults will be used if these are empty
// before Start.
Network string
Expand Down Expand Up @@ -84,6 +80,14 @@ func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallSer

// Start starts the server and creates a client connected to it.
func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
if err := ss.StartServer(sopts); err != nil {
return err
}
return ss.StartClient(dopts...)
}

// StartServer only starts the server. It does not create a client to it.
func (ss *StubServer) StartServer(sopts []grpc.ServerOption) error {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if ss.Network == "" {
ss.Network = "tcp"
}
Expand All @@ -106,12 +110,12 @@ func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
go s.Serve(lis)
ss.cleanups = append(ss.cleanups, s.Stop)
ss.S = s
return nil
}

if ss.SkipClient {
// If we were asked to not create a client, return early.
return nil
}

// StartClient creates a client connected to this service that the test may use.
// The newly created client will be available in the Client field of StubServer.
func (ss *StubServer) StartClient(dopts ...grpc.DialOption) error {
opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
if ss.R != nil {
ss.Target = ss.R.Scheme() + ":///" + ss.Address
Expand Down
5 changes: 2 additions & 3 deletions internal/testutils/restartable_listener.go
Expand Up @@ -83,11 +83,10 @@ func (l *RestartableListener) Addr() net.Addr {
func (l *RestartableListener) Stop() {
l.mu.Lock()
l.stopped = true
tmp := l.conns
l.conns = nil
for _, conn := range tmp {
for _, conn := range l.conns {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
conn.Close()
}
l.conns = nil
l.mu.Unlock()
}

Expand Down