Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add HashPolicy fields to RDS update #4521

Merged
merged 17 commits into from Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/xds/env/env.go
Expand Up @@ -40,6 +40,7 @@ const (
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
Expand Down Expand Up @@ -67,6 +68,10 @@ var (
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
CircuitBreakingSupport = !strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "false")
// RingHashSupport indicates whether ring hash support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "true".
RingHashSupport = strings.EqualFold(os.Getenv(ringHashSupportEnv), "true")
// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be disabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "false".
Expand Down
24 changes: 24 additions & 0 deletions xds/internal/xdsclient/client.go
Expand Up @@ -269,6 +269,28 @@ type VirtualHost struct {
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
type HashPolicyType int

const (
// HashPolicyTypeHeader specifies to hash a Header in the incoming request.
HashPolicyTypeHeader HashPolicyType = iota
// HashPolicyTypeChannelID specifies to hash a unique Identifier of the
// Channel. In grpc-go, this will be done using the ClientConn pointer.
HashPolicyTypeChannelID
)

// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing
// load balancer.
type HashPolicy struct {
HashPolicyType HashPolicyType
Terminal bool
// Fields used for type HEADER.
HeaderName string
Regex *regexp.Regexp
RegexSubstitution string
}

// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
Expand All @@ -281,6 +303,8 @@ type Route struct {
Headers []*HeaderMatcher
Fraction *uint32

HashPolicies []*HashPolicy

// If the matchers above indicate a match, the below configuration is used.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
Expand Down
172 changes: 171 additions & 1 deletion xds/internal/xdsclient/rds_test.go
Expand Up @@ -1167,6 +1167,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}},
wantErr: false,
},
{
name: "good-with-channel-id-hash-policy",
routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"},
Headers: []*v3routepb.HeaderMatcher{
{
Name: "th",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{
PrefixMatch: "tv",
},
InvertMatch: true,
},
},
RuntimeFraction: &v3corepb.RuntimeFractionalPercent{
DefaultValue: &v3typepb.FractionalPercent{
Numerator: 1,
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}},
{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}},
},
TotalWeight: &wrapperspb.UInt32Value{Value: 100},
}},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
}},
},
},
wantRoutes: []*Route{{
Prefix: newStringP("/a/"),
Headers: []*HeaderMatcher{
{
Name: "th",
InvertMatch: newBoolP(true),
PrefixMatch: newStringP("tv"),
},
},
Fraction: newUInt32P(10000),
WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}},
HashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
}},
wantErr: false,
},
{
name: "with custom HTTP filter config",
routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}),
Expand Down Expand Up @@ -1223,7 +1278,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
return fmt.Sprint(fc)
}),
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oldFI := env.FaultInjectionSupport
Expand All @@ -1241,6 +1298,119 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}
}

func (s) TestHashPoliciesProtoToSlice(t *testing.T) {
tests := []struct {
name string
hashPolicies []*v3routepb.RouteAction_HashPolicy
wantHashPolicies []*HashPolicy
wantErr bool
}{
// header-hash-policy tests a basic hash policy that specifies to hash a
// certain header.
{
name: "header-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
},
},
// channel-id-hash-policy tests a basic hash policy that specifies to
// hash a unique identifier of the channel.
{
name: "channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
wantHashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
},
// unsupported-filter-state-key tests that an unsupported key in the
// filter state hash policy are treated as a no-op.
{
name: "wrong-filter-state-key",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "unsupported key"}}},
},
},
// no-op-hash-policy tests that hash policies that are not supported by
// grpc are treated as a no-op.
{
name: "no-op-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}},
},
},
// header-and-channel-id-hash-policy test that a list of header and
// channel id hash policies are successfully converted to an internal
// struct.
{
name: "header-and-channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}},
Terminal: true,
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
{
HashPolicyType: HashPolicyTypeChannelID,
Terminal: true,
},
},
},
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := hashPoliciesProtoToSlice(tt.hashPolicies, nil)
if (err != nil) != tt.wantErr {
t.Fatalf("hashPoliciesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr)
}
if diff := cmp.Diff(got, tt.wantHashPolicies, cmp.AllowUnexported(regexp.Regexp{})); diff != "" {
t.Fatalf("hashPoliciesProtoToSlice() returned unexpected diff (-got +want):\n%s", diff)
}
})
}
}

func newStringP(s string) *string {
return &s
}
Expand Down
41 changes: 41 additions & 0 deletions xds/internal/xdsclient/xds.go
Expand Up @@ -500,6 +500,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,

route.WeightedClusters = make(map[string]WeightedCluster)
action := r.GetRoute()

// Hash Policies are only applicable for a Ring Hash LB.
if env.RingHashSupport {
hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger)
if err != nil {
return nil, err
}
route.HashPolicies = hp
}

switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
Expand Down Expand Up @@ -561,6 +571,37 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
return routesRet, nil
}

func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger *grpclog.PrefixLogger) ([]*HashPolicy, error) {
var hashPoliciesRet []*HashPolicy
for _, p := range policies {
policy := HashPolicy{Terminal: p.Terminal}
switch p.GetPolicySpecifier().(type) {
case *v3routepb.RouteAction_HashPolicy_Header_:
policy.HashPolicyType = HashPolicyTypeHeader
policy.HeaderName = p.GetHeader().GetHeaderName()
regex := p.GetHeader().GetRegexRewrite().GetPattern().GetRegex()
re, err := regexp.Compile(regex)
if err != nil {
return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
}
policy.Regex = re
policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().GetSubstitution()
case *v3routepb.RouteAction_HashPolicy_FilterState_:
if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
logger.Infof("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
continue
}
policy.HashPolicyType = HashPolicyTypeChannelID
default:
logger.Infof("hash policy %T is an unsupported hash policy", p.GetPolicySpecifier())
continue
}

hashPoliciesRet = append(hashPoliciesRet, &policy)
}
return hashPoliciesRet, nil
}

// UnmarshalCluster processes resources received in an CDS response, validates
// them, and transforms them into a native struct which contains only fields we
// are interested in.
Expand Down