Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authz: create file watcher interceptor for gRPC SDK API #4760

Merged
merged 16 commits into from Oct 9, 2021
Merged
296 changes: 257 additions & 39 deletions authz/sdk_end2end_test.go
Expand Up @@ -21,7 +21,10 @@ package authz_test
import (
"context"
"io"
"io/ioutil"
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
"net"
"os"
"path"
"testing"
"time"

Expand Down Expand Up @@ -53,15 +56,14 @@ func (s *testServer) StreamingInputCall(stream pb.TestService_StreamingInputCall
}
}

func TestSDKEnd2End(t *testing.T) {
tests := map[string]struct {
authzPolicy string
md metadata.MD
wantStatusCode codes.Code
wantErr string
}{
"DeniesRpcRequestMatchInDenyNoMatchInAllow": {
authzPolicy: `{
var sdkTests = map[string]struct {
authzPolicy string
md metadata.MD
wantStatusCode codes.Code
wantErr string
}{
"DeniesRpcMatchInDenyNoMatchInAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand Down Expand Up @@ -100,12 +102,12 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
md: metadata.Pairs("key-abc", "val-abc"),
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"DeniesRpcRequestMatchInDenyAndAllow": {
authzPolicy: `{
md: metadata.Pairs("key-abc", "val-abc"),
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"DeniesRpcMatchInDenyAndAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand All @@ -132,11 +134,11 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"AllowsRpcRequestNoMatchInDenyMatchInAllow": {
authzPolicy: `{
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"AllowsRpcNoMatchInDenyMatchInAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand Down Expand Up @@ -169,11 +171,11 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
md: metadata.Pairs("key-xyz", "val-xyz"),
wantStatusCode: codes.OK,
},
"AllowsRpcRequestNoMatchInDenyAndAllow": {
authzPolicy: `{
md: metadata.Pairs("key-xyz", "val-xyz"),
wantStatusCode: codes.OK,
},
"AllowsRpcNoMatchInDenyAndAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand All @@ -200,11 +202,11 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"AllowsRpcRequestEmptyDenyMatchInAllow": {
authzPolicy: `{
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
"AllowsRpcEmptyDenyMatchInAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand All @@ -230,10 +232,10 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
wantStatusCode: codes.OK,
},
"DeniesRpcRequestEmptyDenyNoMatchInAllow": {
authzPolicy: `{
wantStatusCode: codes.OK,
},
"DeniesRpcEmptyDenyNoMatchInAllow": {
authzPolicy: `{
"name": "authz",
"allow_rules":
[
Expand All @@ -249,11 +251,13 @@ func TestSDKEnd2End(t *testing.T) {
}
]
}`,
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
}
for name, test := range tests {
wantStatusCode: codes.PermissionDenied,
wantErr: "unauthorized RPC request rejected",
},
}

func TestSDKStaticPolicyEnd2End(t *testing.T) {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
for name, test := range sdkTests {
t.Run(name, func(t *testing.T) {
// Start a gRPC server with SDK unary and stream server interceptors.
i, _ := authz.NewStatic(test.authzPolicy)
Expand All @@ -266,6 +270,64 @@ func TestSDKEnd2End(t *testing.T) {
grpc.ChainStreamInterceptor(i.StreamInterceptor))
pb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)
defer s.Stop()

// Establish a connection to the server.
clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err)
}
defer clientConn.Close()
client := pb.NewTestServiceClient(clientConn)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, test.md)

// Verifying authorization decision for Unary RPC.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message())
}

// Verifying authorization decision for Streaming RPC.
stream, err := client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf("failed StreamingInputCall err: %v", err)
}
req := &pb.StreamingInputCallRequest{
Payload: &pb.Payload{
Body: []byte("hi"),
},
}
if err := stream.Send(req); err != nil && err != io.EOF {
t.Fatalf("failed stream.Send err: %v", err)
}
_, err = stream.CloseAndRecv()
if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr {
t.Fatalf("[StreamingCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message())
}
})
}
}

func TestSDKFileWatcherEnd2End(t *testing.T) {
for name, test := range sdkTests {
t.Run(name, func(t *testing.T) {
dir := createTmpPolicyFile(t, name+"*", []byte(test.authzPolicy))
dfawley marked this conversation as resolved.
Show resolved Hide resolved
i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
defer i.Close()

// Start a gRPC server with SDK unary and stream server interceptors.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor),
grpc.ChainStreamInterceptor(i.StreamInterceptor))
pb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)

// Establish a connection to the server.
clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
Expand Down Expand Up @@ -305,3 +367,159 @@ func TestSDKEnd2End(t *testing.T) {
})
}
}

func TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) {
valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"]
dir := createTmpPolicyFile(t, "valid_policy_refresh*", []byte(valid1.authzPolicy))
i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second)
defer i.Close()

// Start a gRPC server with SDK unary server interceptor.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
pb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)

// Establish a connection to the server.
clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err)
}
defer clientConn.Close()
client := pb.NewTestServiceClient(clientConn)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message())
}

// Rewrite the file with a different valid authorization policy.
valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"]
if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil {
t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err)
}
// Wait 2 seconds for background go routine to read the updated files.
time.Sleep(2 * time.Second)
dfawley marked this conversation as resolved.
Show resolved Hide resolved

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message())
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
}
}

func TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) {
valid := sdkTests["DeniesRpcMatchInDenyAndAllow"]
dir := createTmpPolicyFile(t, "invalid_policy_skip_reload*", []byte(valid.authzPolicy))
i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second)
defer i.Close()

// Start a gRPC server with SDK unary server interceptors.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
pb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)

// Establish a connection to the server.
clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err)
}
defer clientConn.Close()
client := pb.NewTestServiceClient(clientConn)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message())
}

// Skips the invalid policy update, and continues to use the valid policy.
if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil {
t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err)
}
// Wait 2 seconds for background go routine to read the updated files.
time.Sleep(2 * time.Second)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message())
}
}

func TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) {
valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"]
dir := createTmpPolicyFile(t, "recovers_from_reload_failure*", []byte(valid1.authzPolicy))
i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second)
defer i.Close()

// Start a gRPC server with SDK unary server interceptors.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(i.UnaryInterceptor))
pb.RegisterTestServiceServer(s, &testServer{})
go s.Serve(lis)

// Establish a connection to the server.
clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err)
}
defer clientConn.Close()
client := pb.NewTestServiceClient(clientConn)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message())
}

// Skips the invalid policy update, and continues to use the valid policy.
if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil {
t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err)
}
// Wait 2 seconds for background go routine to read the updated files.
time.Sleep(2 * time.Second)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message())
}

// Rewrite the file with a different valid authorization policy.
valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"]
if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil {
t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err)
}
// Wait 2 seconds for background go routine to read the updated files.
time.Sleep(2 * time.Second)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr {
t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message())
}
}