Skip to content

Commit

Permalink
rls: propagate headers received in RLS response to backends (#5883)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Dec 21, 2022
1 parent f94594d commit 5ff7dfc
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
17 changes: 15 additions & 2 deletions balancer/rls/picker.go
Expand Up @@ -167,13 +167,26 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
// it the last one (which handles the case of delegating to the last
// it is the last one (which handles the case of delegating to the last
// child picker if all child polcies are in TRANSIENT_FAILURE).
if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
return state.Picker.Pick(info)
// Any header data received from the RLS server is stored in the
// cache entry and needs to be sent to the actual backend in the
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
}
if res.Metatada == nil {
res.Metatada = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metatada.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
}
}
// In the unlikely event that we have a cache entry with no targets, we end up
Expand Down
65 changes: 64 additions & 1 deletion balancer/rls/picker_test.go
Expand Up @@ -20,16 +20,22 @@ package rls

import (
"context"
"fmt"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/internal/stubserver"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// Test verifies the scenario where there is no matching entry in the data cache
Expand Down Expand Up @@ -241,6 +247,63 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, false)
}

// Test verifies the scenario where there is a matching entry in the data cache
// which is valid and there is no pending request. The pick is expected to be
// delegated to the child policy.
func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry_WithHeaderData(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build the RLS config without a default target.
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)

// Start a test backend which expects the header data contents sent from the
// RLS server to be part of RPC metadata as X-Google-RLS-Data header.
const headerDataContents = "foo,bar,baz"
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
gotHeaderData := metadata.ValueFromIncomingContext(ctx, "x-google-rls-data")
if len(gotHeaderData) != 1 || gotHeaderData[0] != headerDataContents {
return nil, fmt.Errorf("got metadata in `X-Google-RLS-Data` is %v, want %s", gotHeaderData, headerDataContents)
}
return &testpb.Empty{}, nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
defer backend.Stop()

// Setup the fake RLS server to return the above backend as a target in the
// RLS response. Also, populate the header data field in the response.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{
Targets: []string{backend.Address},
HeaderData: headerDataContents,
}}
})

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

// Dial the backend.
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the test backend with the header
// data sent by the RLS server.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() RPC: %v", err)
}
}

// Test verifies the scenario where there is a matching entry in the data cache
// which is stale and there is no pending request. The pick is expected to be
// delegated to the child policy with a proactive cache refresh.
Expand Down

0 comments on commit 5ff7dfc

Please sign in to comment.