Skip to content

Commit

Permalink
xds: Add RLS Cluster Specifier Plugin (#5004)
Browse files Browse the repository at this point in the history
* xds: Add RLS Cluster Specifier Plugin
  • Loading branch information
zasweq committed Dec 15, 2021
1 parent 50f8270 commit 029b822
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
25 changes: 25 additions & 0 deletions balancer/rls/rls.go
@@ -0,0 +1,25 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package rls imports to init the rls lb policy for testing purposes.
package rls

import (
// Blank import to init the rls lb policy for external use.
_ "google.golang.org/grpc/balancer/rls/internal"
)
104 changes: 104 additions & 0 deletions xds/internal/clusterspecifier/rls/rls.go
@@ -0,0 +1,104 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package rls implements the RLS cluster specifier plugin.
package rls

import (
"encoding/json"
"fmt"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"

// Blank import to init the RLS LB policy.
_ "google.golang.org/grpc/balancer/rls"
)

const rlsBalancerName = "rls_experimental"

func init() {
if envconfig.XDSRLS {
clusterspecifier.Register(rls{})
}
}

type rls struct{}

func (rls) TypeURLs() []string {
return []string{"type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier"}
}

// lbConfigJSON is the RLS LB Policies configuration in JSON format.
// RouteLookupConfig will be a raw JSON string from the passed in proto
// configuration, and the other fields will be hardcoded.
type lbConfigJSON struct {
RouteLookupConfig json.RawMessage `json:"routeLookupConfig"`
ChildPolicy []map[string]json.RawMessage `json:"childPolicy"`
ChildPolicyConfigTargetFieldName string `json:"childPolicyConfigTargetFieldName"`
}

func (rls) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
if cfg == nil {
return nil, fmt.Errorf("rls_csp: nil configuration message provided")
}
any, ok := cfg.(*anypb.Any)
if !ok {
return nil, fmt.Errorf("rls_csp: error parsing config %v: unknown type %T", cfg, cfg)
}
rlcs := new(grpc_lookup_v1.RouteLookupClusterSpecifier)

if err := ptypes.UnmarshalAny(any, rlcs); err != nil {
return nil, fmt.Errorf("rls_csp: error parsing config %v: %v", cfg, err)
}
rlcJSON, err := protojson.Marshal(rlcs.GetRouteLookupConfig())
if err != nil {
return nil, fmt.Errorf("rls_csp: error marshaling route lookup config: %v: %v", rlcs.GetRouteLookupConfig(), err)
}
lbCfgJSON := &lbConfigJSON{
RouteLookupConfig: rlcJSON, // "JSON form of RouteLookupClusterSpecifier.config" - RLS in xDS Design Doc
ChildPolicy: []map[string]json.RawMessage{
{
"cds_experimental": json.RawMessage("{}"),
},
},
ChildPolicyConfigTargetFieldName: "cluster",
}

rawJSON, err := json.Marshal(lbCfgJSON)
if err != nil {
return nil, fmt.Errorf("rls_csp: error marshaling load balancing config %v: %v", lbCfgJSON, err)
}

rlsBB := balancer.Get(rlsBalancerName)
if rlsBB == nil {
return nil, fmt.Errorf("RLS LB policy not registered")
}
_, err = rlsBB.(balancer.ConfigParser).ParseConfig(rawJSON)
if err != nil {
return nil, fmt.Errorf("rls_csp: validation error from rls lb policy parsing %v", err)
}

return clusterspecifier.BalancerConfig{{rlsBalancerName: lbCfgJSON}}, nil
}
168 changes: 168 additions & 0 deletions xds/internal/clusterspecifier/rls/rls_test.go
@@ -0,0 +1,168 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package rls

import (
"encoding/json"
"testing"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
_ "google.golang.org/grpc/balancer/rls"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/internal/testutils"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/protobuf/types/known/durationpb"
)

func init() {
clusterspecifier.Register(rls{})
}

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestParseClusterSpecifierConfig tests the parsing functionality of the RLS
// Cluster Specifier Plugin.
func (s) TestParseClusterSpecifierConfig(t *testing.T) {
tests := []struct {
name string
rlcs proto.Message
wantConfig clusterspecifier.BalancerConfig
wantErr bool
}{
{
name: "invalid-rls-cluster-specifier",
rlcs: rlsClusterSpecifierConfigError,
wantErr: true,
},
{
name: "valid-rls-cluster-specifier",
rlcs: rlsClusterSpecifierConfigWithoutTransformations,
wantConfig: configWithoutTransformationsWant,
},
}
for _, test := range tests {
cs := clusterspecifier.Get("type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier")
if cs == nil {
t.Fatal("Error getting cluster specifier")
}
lbCfg, err := cs.ParseClusterSpecifierConfig(test.rlcs)

if (err != nil) != test.wantErr {
t.Fatalf("ParseClusterSpecifierConfig(%+v) returned err: %v, wantErr: %v", test.rlcs, err, test.wantErr)
}
if test.wantErr { // Successfully received an error.
return
}
// Marshal and then unmarshal into interface{} to get rid of
// nondeterministic protojson Marshaling.
lbCfgJSON, err := json.Marshal(lbCfg)
if err != nil {
t.Fatalf("json.Marshal(%+v) returned err %v", lbCfg, err)
}
var got interface{}
err = json.Unmarshal(lbCfgJSON, got)
if err != nil {
t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
}
wantCfgJSON, err := json.Marshal(test.wantConfig)
if err != nil {
t.Fatalf("json.Marshal(%+v) returned err %v", test.wantConfig, err)
}
var want interface{}
err = json.Unmarshal(wantCfgJSON, want)
if err != nil {
t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
}
if diff := cmp.Diff(want, got, cmpopts.EquateEmpty()); diff != "" {
t.Fatalf("ParseClusterSpecifierConfig(%+v) returned expected, diff (-want +got) %v", test.rlcs, diff)
}
}
}

// This will error because the required match field is set in grpc key builder.
var rlsClusterSpecifierConfigError = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
{
Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
{
Service: "service",
Method: "method",
},
},
Headers: []*grpc_lookup_v1.NameMatcher{
{
Key: "k1",
RequiredMatch: true,
Names: []string{"v1"},
},
},
},
},
},
})

// Corresponds to the rls unit test case in
// balancer/rls/internal/config_test.go.
var rlsClusterSpecifierConfigWithoutTransformations = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
{
Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
{
Service: "service",
Method: "method",
},
},
Headers: []*grpc_lookup_v1.NameMatcher{
{
Key: "k1",
Names: []string{"v1"},
},
},
},
},
LookupService: "target",
LookupServiceTimeout: &durationpb.Duration{Seconds: 100},
MaxAge: &durationpb.Duration{Seconds: 60},
StaleAge: &durationpb.Duration{Seconds: 50},
CacheSizeBytes: 1000,
DefaultTarget: "passthrough:///default",
},
})

var configWithoutTransformationsWant = clusterspecifier.BalancerConfig{{"rls_experimental": &lbConfigJSON{
RouteLookupConfig: []byte(`{"grpcKeybuilders":[{"names":[{"service":"service","method":"method"}],"headers":[{"key":"k1","names":["v1"]}]}],"lookupService":"target","lookupServiceTimeout":"100s","maxAge":"60s","staleAge":"50s","cacheSizeBytes":"1000","defaultTarget":"passthrough:///default"}`),
ChildPolicy: []map[string]json.RawMessage{
{
"cds_experimental": []byte(`{}`),
},
},
ChildPolicyConfigTargetFieldName: "cluster",
}}}

0 comments on commit 029b822

Please sign in to comment.