Skip to content

Commit

Permalink
xds: Implementation of xds_resolver using LDS/RDS (#3183)
Browse files Browse the repository at this point in the history
xds: Implementation of xds_resolver using LDS/RDS
  • Loading branch information
easwars committed Nov 19, 2019
1 parent 71ba135 commit 5d4cc8a
Show file tree
Hide file tree
Showing 11 changed files with 1,081 additions and 200 deletions.
12 changes: 3 additions & 9 deletions xds/experimental/xds_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@
package experimental

import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
_ "google.golang.org/grpc/xds/internal/balancer" // Register the xds_balancer
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver
_ "google.golang.org/grpc/xds/internal/resolver/old" // Register the old xds_resolver
)

func init() {
resolver.Register(xdsresolver.NewBuilder())
balancer.Register(xdsbalancer.NewBalancerBuilder())
}
11 changes: 5 additions & 6 deletions xds/internal/balancer/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ var (
}
)

type xdsBalancerBuilder struct{}

// NewBalancerBuilder creates a new implementation of the balancer.Builder
// interface for the xDS balancer.
func NewBalancerBuilder() balancer.Builder {
return &xdsBalancerBuilder{}
func init() {
balancer.Register(&xdsBalancerBuilder{})
}

type xdsBalancerBuilder struct{}

// Build helps implement the balancer.Builder interface.
func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())
x := &xdsBalancer{
Expand Down
12 changes: 10 additions & 2 deletions xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import (
"io/ioutil"
"os"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)

const (
Expand Down Expand Up @@ -125,7 +126,6 @@ func NewConfig() *Config {
grpclog.Errorf("xds: jsonpb.Unmarshal(%v) failed during bootstrap: %v", string(v), err)
break
}
n.BuildVersion = gRPCVersion
config.NodeProto = n
case "xds_server":
xs := &xdsServer{}
Expand All @@ -147,6 +147,14 @@ func NewConfig() *Config {
}
}

// If we don't find a nodeProto in the bootstrap file, we just create an
// empty one here. That way, callers of this function can always expect
// that the NodeProto field is non-nil.
if config.NodeProto == nil {
config.NodeProto = &corepb.Node{}
}
config.NodeProto.BuildVersion = gRPCVersion

grpclog.Infof("xds: bootstrap.NewConfig returning: %+v", config)
return config
}
41 changes: 25 additions & 16 deletions xds/internal/client/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"os"
"testing"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/proto"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)

var (
Expand Down Expand Up @@ -197,7 +198,13 @@ func TestNewConfig(t *testing.T) {
{"nonExistentBootstrapFile", &Config{}},
{"empty", &Config{}},
{"badJSON", &Config{}},
{"emptyNodeProto", &Config{BalancerName: "trafficdirector.googleapis.com:443"}},
{
"emptyNodeProto",
&Config{
BalancerName: "trafficdirector.googleapis.com:443",
NodeProto: &corepb.Node{BuildVersion: gRPCVersion},
},
},
{"emptyXdsServer", &Config{NodeProto: nodeProto}},
{"unknownTopLevelFieldInFile", nilCredsConfig},
{"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}},
Expand All @@ -209,19 +216,21 @@ func TestNewConfig(t *testing.T) {
}

for _, test := range tests {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("%s: config.BalancerName is %s, want %s", test.name, config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("%s: config.NodeProto is %#v, want %#v", test.name, config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("%s: config.Creds is %#v, want %#v", test.name, config.Creds, test.wantConfig.Creds)
}
t.Run(test.name, func(t *testing.T) {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds)
}
})
}
}

Expand Down
159 changes: 159 additions & 0 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
package client

import (
"context"
"errors"
"fmt"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)

// For overriding in unittests.
var dialFunc = grpc.DialContext

// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
Config bootstrap.Config
// DialOpts contains dial options to be used when dialing the xDS server.
DialOpts []grpc.DialOption
}

// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources. A single client object will be shared by the xds
// resolver and balancer implementations.
type Client struct {
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API

mu sync.Mutex
serviceCallback func(ServiceUpdate, error)
ldsCancel func()
rdsCancel func()
}

// New returns a new xdsClient configured with opts.
func New(opts Options) (*Client, error) {
switch {
case opts.Config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
case opts.Config.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
case opts.Config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}

dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc, err := dialFunc(ctx, opts.Config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
}

c := &Client{
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, backoff.DefaultExponential.Backoff),
}
return c, nil
}

// Close closes the gRPC connection to the xDS server.
func (c *Client) Close() {
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
c.v2c.close()
c.cc.Close()
}

// ServiceUpdate contains update about the service.
type ServiceUpdate struct {
Cluster string
}

// handleLDSUpdate is the LDS watcher callback we registered with the v2Client.
func (c *Client) handleLDSUpdate(u ldsUpdate, err error) {
grpclog.Infof("xds: client received LDS update: %+v, err: %v", u, err)
if err != nil {
c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{}, err)
}
c.mu.Unlock()
return
}

c.mu.Lock()
c.rdsCancel = c.v2c.watchRDS(u.routeName, c.handleRDSUpdate)
c.mu.Unlock()
}

// handleRDSUpdate is the RDS watcher callback we registered with the v2Client.
func (c *Client) handleRDSUpdate(u rdsUpdate, err error) {
grpclog.Infof("xds: client received RDS update: %+v, err: %v", u, err)
if err != nil {
c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{}, err)
}
c.mu.Unlock()
return
}

c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{Cluster: u.clusterName}, nil)
}
c.mu.Unlock()
}

// WatchService uses LDS and RDS protocols to discover information about the
// provided serviceName.
func (c *Client) WatchService(serviceName string, callback func(ServiceUpdate, error)) (cancel func()) {
// TODO: Error out early if the client is closed. Ideally, this should
// never be called after the client is closed though.
c.mu.Lock()
c.serviceCallback = callback
c.ldsCancel = c.v2c.watchLDS(serviceName, c.handleLDSUpdate)
c.mu.Unlock()

return func() {
c.mu.Lock()
c.serviceCallback = nil
if c.ldsCancel != nil {
c.ldsCancel()
}
if c.rdsCancel != nil {
c.rdsCancel()
}
c.mu.Unlock()
}
}

0 comments on commit 5d4cc8a

Please sign in to comment.