Skip to content

Commit

Permalink
xds/resolver: support inline RDS resource from LDS response (#4299)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Mar 31, 2021
1 parent 0028242 commit c72e1c8
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 17 deletions.
8 changes: 8 additions & 0 deletions xds/internal/client/client.go
Expand Up @@ -195,7 +195,15 @@ type UpdateMetadata struct {
type ListenerUpdate struct {
// RouteConfigName is the route configuration name corresponding to the
// target which is being watched through LDS.
//
// Only one of RouteConfigName and InlineRouteConfig is set.
RouteConfigName string
// InlineRouteConfig is the inline route configuration (RDS response)
// returned inside LDS.
//
// Only one of RouteConfigName and InlineRouteConfig is set.
InlineRouteConfig *RouteConfigUpdate

// MaxStreamDuration contains the HTTP connection manager's
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
Expand Down
53 changes: 53 additions & 0 deletions xds/internal/client/lds_test.go
Expand Up @@ -54,6 +54,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
v3LDSTarget = "lds.target.good:3333"
v2RouteConfigName = "v2RouteConfig"
v3RouteConfigName = "v3RouteConfig"
routeName = "routeName"
)

var (
Expand Down Expand Up @@ -132,6 +133,39 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: unknownFilterConfig},
IsOptional: true,
}
v3LisWithInlineRoute = &anypb.Any{
TypeUrl: version.V3ListenerURL,
Value: func() []byte {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{v3LDSTarget},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
}}}}}}},
},
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
MaxStreamDuration: durationpb.New(time.Second),
},
}
mcm := marshalAny(hcm)
lis := &v3listenerpb.Listener{
Name: v3LDSTarget,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: mcm,
},
}
mLis, _ := proto.Marshal(lis)
return mLis
}(),
}
v3LisWithFilters = func(fs ...*v3httppb.HttpFilter) *anypb.Any {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Expand Down Expand Up @@ -650,6 +684,25 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
Version: testVersion,
},
},
{
name: "v3 listener with inline route configuration",
resources: []*anypb.Any{v3LisWithInlineRoute},
wantUpdate: map[string]ListenerUpdate{
v3LDSTarget: {
InlineRouteConfig: &RouteConfigUpdate{
VirtualHosts: []*VirtualHost{{
Domains: []string{v3LDSTarget},
Routes: []*Route{{Prefix: newStringP("/"), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}},
}}},
MaxStreamDuration: time.Second,
Raw: v3LisWithInlineRoute,
},
},
wantMD: UpdateMetadata{
Status: ServiceStatusACKed,
Version: testVersion,
},
},
{
name: "multiple listener resources",
resources: []*anypb.Any{v2Lis, v3LisWithFilters()},
Expand Down
16 changes: 9 additions & 7 deletions xds/internal/client/xds.go
Expand Up @@ -72,24 +72,24 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri
}
logger.Infof("Resource with name: %v, type: %T, contains: %v", lis.GetName(), lis, lis)

lu, err := processListener(lis, v2)
lu, err := processListener(lis, logger, v2)
if err != nil {
return lis.GetName(), ListenerUpdate{}, err
}
lu.Raw = r
return lis.GetName(), *lu, nil
}

func processListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
func processListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
if lis.GetApiListener() != nil {
return processClientSideListener(lis, v2)
return processClientSideListener(lis, logger, v2)
}
return processServerSideListener(lis)
}

// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
func processClientSideListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
update := &ListenerUpdate{}

apiLisAny := lis.GetApiListener().GetApiListener()
Expand All @@ -112,9 +112,11 @@ func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUp
}
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
return nil, fmt.Errorf("LDS response contains RDS config inline. Not supported for now: %+v", apiLis)
routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig(), logger, v2)
if err != nil {
return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err)
}
update.InlineRouteConfig = &routeU
case nil:
return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
default:
Expand Down
39 changes: 29 additions & 10 deletions xds/internal/resolver/watch_service.go
Expand Up @@ -110,6 +110,22 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
httpFilterConfig: update.HTTPFilters,
}

if update.InlineRouteConfig != nil {
// If there was an RDS watch, cancel it.
w.rdsName = ""
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}

// Handle the inline RDS update as if it's from an RDS watch.
w.updateVirtualHostsFromRDS(*update.InlineRouteConfig)
return
}

// RDS name from update is not an empty string, need RDS to fetch the
// routes.

if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
Expand All @@ -126,6 +142,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
}

func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsclient.RouteConfigUpdate) {
matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}

w.lastUpdate.virtualHost = matchVh
w.serviceCb(w.lastUpdate, nil)
}

func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, err error) {
w.logger.Infof("received RDS update: %+v, err: %v", update, err)
w.mu.Lock()
Expand All @@ -142,16 +170,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
w.serviceCb(serviceUpdate{}, err)
return
}

matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}

w.lastUpdate.virtualHost = matchVh
w.serviceCb(w.lastUpdate, nil)
w.updateVirtualHostsFromRDS(update)
}

func (w *serviceUpdateWatcher) close() {
Expand Down
78 changes: 78 additions & 0 deletions xds/internal/resolver/watch_service_test.go
Expand Up @@ -356,3 +356,81 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
}

// TestServiceWatchInlineRDS covers the cases switching between:
// - LDS update contains RDS name to watch
// - LDS update contains inline RDS resource
func (s) TestServiceWatchInlineRDS(t *testing.T) {
serviceUpdateCh := testutils.NewChannel()
xdsC := fakeclient.NewClient()
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
}, nil)
defer cancelWatch()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// First LDS update is LDS with RDS name to watch.
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Switch LDS resp to a LDS with inline RDS resource
wantVirtualHosts2 := &xdsclient.VirtualHost{Domains: []string{"target"},
Routes: []*xdsclient.Route{{
Path: newStringP(""),
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}},
}},
}
wantUpdate2 := serviceUpdate{virtualHost: wantVirtualHosts2}
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}

// Switch LDS update back to LDS with RDS name to watch.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Switch LDS resp to a LDS with inline RDS resource again.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
}

0 comments on commit c72e1c8

Please sign in to comment.