Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/resolver: support inline RDS resource from LDS response #4299

Merged
merged 6 commits into from Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions xds/internal/client/client.go
Expand Up @@ -195,7 +195,15 @@ type UpdateMetadata struct {
type ListenerUpdate struct {
// RouteConfigName is the route configuration name corresponding to the
// target which is being watched through LDS.
//
// Only one of RouteConfigName and InlineRouteConfig is set.
RouteConfigName string
// InlineRouteConfig is the inline route configuration (RDS response)
// returned inside LDS.
//
// Only one of RouteConfigName and InlineRouteConfig is set.
InlineRouteConfig *RouteConfigUpdate

// MaxStreamDuration contains the HTTP connection manager's
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
Expand Down
53 changes: 53 additions & 0 deletions xds/internal/client/lds_test.go
Expand Up @@ -54,6 +54,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
v3LDSTarget = "lds.target.good:3333"
v2RouteConfigName = "v2RouteConfig"
v3RouteConfigName = "v3RouteConfig"
routeName = "routeName"
)

var (
Expand Down Expand Up @@ -132,6 +133,39 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: unknownFilterConfig},
IsOptional: true,
}
v3LisWithInlineRoute = &anypb.Any{
TypeUrl: version.V3ListenerURL,
Value: func() []byte {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{v3LDSTarget},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
}}}}}}},
},
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
MaxStreamDuration: durationpb.New(time.Second),
},
}
mcm := marshalAny(hcm)
lis := &v3listenerpb.Listener{
Name: v3LDSTarget,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: mcm,
},
}
mLis, _ := proto.Marshal(lis)
return mLis
}(),
}
v3LisWithFilters = func(fs ...*v3httppb.HttpFilter) *anypb.Any {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Expand Down Expand Up @@ -650,6 +684,25 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
Version: testVersion,
},
},
{
name: "v3 listener with inline route configuration",
resources: []*anypb.Any{v3LisWithInlineRoute},
wantUpdate: map[string]ListenerUpdate{
v3LDSTarget: {
InlineRouteConfig: &RouteConfigUpdate{
VirtualHosts: []*VirtualHost{{
Domains: []string{v3LDSTarget},
Routes: []*Route{{Prefix: newStringP("/"), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}},
}}},
MaxStreamDuration: time.Second,
Raw: v3LisWithInlineRoute,
},
},
wantMD: UpdateMetadata{
Status: ServiceStatusACKed,
Version: testVersion,
},
},
{
name: "multiple listener resources",
resources: []*anypb.Any{v2Lis, v3LisWithFilters()},
Expand Down
16 changes: 9 additions & 7 deletions xds/internal/client/xds.go
Expand Up @@ -72,24 +72,24 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri
}
logger.Infof("Resource with name: %v, type: %T, contains: %v", lis.GetName(), lis, lis)

lu, err := processListener(lis, v2)
lu, err := processListener(lis, logger, v2)
if err != nil {
return lis.GetName(), ListenerUpdate{}, err
}
lu.Raw = r
return lis.GetName(), *lu, nil
}

func processListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
func processListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
if lis.GetApiListener() != nil {
return processClientSideListener(lis, v2)
return processClientSideListener(lis, logger, v2)
}
return processServerSideListener(lis)
}

// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) {
func processClientSideListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) {
update := &ListenerUpdate{}

apiLisAny := lis.GetApiListener().GetApiListener()
Expand All @@ -112,9 +112,11 @@ func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUp
}
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
return nil, fmt.Errorf("LDS response contains RDS config inline. Not supported for now: %+v", apiLis)
routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig(), logger, v2)
if err != nil {
return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err)
}
update.InlineRouteConfig = &routeU
case nil:
return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
default:
Expand Down
39 changes: 29 additions & 10 deletions xds/internal/resolver/watch_service.go
Expand Up @@ -110,6 +110,22 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
httpFilterConfig: update.HTTPFilters,
}

if update.InlineRouteConfig != nil {
// If there was an RDS watch, cancel it.
w.rdsName = ""
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}

// Handle the inline RDS update as if it's from an RDS watch.
w.updateVirtualHostsFromRDS(*update.InlineRouteConfig)
return
}

// RDS name from update is not an empty string, need RDS to fetch the
// routes.

if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
Expand All @@ -126,6 +142,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
}

func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsclient.RouteConfigUpdate) {
matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}

w.lastUpdate.virtualHost = matchVh
w.serviceCb(w.lastUpdate, nil)
}

func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, err error) {
w.logger.Infof("received RDS update: %+v, err: %v", update, err)
w.mu.Lock()
Expand All @@ -142,16 +170,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
w.serviceCb(serviceUpdate{}, err)
return
}

matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}

w.lastUpdate.virtualHost = matchVh
w.serviceCb(w.lastUpdate, nil)
w.updateVirtualHostsFromRDS(update)
}

func (w *serviceUpdateWatcher) close() {
Expand Down
78 changes: 78 additions & 0 deletions xds/internal/resolver/watch_service_test.go
Expand Up @@ -356,3 +356,81 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
}

// TestServiceWatchInlineRDS covers the cases switching between:
// - LDS update contains RDS name to watch
// - LDS update contains inline RDS resource
func (s) TestServiceWatchInlineRDS(t *testing.T) {
serviceUpdateCh := testutils.NewChannel()
xdsC := fakeclient.NewClient()
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
}, nil)
defer cancelWatch()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// First LDS update is LDS with RDS name to watch.
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Switch LDS resp to a LDS with inline RDS resource
wantVirtualHosts2 := &xdsclient.VirtualHost{Domains: []string{"target"},
Routes: []*xdsclient.Route{{
Path: newStringP(""),
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}},
}},
}
wantUpdate2 := serviceUpdate{virtualHost: wantVirtualHosts2}
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}

// Switch LDS update back to LDS with RDS name to watch.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Switch LDS resp to a LDS with inline RDS resource again.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
}