Skip to content

Commit

Permalink
xds: set dial target in request resource_names (#3081)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 14, 2019
1 parent 1610f0f commit 0859afa
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 124 deletions.
23 changes: 10 additions & 13 deletions xds/internal/balancer/lrs/lrs.go
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -141,7 +140,7 @@ func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
// lrsStore collects loads from xds balancer, and periodically sends load to the
// server.
type lrsStore struct {
node *basepb.Node
serviceName string
backoff backoff.Strategy
lastReported time.Time

Expand All @@ -152,15 +151,7 @@ type lrsStore struct {
// NewStore creates a store for load reports.
func NewStore(serviceName string) Store {
return &lrsStore{
node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: serviceName},
},
},
},
},
serviceName: serviceName,
backoff: backoff.DefaultExponential,
lastReported: time.Now(),
}
Expand Down Expand Up @@ -324,7 +315,14 @@ func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
continue
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
// TODO: when moving this to the xds client, the Node
// field needs to be set to node from bootstrap file.
// Node: c.config.NodeProto,
ClusterStats: []*loadreportpb.ClusterStats{{
// TODO: this is user's dial target now, as a temporary
// solution. Eventually this will be from CDS's response.
ClusterName: ls.serviceName,
}},
}); err != nil {
grpclog.Infof("lrs: failed to send first request: %v", err)
continue
Expand Down Expand Up @@ -366,7 +364,6 @@ func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingS
return
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
ClusterStats: ls.buildStats(clusterName),
}); err != nil {
grpclog.Infof("lrs: failed to send report: %v", err)
Expand Down
14 changes: 3 additions & 11 deletions xds/internal/balancer/lrs/lrs_test.go
Expand Up @@ -30,13 +30,11 @@ import (

"github.com/golang/protobuf/proto"
durationpb "github.com/golang/protobuf/ptypes/duration"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
loadreportpb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/endpoint/load_report"
lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
lrspb "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
Expand Down Expand Up @@ -362,15 +360,9 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea
return err
}
if !proto.Equal(req, &lrspb.LoadStatsRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testService},
},
},
},
},
ClusterStats: []*loadreportpb.ClusterStats{{
ClusterName: testService,
}},
}) {
return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req)
}
Expand Down
36 changes: 9 additions & 27 deletions xds/internal/balancer/xds_client.go
Expand Up @@ -26,17 +26,14 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds"
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
discoverypb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/discovery"
edspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/eds"
adsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/discovery/v2/ads"
Expand All @@ -54,6 +51,7 @@ const (
type client struct {
ctx context.Context
cancel context.CancelFunc
serviceName string
cli adsgrpc.AggregatedDiscoveryServiceClient
dialer func(context.Context, string) (net.Conn, error)
channelzParentID int64
Expand Down Expand Up @@ -116,33 +114,27 @@ func (c *client) dial() {

func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
cdsReq := &discoverypb.DiscoveryRequest{
Node: c.config.NodeProto,
TypeUrl: cdsType,
Node: c.config.NodeProto,
TypeUrl: cdsType,
ResourceNames: []string{c.serviceName},
}
return cdsReq
}

func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
// TODO: Once we change the client to always make a CDS call, we can remove
// this boolean field from the metadata.
np := proto.Clone(c.config.NodeProto).(*basepb.Node)
np.Metadata.Fields[endpointRequired] = &structpb.Value{
Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
}

edsReq := &discoverypb.DiscoveryRequest{
Node: np,
Node: c.config.NodeProto,
// TODO: the expected ResourceName could be in a different format from
// dial target. (test_service.test_namespace.traffic_director.com vs
// test_namespace:test_service).
//
// The solution today is to always include GrpcHostname in metadata,
// with the value set to dial target.
// The solution today is to always set dial target in resource_names.
//
// A future solution could be: always do CDS, get cluster name from CDS
// response, and use it here.
// `ResourceNames: []string{c.clusterName},`
TypeUrl: edsType,
TypeUrl: edsType,
ResourceNames: []string{c.serviceName},
}
return edsReq
}
Expand Down Expand Up @@ -255,6 +247,7 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
func newXDSClient(balancerName string, enableCDS bool, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
c := &client{
enableCDS: enableCDS,
serviceName: opts.Target.Endpoint,
dialer: opts.Dialer,
channelzParentID: opts.ChannelzParentID,
newADS: newADS,
Expand All @@ -275,17 +268,6 @@ func newXDSClient(balancerName string, enableCDS bool, opts balancer.BuildOption
if c.config.Creds == nil {
c.config.Creds = credsFromDefaults(balancerName, &opts)
}
if c.config.NodeProto == nil {
c.config.NodeProto = &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: opts.Target.Endpoint},
},
},
},
}
}
return c
}

Expand Down
42 changes: 6 additions & 36 deletions xds/internal/balancer/xds_client_test.go
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds"
addresspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/address"
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
Expand All @@ -50,46 +49,17 @@ import (
var (
testServiceName = "test/foo"
testCDSReq = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
},
},
},
TypeUrl: cdsType,
TypeUrl: cdsType,
ResourceNames: []string{testServiceName},
}
testEDSReq = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
endpointRequired: {
Kind: &structpb.Value_BoolValue{BoolValue: true},
},
},
},
},
TypeUrl: edsType,
// TODO: this should be cluster name from CDS response, not the service name.
ResourceNames: []string{testServiceName},
}
testEDSReqWithoutEndpoints = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
endpointRequired: {
Kind: &structpb.Value_BoolValue{BoolValue: false},
},
},
},
},
TypeUrl: edsType,
TypeUrl: edsType,
ResourceNames: []string{testServiceName},
}
testCluster = &cdspb.Cluster{
Name: testServiceName,
Expand Down
16 changes: 4 additions & 12 deletions xds/internal/balancer/xds_lrs_test.go
Expand Up @@ -26,15 +26,13 @@ import (

"github.com/golang/protobuf/proto"
durationpb "github.com/golang/protobuf/ptypes/duration"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
loadreportpb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/endpoint/load_report"
lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
lrspb "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
)
Expand All @@ -52,15 +50,9 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea
return err
}
if !proto.Equal(req, &lrspb.LoadStatsRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
},
},
},
ClusterStats: []*loadreportpb.ClusterStats{{
ClusterName: testServiceName,
}},
}) {
return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req)
}
Expand Down
25 changes: 0 additions & 25 deletions xds/internal/const.go

This file was deleted.

0 comments on commit 0859afa

Please sign in to comment.