Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add HashPolicy fields to RDS update #4521

Merged
merged 17 commits into from Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions xds/internal/xdsclient/client.go
Expand Up @@ -269,6 +269,28 @@ type VirtualHost struct {
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
type HashPolicyType int

const (
// HashPolicyTypeHeader specifies to hash a Header in the incoming request.
HashPolicyTypeHeader HashPolicyType = iota
// HashPolicyTypeChannelID specifies to hash a unique Identifier of the
// Channel. In grpc-go, this will be done using the ClientConn pointer.
HashPolicyTypeChannelID
)

// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing
// load balancer.
type HashPolicy struct {
HashPolicyType HashPolicyType
Terminal bool
// Fields used for type HEADER.
HeaderName string
Regex string
RegexSubstitution string
}

// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
Expand All @@ -281,6 +303,8 @@ type Route struct {
Headers []*HeaderMatcher
Fraction *uint32

HashPolicies []*HashPolicy

// If the matchers above indicate a match, the below configuration is used.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
Expand Down
154 changes: 154 additions & 0 deletions xds/internal/xdsclient/rds_test.go
Expand Up @@ -1167,6 +1167,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}},
wantErr: false,
},
{
name: "good-with-channel-id-hash-policy",
routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"},
Headers: []*v3routepb.HeaderMatcher{
{
Name: "th",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{
PrefixMatch: "tv",
},
InvertMatch: true,
},
},
RuntimeFraction: &v3corepb.RuntimeFractionalPercent{
DefaultValue: &v3typepb.FractionalPercent{
Numerator: 1,
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}},
{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}},
},
TotalWeight: &wrapperspb.UInt32Value{Value: 100},
}},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{}},
},
}},
},
},
wantRoutes: []*Route{{
Prefix: newStringP("/a/"),
Headers: []*HeaderMatcher{
{
Name: "th",
InvertMatch: newBoolP(true),
PrefixMatch: newStringP("tv"),
},
},
Fraction: newUInt32P(10000),
WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}},
HashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
}},
wantErr: false,
},
{
name: "with custom HTTP filter config",
routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}),
Expand Down Expand Up @@ -1241,6 +1296,105 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}
}

func (s) TestHashPoliciesProtoToSlice(t *testing.T) {
tests := []struct {
name string
hashPolicies []*v3routepb.RouteAction_HashPolicy
wantHashPolicies []*HashPolicy
}{
// header-hash-policy tests a basic hash policy that specifies to hash a
// certain header.
{
name: "header-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: "/products",
RegexSubstitution: "/products",
},
},
},
// channel-id-hash-policy tests a basic hash policy that specifies to
// hash a unique identifier of the channel.
{
name: "channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{}},
},
wantHashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
},
// no-op-hash-policy tests that hash policies that are not supported by
// grpc currently are ignored.
{
name: "no-op-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}},
},
wantHashPolicies: []*HashPolicy{{}},
},
// header-and-channel-id-hash-policy test that a list of header and
// channel id hash policies are successfully converted to an internal
// struct.
{
name: "header-and-channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{},
Terminal: true,
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: "/products",
RegexSubstitution: "/products",
},
{
HashPolicyType: HashPolicyTypeChannelID,
Terminal: true,
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := hashPoliciesProtoToSlice(tt.hashPolicies)
if diff := cmp.Diff(got, tt.wantHashPolicies); diff != "" {
t.Fatalf("hashPoliciesProtoToSlice returned returned unexpected diff (-got +want):\n%s", diff)
}
})
}
}

func newStringP(s string) *string {
return &s
}
Expand Down
28 changes: 28 additions & 0 deletions xds/internal/xdsclient/xds.go
Expand Up @@ -500,6 +500,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,

route.WeightedClusters = make(map[string]WeightedCluster)
action := r.GetRoute()
route.HashPolicies = hashPoliciesProtoToSlice(action.HashPolicy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an env var GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH
See also #4440

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
Expand Down Expand Up @@ -561,6 +562,33 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
return routesRet, nil
}

func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy) []*HashPolicy {
var hashPoliciesRet []*HashPolicy
for _, p := range policies {
var policy HashPolicy
policy.Terminal = p.Terminal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this can be oneline
policy := HashPolicy{Terminal: p.Terminal}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

switch p.GetPolicySpecifier().(type) {
case *v3routepb.RouteAction_HashPolicy_Header_:
policy.HashPolicyType = HashPolicyTypeHeader
policy.HeaderName = p.GetHeader().HeaderName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call GetHeaderName(). Otherwise this will panic if GetHeader() returns nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I've actually been wondering about this for a while.

policy.Regex = p.GetHeader().GetRegexRewrite().GetPattern().Regex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make these two type *regexp.Regexp, parse the regex, and return error if it returns errors.

Like https://github.com/grpc/grpc-go/blob/master/xds/internal/xdsclient/xds.go#L461

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I treated these errors as no-ops in config selector. Your idea is much better, fail at parsing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I only changed .Regex though, because the others aren't Regexes but strings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also changed all the no op logic due to our discussion yesterday. Rather than make the no-op decision for the user, just fail at parsing step.

policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().Substitution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call GetSubstitution()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// ConnectionProperties in Envoy specifies to hash the source ip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this under case *v3routepb.RouteAction_HashPolicy_ConnectionProperties_.

Or, move this to the next config selector PR. This comment doesn't make much sense here.

Copy link
Contributor Author

@zasweq zasweq Jun 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this not make sense? How would you describe the scope of comments in this file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment would make more sense in config selector, where you hash the pointer. You don't need to explain here how it will be handled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm ok deleted.

// address. In grpc, this is logically equivalent to hashing the
// channel pointer.
case *v3routepb.RouteAction_HashPolicy_ConnectionProperties_:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support connection properties I think.

The way to specify channel_id is the filter_state field set to contain the key "io.grpc.channel_id".
http://doc/10J27vUqOYH8du6XZLcnmwOOSwO9r3HjIe_Jybn6AbVE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

policy.HashPolicyType = HashPolicyTypeChannelID
default:
// In the case where the hash policy action isn't to hash the header
// or the unique identifier of the channel, ignore the field in the
// update and treat as a no-op.
}

hashPoliciesRet = append(hashPoliciesRet, &policy)
}
return hashPoliciesRet
}

// UnmarshalCluster processes resources received in an CDS response, validates
// them, and transforms them into a native struct which contains only fields we
// are interested in.
Expand Down