diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 5c0f38a9782..d06473b02a6 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -195,7 +195,15 @@ type UpdateMetadata struct { type ListenerUpdate struct { // RouteConfigName is the route configuration name corresponding to the // target which is being watched through LDS. + // + // Only one of RouteConfigName and InlineRouteConfig is set. RouteConfigName string + // InlineRouteConfig is the inline route configuration (RDS response) + // returned inside LDS. + // + // Only one of RouteConfigName and InlineRouteConfig is set. + InlineRouteConfig *RouteConfigUpdate + // MaxStreamDuration contains the HTTP connection manager's // common_http_protocol_options.max_stream_duration field, or zero if // unset. diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index df8098df836..da0e0f95615 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -54,6 +54,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { v3LDSTarget = "lds.target.good:3333" v2RouteConfigName = "v2RouteConfig" v3RouteConfigName = "v3RouteConfig" + routeName = "routeName" ) var ( @@ -132,6 +133,39 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: unknownFilterConfig}, IsOptional: true, } + v3LisWithInlineRoute = &anypb.Any{ + TypeUrl: version.V3ListenerURL, + Value: func() []byte { + hcm := &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{v3LDSTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + }}}}}}}, + }, + CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ + MaxStreamDuration: durationpb.New(time.Second), + }, + } + mcm := marshalAny(hcm) + lis := &v3listenerpb.Listener{ + Name: v3LDSTarget, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: mcm, + }, + } + mLis, _ := proto.Marshal(lis) + return mLis + }(), + } v3LisWithFilters = func(fs ...*v3httppb.HttpFilter) *anypb.Any { hcm := &v3httppb.HttpConnectionManager{ RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ @@ -650,6 +684,25 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { Version: testVersion, }, }, + { + name: "v3 listener with inline route configuration", + resources: []*anypb.Any{v3LisWithInlineRoute}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: { + InlineRouteConfig: &RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{{ + Domains: []string{v3LDSTarget}, + Routes: []*Route{{Prefix: newStringP("/"), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}}, + }}}, + MaxStreamDuration: time.Second, + Raw: v3LisWithInlineRoute, + }, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "multiple listener resources", resources: []*anypb.Any{v2Lis, v3LisWithFilters()}, diff --git a/xds/internal/client/xds.go b/xds/internal/client/xds.go index 2791603ce26..fc1112e180b 100644 --- a/xds/internal/client/xds.go +++ b/xds/internal/client/xds.go @@ -72,7 +72,7 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri } logger.Infof("Resource with name: %v, type: %T, contains: %v", lis.GetName(), lis, lis) - lu, err := processListener(lis, v2) + lu, err := processListener(lis, logger, v2) if err != nil { return lis.GetName(), ListenerUpdate{}, err } @@ -80,16 +80,16 @@ func unmarshalListenerResource(r *anypb.Any, logger *grpclog.PrefixLogger) (stri return lis.GetName(), *lu, nil } -func processListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) { +func processListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) { if lis.GetApiListener() != nil { - return processClientSideListener(lis, v2) + return processClientSideListener(lis, logger, v2) } return processServerSideListener(lis) } // processClientSideListener checks if the provided Listener proto meets // the expected criteria. If so, it returns a non-empty routeConfigName. -func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) { +func processClientSideListener(lis *v3listenerpb.Listener, logger *grpclog.PrefixLogger, v2 bool) (*ListenerUpdate, error) { update := &ListenerUpdate{} apiLisAny := lis.GetApiListener().GetApiListener() @@ -112,9 +112,11 @@ func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUp } update.RouteConfigName = name case *v3httppb.HttpConnectionManager_RouteConfig: - // TODO: Add support for specifying the RouteConfiguration inline - // in the LDS response. - return nil, fmt.Errorf("LDS response contains RDS config inline. Not supported for now: %+v", apiLis) + routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig(), logger, v2) + if err != nil { + return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err) + } + update.InlineRouteConfig = &routeU case nil: return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis) default: diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 913ac4ced15..42ede988300 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -110,6 +110,22 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er httpFilterConfig: update.HTTPFilters, } + if update.InlineRouteConfig != nil { + // If there was an RDS watch, cancel it. + w.rdsName = "" + if w.rdsCancel != nil { + w.rdsCancel() + w.rdsCancel = nil + } + + // Handle the inline RDS update as if it's from an RDS watch. + w.updateVirtualHostsFromRDS(*update.InlineRouteConfig) + return + } + + // RDS name from update is not an empty string, need RDS to fetch the + // routes. + if w.rdsName == update.RouteConfigName { // If the new RouteConfigName is same as the previous, don't cancel and // restart the RDS watch. @@ -126,6 +142,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp) } +func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsclient.RouteConfigUpdate) { + matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts) + if matchVh == nil { + // No matching virtual host found. + w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName)) + return + } + + w.lastUpdate.virtualHost = matchVh + w.serviceCb(w.lastUpdate, nil) +} + func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, err error) { w.logger.Infof("received RDS update: %+v, err: %v", update, err) w.mu.Lock() @@ -142,16 +170,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, w.serviceCb(serviceUpdate{}, err) return } - - matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts) - if matchVh == nil { - // No matching virtual host found. - w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName)) - return - } - - w.lastUpdate.virtualHost = matchVh - w.serviceCb(w.lastUpdate, nil) + w.updateVirtualHostsFromRDS(update) } func (w *serviceUpdateWatcher) close() { diff --git a/xds/internal/resolver/watch_service_test.go b/xds/internal/resolver/watch_service_test.go index 705a3d35ae1..2bfe3e984d3 100644 --- a/xds/internal/resolver/watch_service_test.go +++ b/xds/internal/resolver/watch_service_test.go @@ -356,3 +356,81 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { t.Fatalf("wait for cancel route watch failed: %v, want nil", err) } } + +// TestServiceWatchInlineRDS covers the cases switching between: +// - LDS update contains RDS name to watch +// - LDS update contains inline RDS resource +func (s) TestServiceWatchInlineRDS(t *testing.T) { + serviceUpdateCh := testutils.NewChannel() + xdsC := fakeclient.NewClient() + cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { + serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) + }, nil) + defer cancelWatch() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // First LDS update is LDS with RDS name to watch. + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}} + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, + }, + }, + }, nil) + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Switch LDS resp to a LDS with inline RDS resource + wantVirtualHosts2 := &xdsclient.VirtualHost{Domains: []string{"target"}, + Routes: []*xdsclient.Route{{ + Path: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}, + }}, + } + wantUpdate2 := serviceUpdate{virtualHost: wantVirtualHosts2} + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2}, + }}, nil) + // This inline RDS resource should cause the RDS watch to be canceled. + if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil { + t.Fatalf("wait for cancel route watch failed: %v, want nil", err) + } + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { + t.Fatal(err) + } + + // Switch LDS update back to LDS with RDS name to watch. + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, + }, + }, + }, nil) + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Switch LDS resp to a LDS with inline RDS resource again. + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{InlineRouteConfig: &xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2}, + }}, nil) + // This inline RDS resource should cause the RDS watch to be canceled. + if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil { + t.Fatalf("wait for cancel route watch failed: %v, want nil", err) + } + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { + t.Fatal(err) + } +}