From abd9443fc2aa3e7fa8bd5fbe958248b756822b83 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Sat, 11 Sep 2021 19:13:48 -0700 Subject: [PATCH 01/15] authz: create file watcher interceptor for gRPC SDK API --- authz/sdk_end2end_test.go | 296 ++++++++++++++++++++++---- authz/sdk_server_interceptors.go | 92 ++++++++ authz/sdk_server_interceptors_test.go | 69 +++++- 3 files changed, 414 insertions(+), 43 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 92a5e4f4b21..12dfd131e60 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -21,7 +21,10 @@ package authz_test import ( "context" "io" + "io/ioutil" "net" + "os" + "path" "testing" "time" @@ -53,15 +56,14 @@ func (s *testServer) StreamingInputCall(stream pb.TestService_StreamingInputCall } } -func TestSDKEnd2End(t *testing.T) { - tests := map[string]struct { - authzPolicy string - md metadata.MD - wantStatusCode codes.Code - wantErr string - }{ - "DeniesRpcRequestMatchInDenyNoMatchInAllow": { - authzPolicy: `{ +var sdkTests = map[string]struct { + authzPolicy string + md metadata.MD + wantStatusCode codes.Code + wantErr string +}{ + "DeniesRpcMatchInDenyNoMatchInAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -100,12 +102,12 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - md: metadata.Pairs("key-abc", "val-abc"), - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", - }, - "DeniesRpcRequestMatchInDenyAndAllow": { - authzPolicy: `{ + md: metadata.Pairs("key-abc", "val-abc"), + wantStatusCode: codes.PermissionDenied, + wantErr: "unauthorized RPC request rejected", + }, + "DeniesRpcMatchInDenyAndAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -132,11 +134,11 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", - }, - "AllowsRpcRequestNoMatchInDenyMatchInAllow": { - authzPolicy: `{ + wantStatusCode: codes.PermissionDenied, + wantErr: "unauthorized RPC request rejected", + }, + "AllowsRpcNoMatchInDenyMatchInAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -169,11 +171,11 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - md: metadata.Pairs("key-xyz", "val-xyz"), - wantStatusCode: codes.OK, - }, - "AllowsRpcRequestNoMatchInDenyAndAllow": { - authzPolicy: `{ + md: metadata.Pairs("key-xyz", "val-xyz"), + wantStatusCode: codes.OK, + }, + "AllowsRpcNoMatchInDenyAndAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -200,11 +202,11 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", - }, - "AllowsRpcRequestEmptyDenyMatchInAllow": { - authzPolicy: `{ + wantStatusCode: codes.PermissionDenied, + wantErr: "unauthorized RPC request rejected", + }, + "AllowsRpcEmptyDenyMatchInAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -230,10 +232,10 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - wantStatusCode: codes.OK, - }, - "DeniesRpcRequestEmptyDenyNoMatchInAllow": { - authzPolicy: `{ + wantStatusCode: codes.OK, + }, + "DeniesRpcEmptyDenyNoMatchInAllow": { + authzPolicy: `{ "name": "authz", "allow_rules": [ @@ -249,11 +251,13 @@ func TestSDKEnd2End(t *testing.T) { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", - }, - } - for name, test := range tests { + wantStatusCode: codes.PermissionDenied, + wantErr: "unauthorized RPC request rejected", + }, +} + +func TestSDKStaticPolicyEnd2End(t *testing.T) { + for name, test := range sdkTests { t.Run(name, func(t *testing.T) { // Start a gRPC server with SDK unary and stream server interceptors. i, _ := authz.NewStatic(test.authzPolicy) @@ -266,6 +270,64 @@ func TestSDKEnd2End(t *testing.T) { grpc.ChainStreamInterceptor(i.StreamInterceptor)) pb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) + defer s.Stop() + + // Establish a connection to the server. + clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err) + } + defer clientConn.Close() + client := pb.NewTestServiceClient(clientConn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + ctx = metadata.NewOutgoingContext(ctx, test.md) + + // Verifying authorization decision for Unary RPC. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + } + + // Verifying authorization decision for Streaming RPC. + stream, err := client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf("failed StreamingInputCall err: %v", err) + } + req := &pb.StreamingInputCallRequest{ + Payload: &pb.Payload{ + Body: []byte("hi"), + }, + } + if err := stream.Send(req); err != nil && err != io.EOF { + t.Fatalf("failed stream.Send err: %v", err) + } + _, err = stream.CloseAndRecv() + if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { + t.Fatalf("[StreamingCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + } + }) + } +} + +func TestSDKFileWatcherEnd2End(t *testing.T) { + for name, test := range sdkTests { + t.Run(name, func(t *testing.T) { + dir := createTmpPolicyFile(t, name+"*", []byte(test.authzPolicy)) + i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + defer i.Close() + + // Start a gRPC server with SDK unary and stream server interceptors. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor), + grpc.ChainStreamInterceptor(i.StreamInterceptor)) + pb.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) // Establish a connection to the server. clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) @@ -305,3 +367,159 @@ func TestSDKEnd2End(t *testing.T) { }) } } + +func TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { + valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] + dir := createTmpPolicyFile(t, "valid_policy_refresh*", []byte(valid1.authzPolicy)) + i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + defer i.Close() + + // Start a gRPC server with SDK unary server interceptor. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + pb.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) + + // Establish a connection to the server. + clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err) + } + defer clientConn.Close() + client := pb.NewTestServiceClient(clientConn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + } + + // Rewrite the file with a different valid authorization policy. + valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] + if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + } + // Wait 2 seconds for background go routine to read the updated files. + time.Sleep(2 * time.Second) + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + } +} + +func TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { + valid := sdkTests["DeniesRpcMatchInDenyAndAllow"] + dir := createTmpPolicyFile(t, "invalid_policy_skip_reload*", []byte(valid.authzPolicy)) + i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + defer i.Close() + + // Start a gRPC server with SDK unary server interceptors. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + pb.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) + + // Establish a connection to the server. + clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err) + } + defer clientConn.Close() + client := pb.NewTestServiceClient(clientConn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) + } + + // Skips the invalid policy update, and continues to use the valid policy. + if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + } + // Wait 2 seconds for background go routine to read the updated files. + time.Sleep(2 * time.Second) + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) + } +} + +func TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { + valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] + dir := createTmpPolicyFile(t, "recovers_from_reload_failure*", []byte(valid1.authzPolicy)) + i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + defer i.Close() + + // Start a gRPC server with SDK unary server interceptors. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + pb.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) + + // Establish a connection to the server. + clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("grpc.Dial(%v) failed: %v", lis.Addr().String(), err) + } + defer clientConn.Close() + client := pb.NewTestServiceClient(clientConn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + } + + // Skips the invalid policy update, and continues to use the valid policy. + if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + } + // Wait 2 seconds for background go routine to read the updated files. + time.Sleep(2 * time.Second) + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + } + + // Rewrite the file with a different valid authorization policy. + valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] + if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + } + // Wait 2 seconds for background go routine to read the updated files. + time.Sleep(2 * time.Second) + + // Verifying authorization decision. + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr { + t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + } +} diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index a2f992b5f26..782fba5f855 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -17,14 +17,23 @@ package authz import ( + "bytes" "context" + "fmt" + "io/ioutil" + "sync/atomic" + "time" + "unsafe" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/xds/rbac" "google.golang.org/grpc/status" ) +var logger = grpclog.Component("sdk") + // StaticInterceptor contains engines used to make authorization decisions. It // either contains two engines deny engine followed by an allow engine or only // one allow engine. @@ -73,3 +82,86 @@ func (i *StaticInterceptor) StreamInterceptor(srv interface{}, ss grpc.ServerStr } return handler(srv, ss) } + +// FileWatcherInterceptor contains details used to make authorization decisions +// by watching a file path that contains authorization policy in JSON format. +type FileWatcherInterceptor struct { + internalInterceptor unsafe.Pointer // *StaticInterceptor + policyFile string + policyContents []byte + refreshDuration time.Duration + cancel context.CancelFunc +} + +// NewFileWatcher returns a new FileWatcherInterceptor from a policy file +// that contains JSON string of authorization policy and a refresh duration to +// specify the amount of time between policy refreshes. +func NewFileWatcher(file string, duration time.Duration) (*FileWatcherInterceptor, error) { + if file == "" { + return nil, fmt.Errorf("authorization policy file path is empty") + } + if duration <= time.Duration(0) { + return nil, fmt.Errorf("requires refresh interval(%v) greater than 0s", duration) + } + i := &FileWatcherInterceptor{policyFile: file, refreshDuration: duration} + i.updateInternalInterceptor() + ctx, cancel := context.WithCancel(context.Background()) + i.cancel = cancel + // Create a background go routine for policy refresh. + go i.run(ctx) + return i, nil +} + +func (i *FileWatcherInterceptor) run(ctx context.Context) { + ticker := time.NewTicker(i.refreshDuration) + for { + i.updateInternalInterceptor() + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + } + } +} + +// updateInternalInterceptor checks if the policy file that is watching has changed, +// and if so, updates the internalInterceptor with the policy. Unlike the +// constructor, if there is an error in reading the file or parsing the policy, the +// previous internalInterceptors will not be replaced. +func (i *FileWatcherInterceptor) updateInternalInterceptor() { + policyContents, err := ioutil.ReadFile(i.policyFile) + if err != nil { + logger.Warningf("policyFile(%s) read failed: %v", i.policyFile, err) + return + } + if bytes.Equal(i.policyContents, policyContents) { + return + } + interceptor, err := NewStatic(string(policyContents)) + if err != nil { + logger.Warningf("failed to update authorization engines: %v", err) + return + } + atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) + i.policyContents = policyContents +} + +// Close cleans up resources allocated by the interceptors. +func (i *FileWatcherInterceptor) Close() { + i.cancel() +} + +// UnaryInterceptor intercepts incoming Unary RPC requests. +// Only authorized requests are allowed to pass. Otherwise, an unauthorized +// error is returned to the client. +func (i *FileWatcherInterceptor) UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).UnaryInterceptor(ctx, req, info, handler) +} + +// StreamInterceptor intercepts incoming Stream RPC requests. +// Only authorized requests are allowed to pass. Otherwise, an unauthorized +// error is returned to the client. +func (i *FileWatcherInterceptor) StreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return ((*StaticInterceptor)(atomic.LoadPointer(&i.internalInterceptor))).StreamInterceptor(srv, ss, info, handler) +} diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index e2c1072e7d8..092e216dd6e 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -19,19 +19,43 @@ package authz_test import ( + "io/ioutil" + "os" + "path" "testing" + "time" "google.golang.org/grpc/authz" ) +func createTmpPolicyFile(t *testing.T, dirSuffix string, policy []byte) string { + t.Helper() + + // Create a temp directory. Passing an empty string for the first argument + // uses the system temp directory. + dir, err := ioutil.TempDir("", dirSuffix) + if err != nil { + t.Fatalf("ioutil.TempDir() failed: %v", err) + } + t.Logf("Using tmpdir: %s", dir) + // Write policy into file. + filename := path.Join(dir, "policy.json") + if err := ioutil.WriteFile(filename, policy, os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", filename, err) + } + t.Logf("Wrote file at: %s", filename) + t.Logf("%s", string(policy)) + return dir +} + func TestNewStatic(t *testing.T) { tests := map[string]struct { authzPolicy string - wantErr bool + wantErr string }{ "InvalidPolicyFailsToCreateInterceptor": { authzPolicy: `{}`, - wantErr: true, + wantErr: `"name" is not present`, }, "ValidPolicyCreatesInterceptor": { authzPolicy: `{ @@ -43,14 +67,51 @@ func TestNewStatic(t *testing.T) { } ] }`, - wantErr: false, }, } for name, test := range tests { t.Run(name, func(t *testing.T) { - if _, err := authz.NewStatic(test.authzPolicy); (err != nil) != test.wantErr { + if _, err := authz.NewStatic(test.authzPolicy); (err != nil) && (err.Error() != test.wantErr) { t.Fatalf("NewStatic(%v) returned err: %v, want err: %v", test.authzPolicy, err, test.wantErr) } }) } } + +func TestNewFileWatcher(t *testing.T) { + tests := map[string]struct { + authzPolicy string + refreshDuration time.Duration + wantErr string + }{ + "InvalidRefreshDurationFailsToCreateInterceptor": { + refreshDuration: time.Duration(0), + wantErr: "requires refresh interval(0s) greater than 0s", + }, + "InvalidPolicyFailsToCreateInterceptor": { + authzPolicy: `{}`, + refreshDuration: time.Duration(1), + wantErr: `"name" is not present`, + }, + "ValidPolicyCreatesInterceptor": { + authzPolicy: `{ + "name": "authz", + "allow_rules": + [ + { + "name": "allow_all" + } + ] + }`, + refreshDuration: time.Duration(1), + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + dir := createTmpPolicyFile(t, name+"*", []byte(test.authzPolicy)) + if _, err := authz.NewFileWatcher(path.Join(dir, "policy.json"), test.refreshDuration); (err != nil) && (err.Error() != test.wantErr) { + t.Fatalf("NewFileWatcher(%v) returned err: %v, want err: %v", test.authzPolicy, err, test.wantErr) + } + }) + } +} From 7fde77342e84d188df04f2ff8ef2ef878af0bd71 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Thu, 23 Sep 2021 13:59:41 -0700 Subject: [PATCH 02/15] resolving comments --- authz/sdk_end2end_test.go | 101 ++++++++++++++++---------- authz/sdk_server_interceptors.go | 23 +++--- authz/sdk_server_interceptors_test.go | 22 +++--- 3 files changed, 85 insertions(+), 61 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 12dfd131e60..eac73bf740e 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -24,13 +24,13 @@ import ( "io/ioutil" "net" "os" - "path" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/authz" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" pb "google.golang.org/grpc/test/grpc_testing" @@ -56,6 +56,14 @@ func (s *testServer) StreamingInputCall(stream pb.TestService_StreamingInputCall } } +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + var sdkTests = map[string]struct { authzPolicy string md metadata.MD @@ -256,21 +264,22 @@ var sdkTests = map[string]struct { }, } -func TestSDKStaticPolicyEnd2End(t *testing.T) { +func (s) TestSDKStaticPolicyEnd2End(t *testing.T) { for name, test := range sdkTests { t.Run(name, func(t *testing.T) { // Start a gRPC server with SDK unary and stream server interceptors. i, _ := authz.NewStatic(test.authzPolicy) - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) + defer s.Stop() pb.RegisterTestServiceServer(s, &testServer{}) + + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } go s.Serve(lis) - defer s.Stop() // Establish a connection to the server. clientConn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) @@ -311,22 +320,25 @@ func TestSDKStaticPolicyEnd2End(t *testing.T) { } } -func TestSDKFileWatcherEnd2End(t *testing.T) { +func (s) TestSDKFileWatcherEnd2End(t *testing.T) { for name, test := range sdkTests { t.Run(name, func(t *testing.T) { - dir := createTmpPolicyFile(t, name+"*", []byte(test.authzPolicy)) - i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + file := createTmpPolicyFile(t, name, []byte(test.authzPolicy)) + i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() // Start a gRPC server with SDK unary and stream server interceptors. - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("error listening: %v", err) - } s := grpc.NewServer( grpc.ChainUnaryInterceptor(i.UnaryInterceptor), grpc.ChainStreamInterceptor(i.StreamInterceptor)) + defer s.Stop() pb.RegisterTestServiceServer(s, &testServer{}) + + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + defer lis.Close() go s.Serve(lis) // Establish a connection to the server. @@ -368,20 +380,23 @@ func TestSDKFileWatcherEnd2End(t *testing.T) { } } -func TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { +func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] - dir := createTmpPolicyFile(t, "valid_policy_refresh*", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + file := createTmpPolicyFile(t, "valid_policy_refresh", []byte(valid1.authzPolicy)) + i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() // Start a gRPC server with SDK unary server interceptor. + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + defer s.Stop() + pb.RegisterTestServiceServer(s, &testServer{}) + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - pb.RegisterTestServiceServer(s, &testServer{}) + defer lis.Close() go s.Serve(lis) // Establish a connection to the server. @@ -403,8 +418,8 @@ func TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -416,20 +431,23 @@ func TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { } } -func TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { +func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { valid := sdkTests["DeniesRpcMatchInDenyAndAllow"] - dir := createTmpPolicyFile(t, "invalid_policy_skip_reload*", []byte(valid.authzPolicy)) - i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + file := createTmpPolicyFile(t, "invalid_policy_skip_reload", []byte(valid.authzPolicy)) + i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() // Start a gRPC server with SDK unary server interceptors. + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + defer s.Stop() + pb.RegisterTestServiceServer(s, &testServer{}) + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - pb.RegisterTestServiceServer(s, &testServer{}) + defer lis.Close() go s.Serve(lis) // Establish a connection to the server. @@ -450,8 +468,8 @@ func TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -463,20 +481,23 @@ func TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { } } -func TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { +func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] - dir := createTmpPolicyFile(t, "recovers_from_reload_failure*", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(path.Join(dir, "policy.json"), 1*time.Second) + file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) + i, _ := authz.NewFileWatcher(file, 1*time.Second) defer i.Close() // Start a gRPC server with SDK unary server interceptors. + s := grpc.NewServer( + grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) + defer s.Stop() + pb.RegisterTestServiceServer(s, &testServer{}) + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("error listening: %v", err) } - s := grpc.NewServer( - grpc.ChainUnaryInterceptor(i.UnaryInterceptor)) - pb.RegisterTestServiceServer(s, &testServer{}) + defer lis.Close() go s.Serve(lis) // Establish a connection to the server. @@ -497,8 +518,8 @@ func TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -511,8 +532,8 @@ func TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := ioutil.WriteFile(path.Join(dir, "policy.json"), []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", path.Join(dir, "policy.json"), err) + if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index 782fba5f855..70fa1f3eb64 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -32,7 +32,7 @@ import ( "google.golang.org/grpc/status" ) -var logger = grpclog.Component("sdk") +var logger = grpclog.Component("authz") // StaticInterceptor contains engines used to make authorization decisions. It // either contains two engines deny engine followed by an allow engine or only @@ -104,7 +104,9 @@ func NewFileWatcher(file string, duration time.Duration) (*FileWatcherIntercepto return nil, fmt.Errorf("requires refresh interval(%v) greater than 0s", duration) } i := &FileWatcherInterceptor{policyFile: file, refreshDuration: duration} - i.updateInternalInterceptor() + if err := i.updateInternalInterceptor(); err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) i.cancel = cancel // Create a background go routine for policy refresh. @@ -115,7 +117,9 @@ func NewFileWatcher(file string, duration time.Duration) (*FileWatcherIntercepto func (i *FileWatcherInterceptor) run(ctx context.Context) { ticker := time.NewTicker(i.refreshDuration) for { - i.updateInternalInterceptor() + if err := i.updateInternalInterceptor(); err != nil { + logger.Warningf("%v", err) + } select { case <-ctx.Done(): ticker.Stop() @@ -129,25 +133,24 @@ func (i *FileWatcherInterceptor) run(ctx context.Context) { // and if so, updates the internalInterceptor with the policy. Unlike the // constructor, if there is an error in reading the file or parsing the policy, the // previous internalInterceptors will not be replaced. -func (i *FileWatcherInterceptor) updateInternalInterceptor() { +func (i *FileWatcherInterceptor) updateInternalInterceptor() error { policyContents, err := ioutil.ReadFile(i.policyFile) if err != nil { - logger.Warningf("policyFile(%s) read failed: %v", i.policyFile, err) - return + return fmt.Errorf("policyFile(%s) read failed: %v", i.policyFile, err) } if bytes.Equal(i.policyContents, policyContents) { - return + return nil } interceptor, err := NewStatic(string(policyContents)) if err != nil { - logger.Warningf("failed to update authorization engines: %v", err) - return + return fmt.Errorf("failed to update authorization engines: %v", err) } atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) i.policyContents = policyContents + return nil } -// Close cleans up resources allocated by the interceptors. +// Close cleans up resources allocated by the interceptor. func (i *FileWatcherInterceptor) Close() { i.cancel() } diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index 092e216dd6e..d554d11859c 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -19,6 +19,7 @@ package authz_test import ( + "fmt" "io/ioutil" "os" "path" @@ -43,19 +44,18 @@ func createTmpPolicyFile(t *testing.T, dirSuffix string, policy []byte) string { if err := ioutil.WriteFile(filename, policy, os.ModePerm); err != nil { t.Fatalf("ioutil.WriteFile(%q) failed: %v", filename, err) } - t.Logf("Wrote file at: %s", filename) - t.Logf("%s", string(policy)) - return dir + t.Logf("Wrote policy %s to file at %s", string(policy), filename) + return filename } func TestNewStatic(t *testing.T) { tests := map[string]struct { authzPolicy string - wantErr string + wantErr error }{ "InvalidPolicyFailsToCreateInterceptor": { authzPolicy: `{}`, - wantErr: `"name" is not present`, + wantErr: fmt.Errorf(`"name" is not present`), }, "ValidPolicyCreatesInterceptor": { authzPolicy: `{ @@ -71,7 +71,7 @@ func TestNewStatic(t *testing.T) { } for name, test := range tests { t.Run(name, func(t *testing.T) { - if _, err := authz.NewStatic(test.authzPolicy); (err != nil) && (err.Error() != test.wantErr) { + if _, err := authz.NewStatic(test.authzPolicy); fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Fatalf("NewStatic(%v) returned err: %v, want err: %v", test.authzPolicy, err, test.wantErr) } }) @@ -82,16 +82,16 @@ func TestNewFileWatcher(t *testing.T) { tests := map[string]struct { authzPolicy string refreshDuration time.Duration - wantErr string + wantErr error }{ "InvalidRefreshDurationFailsToCreateInterceptor": { refreshDuration: time.Duration(0), - wantErr: "requires refresh interval(0s) greater than 0s", + wantErr: fmt.Errorf("requires refresh interval(0s) greater than 0s"), }, "InvalidPolicyFailsToCreateInterceptor": { authzPolicy: `{}`, refreshDuration: time.Duration(1), - wantErr: `"name" is not present`, + wantErr: fmt.Errorf(`failed to update authorization engines: "name" is not present`), }, "ValidPolicyCreatesInterceptor": { authzPolicy: `{ @@ -108,8 +108,8 @@ func TestNewFileWatcher(t *testing.T) { } for name, test := range tests { t.Run(name, func(t *testing.T) { - dir := createTmpPolicyFile(t, name+"*", []byte(test.authzPolicy)) - if _, err := authz.NewFileWatcher(path.Join(dir, "policy.json"), test.refreshDuration); (err != nil) && (err.Error() != test.wantErr) { + file := createTmpPolicyFile(t, name, []byte(test.authzPolicy)) + if _, err := authz.NewFileWatcher(file, test.refreshDuration); fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Fatalf("NewFileWatcher(%v) returned err: %v, want err: %v", test.authzPolicy, err, test.wantErr) } }) From 352a5d15e750c23097c5da5fa413bdf73734ef02 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Thu, 23 Sep 2021 23:11:52 -0700 Subject: [PATCH 03/15] Update test to detect memory leak --- authz/sdk_server_interceptors_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index d554d11859c..24a440c85db 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -48,7 +48,7 @@ func createTmpPolicyFile(t *testing.T, dirSuffix string, policy []byte) string { return filename } -func TestNewStatic(t *testing.T) { +func (s) TestNewStatic(t *testing.T) { tests := map[string]struct { authzPolicy string wantErr error @@ -78,7 +78,7 @@ func TestNewStatic(t *testing.T) { } } -func TestNewFileWatcher(t *testing.T) { +func (s) TestNewFileWatcher(t *testing.T) { tests := map[string]struct { authzPolicy string refreshDuration time.Duration @@ -109,9 +109,13 @@ func TestNewFileWatcher(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { file := createTmpPolicyFile(t, name, []byte(test.authzPolicy)) - if _, err := authz.NewFileWatcher(file, test.refreshDuration); fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + i, err := authz.NewFileWatcher(file, test.refreshDuration) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Fatalf("NewFileWatcher(%v) returned err: %v, want err: %v", test.authzPolicy, err, test.wantErr) } + if i != nil { + i.Close() + } }) } } From 3433e638bfbd40cef7041a3d4f81077349751ddf Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Thu, 23 Sep 2021 23:37:27 -0700 Subject: [PATCH 04/15] update reload status logging, to also log when successful --- authz/sdk_server_interceptors.go | 7 +++---- authz/sdk_server_interceptors_test.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index 70fa1f3eb64..1dcd4e7a4d0 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -117,9 +117,8 @@ func NewFileWatcher(file string, duration time.Duration) (*FileWatcherIntercepto func (i *FileWatcherInterceptor) run(ctx context.Context) { ticker := time.NewTicker(i.refreshDuration) for { - if err := i.updateInternalInterceptor(); err != nil { - logger.Warningf("%v", err) - } + err := i.updateInternalInterceptor() + logger.Infof("authorization policy reload status err: %v", err) select { case <-ctx.Done(): ticker.Stop() @@ -143,7 +142,7 @@ func (i *FileWatcherInterceptor) updateInternalInterceptor() error { } interceptor, err := NewStatic(string(policyContents)) if err != nil { - return fmt.Errorf("failed to update authorization engines: %v", err) + return err } atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) i.policyContents = policyContents diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index 24a440c85db..f43f9807612 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -91,7 +91,7 @@ func (s) TestNewFileWatcher(t *testing.T) { "InvalidPolicyFailsToCreateInterceptor": { authzPolicy: `{}`, refreshDuration: time.Duration(1), - wantErr: fmt.Errorf(`failed to update authorization engines: "name" is not present`), + wantErr: fmt.Errorf(`"name" is not present`), }, "ValidPolicyCreatesInterceptor": { authzPolicy: `{ From 3a2bdeb357399d39703a3112b67b6f65a34e0ee5 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Fri, 24 Sep 2021 15:26:53 -0700 Subject: [PATCH 05/15] remove ioutil --- authz/sdk_end2end_test.go | 17 ++++++++--------- authz/sdk_server_interceptors_test.go | 9 ++++----- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index eac73bf740e..898921051c9 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -21,7 +21,6 @@ package authz_test import ( "context" "io" - "io/ioutil" "net" "os" "testing" @@ -418,8 +417,8 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) + if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -468,8 +467,8 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) + if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -518,8 +517,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) + if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) @@ -532,8 +531,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) + if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } // Wait 2 seconds for background go routine to read the updated files. time.Sleep(2 * time.Second) diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index f43f9807612..ae74c896d96 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -20,7 +20,6 @@ package authz_test import ( "fmt" - "io/ioutil" "os" "path" "testing" @@ -34,15 +33,15 @@ func createTmpPolicyFile(t *testing.T, dirSuffix string, policy []byte) string { // Create a temp directory. Passing an empty string for the first argument // uses the system temp directory. - dir, err := ioutil.TempDir("", dirSuffix) + dir, err := os.MkdirTemp("", dirSuffix) if err != nil { - t.Fatalf("ioutil.TempDir() failed: %v", err) + t.Fatalf("os.MkdirTemp() failed: %v", err) } t.Logf("Using tmpdir: %s", dir) // Write policy into file. filename := path.Join(dir, "policy.json") - if err := ioutil.WriteFile(filename, policy, os.ModePerm); err != nil { - t.Fatalf("ioutil.WriteFile(%q) failed: %v", filename, err) + if err := os.WriteFile(filename, policy, os.ModePerm); err != nil { + t.Fatalf("os.WriteFile(%q) failed: %v", filename, err) } t.Logf("Wrote policy %s to file at %s", string(policy), filename) return filename From 5a428277d3a0c61419567e0b6b0a48e9e59ad601 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Mon, 27 Sep 2021 14:40:20 -0700 Subject: [PATCH 06/15] Add loop in test --- authz/sdk_end2end_test.go | 80 ++++++++++++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 898921051c9..d5b8f3ad36c 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -382,7 +382,7 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "valid_policy_refresh", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 1*time.Second) + i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptor. @@ -420,12 +420,23 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 2 seconds for background go routine to read the updated files. - time.Sleep(2 * time.Second) + // Wait 30 ms for background go routine to read the updated files. + time.Sleep(30 * time.Millisecond) // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr { + numRetries := 0 + reloadSuccess := false + got := status.Convert(err) + for numRetries <= 20 { + if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr { + reloadSuccess = true + break + } + time.Sleep(10 * time.Millisecond) + numRetries++ + } + if reloadSuccess == false { t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } @@ -433,7 +444,7 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { valid := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "invalid_policy_skip_reload", []byte(valid.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 1*time.Second) + i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptors. @@ -470,20 +481,31 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 2 seconds for background go routine to read the updated files. - time.Sleep(2 * time.Second) + // Wait 30 ms for background go routine to read the updated files. + time.Sleep(30 * time.Millisecond) // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) + numRetries := 0 + skipReload := true + for numRetries <= 20 { + if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { + skipReload = false + break + } + time.Sleep(10 * time.Millisecond) + numRetries++ + } + if skipReload == false { + got := status.Convert(err) + t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) } } func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 1*time.Second) + i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptors. @@ -520,13 +542,24 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 2 seconds for background go routine to read the updated files. - time.Sleep(2 * time.Second) + // Wait 30 ms for background go routine to read the updated files. + time.Sleep(30 * time.Millisecond) // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + numRetries := 0 + skipReload := true + for numRetries <= 20 { + if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { + skipReload = false + break + } + time.Sleep(10 * time.Millisecond) + numRetries++ + } + if skipReload == false { + got := status.Convert(err) + t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) } // Rewrite the file with a different valid authorization policy. @@ -534,12 +567,23 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 2 seconds for background go routine to read the updated files. - time.Sleep(2 * time.Second) + // Wait 30 ms for background go routine to read the updated files. + time.Sleep(30 * time.Millisecond) // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid2.wantStatusCode || got.Message() != valid2.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + numRetries = 0 + reloadSuccess := false + got := status.Convert(err) + for numRetries <= 20 { + if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr { + reloadSuccess = true + break + } + time.Sleep(10 * time.Millisecond) + numRetries++ + } + if reloadSuccess == false { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } From 8c02df5ffd60335493eae85e73e2435d1cfeed23 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Mon, 27 Sep 2021 14:43:15 -0700 Subject: [PATCH 07/15] Remove ioutil from implementation --- authz/sdk_server_interceptors.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index 1dcd4e7a4d0..65bb6ca97d3 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -20,7 +20,7 @@ import ( "bytes" "context" "fmt" - "io/ioutil" + "os" "sync/atomic" "time" "unsafe" @@ -133,7 +133,7 @@ func (i *FileWatcherInterceptor) run(ctx context.Context) { // constructor, if there is an error in reading the file or parsing the policy, the // previous internalInterceptors will not be replaced. func (i *FileWatcherInterceptor) updateInternalInterceptor() error { - policyContents, err := ioutil.ReadFile(i.policyFile) + policyContents, err := os.ReadFile(i.policyFile) if err != nil { return fmt.Errorf("policyFile(%s) read failed: %v", i.policyFile, err) } From 84b6949580d554ee44d5acad97839954643de98a Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Mon, 27 Sep 2021 14:49:11 -0700 Subject: [PATCH 08/15] refactor --- authz/sdk_end2end_test.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index d5b8f3ad36c..fe5d9d2e236 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -487,19 +487,13 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries := 0 - skipReload := true for numRetries <= 20 { if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { - skipReload = false - break + t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) } time.Sleep(10 * time.Millisecond) numRetries++ } - if skipReload == false { - got := status.Convert(err) - t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) - } } func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { @@ -548,19 +542,13 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries := 0 - skipReload := true for numRetries <= 20 { if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { - skipReload = false - break + t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) } time.Sleep(10 * time.Millisecond) numRetries++ } - if skipReload == false { - got := status.Convert(err) - t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) - } // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] From 4698d195aa646275de66828e3f9d312118f09f5d Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Tue, 28 Sep 2021 16:12:00 -0700 Subject: [PATCH 09/15] Fix tests --- authz/sdk_end2end_test.go | 42 ++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index fe5d9d2e236..4fd60452b3d 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -382,7 +382,7 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "valid_policy_refresh", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) + i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptor. @@ -420,31 +420,29 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 30 ms for background go routine to read the updated files. - time.Sleep(30 * time.Millisecond) // Verifying authorization decision. - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries := 0 reloadSuccess := false - got := status.Convert(err) + var gotStatus *status.Status for numRetries <= 20 { - if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr { + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr { reloadSuccess = true break } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) numRetries++ } if reloadSuccess == false { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message()) } } func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { valid := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "invalid_policy_skip_reload", []byte(valid.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) + i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptors. @@ -481,17 +479,15 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 30 ms for background go routine to read the updated files. - time.Sleep(30 * time.Millisecond) // Verifying authorization decision. - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries := 0 for numRetries <= 20 { + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) numRetries++ } } @@ -499,7 +495,7 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 10*time.Millisecond) + i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptors. @@ -536,17 +532,15 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 30 ms for background go routine to read the updated files. - time.Sleep(30 * time.Millisecond) // Verifying authorization decision. - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries := 0 for numRetries <= 20 { + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) numRetries++ } @@ -555,23 +549,21 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } - // Wait 30 ms for background go routine to read the updated files. - time.Sleep(30 * time.Millisecond) // Verifying authorization decision. - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) numRetries = 0 reloadSuccess := false - got := status.Convert(err) + var gotStatus *status.Status for numRetries <= 20 { - if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr { + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr { reloadSuccess = true break } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) numRetries++ } if reloadSuccess == false { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message()) } } From 046854ddc2a438503cf5452783d0ad95c456d24e Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Tue, 28 Sep 2021 16:45:03 -0700 Subject: [PATCH 10/15] Update reload logs --- authz/sdk_server_interceptors.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index 65bb6ca97d3..faf1ed2e1f7 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -117,8 +117,9 @@ func NewFileWatcher(file string, duration time.Duration) (*FileWatcherIntercepto func (i *FileWatcherInterceptor) run(ctx context.Context) { ticker := time.NewTicker(i.refreshDuration) for { - err := i.updateInternalInterceptor() - logger.Infof("authorization policy reload status err: %v", err) + if err := i.updateInternalInterceptor(); err != nil { + logger.Warningf("authorization policy reload status err: %v", err) + } select { case <-ctx.Done(): ticker.Stop() @@ -140,12 +141,14 @@ func (i *FileWatcherInterceptor) updateInternalInterceptor() error { if bytes.Equal(i.policyContents, policyContents) { return nil } - interceptor, err := NewStatic(string(policyContents)) + policyContentsString := string(policyContents) + interceptor, err := NewStatic(policyContentsString) if err != nil { return err } atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) i.policyContents = policyContents + logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString) return nil } From 27350e2411c1d994cfd0c51d5a8259f5d079fb46 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Wed, 29 Sep 2021 12:17:31 -0700 Subject: [PATCH 11/15] Resolving comments --- authz/sdk_end2end_test.go | 76 ++++++++++++++------------------ authz/sdk_server_interceptors.go | 14 +++--- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 4fd60452b3d..b8a1a537ff1 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -379,6 +379,30 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { } } +func verifyValidRefresh(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { + for numRetries := 0; numRetries <= 20; numRetries++ { + _, err := tsc.UnaryCall(ctx, &pb.SimpleRequest{}) + if lastStatus = status.Convert(err); lastStatus.Code() == wantCode && lastStatus.Message() == wantErr { + return nil + } + time.Sleep(20 * time.Millisecond) + numRetries++ + } + return lastStatus +} + +func verifySkipReload(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { + for numRetries := 0; numRetries <= 20; numRetries++ { + _, err := tsc.UnaryCall(ctx, &pb.SimpleRequest{}) + if lastStatus := status.Convert(err); lastStatus.Code() != wantCode || lastStatus.Message() != wantErr { + return lastStatus + } + time.Sleep(20 * time.Millisecond) + numRetries++ + } + return nil +} + func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "valid_policy_refresh", []byte(valid1.authzPolicy)) @@ -422,20 +446,8 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { } // Verifying authorization decision. - numRetries := 0 - reloadSuccess := false - var gotStatus *status.Status - for numRetries <= 20 { - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr { - reloadSuccess = true - break - } - time.Sleep(100 * time.Millisecond) - numRetries++ - } - if reloadSuccess == false { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message()) + if got := verifyValidRefresh(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } @@ -481,14 +493,8 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { } // Verifying authorization decision. - numRetries := 0 - for numRetries <= 20 { - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) - } - time.Sleep(100 * time.Millisecond) - numRetries++ + if got := verifySkipReload(ctx, client, valid.wantStatusCode, valid.wantErr); got != nil { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) } } @@ -534,14 +540,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Verifying authorization decision. - numRetries := 0 - for numRetries <= 20 { - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) - } - time.Sleep(100 * time.Millisecond) - numRetries++ + if got := verifySkipReload(ctx, client, valid1.wantStatusCode, valid1.wantErr); got != nil { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) } // Rewrite the file with a different valid authorization policy. @@ -551,19 +551,7 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Verifying authorization decision. - numRetries = 0 - reloadSuccess := false - var gotStatus *status.Status - for numRetries <= 20 { - _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr { - reloadSuccess = true - break - } - time.Sleep(100 * time.Millisecond) - numRetries++ - } - if reloadSuccess == false { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message()) + if got := verifyValidRefresh(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { + t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index faf1ed2e1f7..d5d57b2ba53 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -86,11 +86,12 @@ func (i *StaticInterceptor) StreamInterceptor(srv interface{}, ss grpc.ServerStr // FileWatcherInterceptor contains details used to make authorization decisions // by watching a file path that contains authorization policy in JSON format. type FileWatcherInterceptor struct { - internalInterceptor unsafe.Pointer // *StaticInterceptor - policyFile string - policyContents []byte - refreshDuration time.Duration - cancel context.CancelFunc + internalInterceptor unsafe.Pointer // *StaticInterceptor + policyFile string + policyContents []byte + latestValidPolicyContents []byte + refreshDuration time.Duration + cancel context.CancelFunc } // NewFileWatcher returns a new FileWatcherInterceptor from a policy file @@ -141,13 +142,14 @@ func (i *FileWatcherInterceptor) updateInternalInterceptor() error { if bytes.Equal(i.policyContents, policyContents) { return nil } + i.policyContents = policyContents policyContentsString := string(policyContents) interceptor, err := NewStatic(policyContentsString) if err != nil { return err } atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) - i.policyContents = policyContents + i.latestValidPolicyContents = i.policyContents logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString) return nil } From fb9bf96dded14ec84a11a8d1fad7418d52b417fe Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Wed, 29 Sep 2021 13:05:08 -0700 Subject: [PATCH 12/15] rename function --- authz/sdk_end2end_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index b8a1a537ff1..32fc157157a 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -379,7 +379,7 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { } } -func verifyValidRefresh(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { +func verifyValidReload(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { for numRetries := 0; numRetries <= 20; numRetries++ { _, err := tsc.UnaryCall(ctx, &pb.SimpleRequest{}) if lastStatus = status.Convert(err); lastStatus.Code() == wantCode && lastStatus.Message() == wantErr { @@ -446,7 +446,7 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { } // Verifying authorization decision. - if got := verifyValidRefresh(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { + if got := verifyValidReload(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } @@ -551,7 +551,7 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Verifying authorization decision. - if got := verifyValidRefresh(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { + if got := verifyValidReload(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) } } From 52b49e81075f4d8eb24be5ffd486fba230ce7590 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Wed, 29 Sep 2021 15:02:39 -0700 Subject: [PATCH 13/15] Update test --- authz/sdk_end2end_test.go | 106 ++++++++++++++----------------- authz/sdk_server_interceptors.go | 12 ++-- 2 files changed, 53 insertions(+), 65 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 32fc157157a..13653809062 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -64,10 +64,9 @@ func Test(t *testing.T) { } var sdkTests = map[string]struct { - authzPolicy string - md metadata.MD - wantStatusCode codes.Code - wantErr string + authzPolicy string + md metadata.MD + wantStatus *status.Status }{ "DeniesRpcMatchInDenyNoMatchInAllow": { authzPolicy: `{ @@ -109,9 +108,8 @@ var sdkTests = map[string]struct { } ] }`, - md: metadata.Pairs("key-abc", "val-abc"), - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", + md: metadata.Pairs("key-abc", "val-abc"), + wantStatus: status.New(codes.PermissionDenied, "unauthorized RPC request rejected"), }, "DeniesRpcMatchInDenyAndAllow": { authzPolicy: `{ @@ -141,8 +139,7 @@ var sdkTests = map[string]struct { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", + wantStatus: status.New(codes.PermissionDenied, "unauthorized RPC request rejected"), }, "AllowsRpcNoMatchInDenyMatchInAllow": { authzPolicy: `{ @@ -178,10 +175,10 @@ var sdkTests = map[string]struct { } ] }`, - md: metadata.Pairs("key-xyz", "val-xyz"), - wantStatusCode: codes.OK, + md: metadata.Pairs("key-xyz", "val-xyz"), + wantStatus: status.New(codes.OK, ""), }, - "AllowsRpcNoMatchInDenyAndAllow": { + "DeniesRpcNoMatchInDenyAndAllow": { authzPolicy: `{ "name": "authz", "allow_rules": @@ -209,8 +206,7 @@ var sdkTests = map[string]struct { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", + wantStatus: status.New(codes.PermissionDenied, "unauthorized RPC request rejected"), }, "AllowsRpcEmptyDenyMatchInAllow": { authzPolicy: `{ @@ -239,7 +235,7 @@ var sdkTests = map[string]struct { } ] }`, - wantStatusCode: codes.OK, + wantStatus: status.New(codes.OK, ""), }, "DeniesRpcEmptyDenyNoMatchInAllow": { authzPolicy: `{ @@ -258,8 +254,7 @@ var sdkTests = map[string]struct { } ] }`, - wantStatusCode: codes.PermissionDenied, - wantErr: "unauthorized RPC request rejected", + wantStatus: status.New(codes.PermissionDenied, "unauthorized RPC request rejected"), }, } @@ -294,8 +289,8 @@ func (s) TestSDKStaticPolicyEnd2End(t *testing.T) { // Verifying authorization decision for Unary RPC. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() { + t.Fatalf("[UnaryCall] error want:{%v} got:{%v}", test.wantStatus.Err(), got.Err()) } // Verifying authorization decision for Streaming RPC. @@ -312,8 +307,8 @@ func (s) TestSDKStaticPolicyEnd2End(t *testing.T) { t.Fatalf("failed stream.Send err: %v", err) } _, err = stream.CloseAndRecv() - if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { - t.Fatalf("[StreamingCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() { + t.Fatalf("[StreamingCall] error want:{%v} got:{%v}", test.wantStatus.Err(), got.Err()) } }) } @@ -354,8 +349,8 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { // Verifying authorization decision for Unary RPC. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() { + t.Fatalf("[UnaryCall] error want:{%v} got:{%v}", test.wantStatus.Err(), got.Err()) } // Verifying authorization decision for Streaming RPC. @@ -372,35 +367,22 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) { t.Fatalf("failed stream.Send err: %v", err) } _, err = stream.CloseAndRecv() - if got := status.Convert(err); got.Code() != test.wantStatusCode || got.Message() != test.wantErr { - t.Fatalf("[StreamingCall] error want:{%v %v} got:{%v %v}", test.wantStatusCode, test.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != test.wantStatus.Code() || got.Message() != test.wantStatus.Message() { + t.Fatalf("[StreamingCall] error want:{%v} got:{%v}", test.wantStatus.Err(), got.Err()) } }) } } -func verifyValidReload(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { - for numRetries := 0; numRetries <= 20; numRetries++ { - _, err := tsc.UnaryCall(ctx, &pb.SimpleRequest{}) - if lastStatus = status.Convert(err); lastStatus.Code() == wantCode && lastStatus.Message() == wantErr { +func retryUntil(ctx context.Context, tsc pb.TestServiceClient, want *status.Status) (lastErr error) { + for ctx.Err() == nil { + _, lastErr = tsc.UnaryCall(ctx, &pb.SimpleRequest{}) + if s := status.Convert(lastErr); s.Code() == want.Code() && s.Message() == want.Message() { return nil } time.Sleep(20 * time.Millisecond) - numRetries++ } - return lastStatus -} - -func verifySkipReload(ctx context.Context, tsc pb.TestServiceClient, wantCode codes.Code, wantErr string) (lastStatus *status.Status) { - for numRetries := 0; numRetries <= 20; numRetries++ { - _, err := tsc.UnaryCall(ctx, &pb.SimpleRequest{}) - if lastStatus := status.Convert(err); lastStatus.Code() != wantCode || lastStatus.Message() != wantErr { - return lastStatus - } - time.Sleep(20 * time.Millisecond) - numRetries++ - } - return nil + return lastErr } func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { @@ -435,8 +417,8 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != valid1.wantStatus.Code() || got.Message() != valid1.wantStatus.Message() { + t.Fatalf("error want:{%v} got:{%v}", valid1.wantStatus.Err(), got.Err()) } // Rewrite the file with a different valid authorization policy. @@ -446,15 +428,15 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { } // Verifying authorization decision. - if got := verifyValidReload(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + if got := retryUntil(ctx, client, valid2.wantStatus); got != nil { + t.Fatalf("error want:{%v} got:{%v}", valid2.wantStatus.Err(), got) } } func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { valid := sdkTests["DeniesRpcMatchInDenyAndAllow"] file := createTmpPolicyFile(t, "invalid_policy_skip_reload", []byte(valid.authzPolicy)) - i, _ := authz.NewFileWatcher(file, 100*time.Millisecond) + i, _ := authz.NewFileWatcher(file, 20*time.Millisecond) defer i.Close() // Start a gRPC server with SDK unary server interceptors. @@ -483,8 +465,8 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != valid.wantStatus.Code() || got.Message() != valid.wantStatus.Message() { + t.Fatalf("error want:{%v} got:{%v}", valid.wantStatus.Err(), got.Err()) } // Skips the invalid policy update, and continues to use the valid policy. @@ -492,9 +474,13 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } + // Wait 40 ms for background go routine to read updated files. + time.Sleep(40 * time.Millisecond) + // Verifying authorization decision. - if got := verifySkipReload(ctx, client, valid.wantStatusCode, valid.wantErr); got != nil { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message()) + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid.wantStatus.Code() || got.Message() != valid.wantStatus.Message() { + t.Fatalf("error want:{%v} got:{%v}", valid.wantStatus.Err(), got.Err()) } } @@ -530,8 +516,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { // Verifying authorization decision. _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) - if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr { - t.Fatalf("[UnaryCall] error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + if got := status.Convert(err); got.Code() != valid1.wantStatus.Code() || got.Message() != valid1.wantStatus.Message() { + t.Fatalf("error want:{%v} got:{%v}", valid1.wantStatus.Err(), got.Err()) } // Skips the invalid policy update, and continues to use the valid policy. @@ -539,9 +525,13 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { t.Fatalf("os.WriteFile(%q) failed: %v", file, err) } + // Wait 120 ms for background go routine to read updated files. + time.Sleep(120 * time.Millisecond) + // Verifying authorization decision. - if got := verifySkipReload(ctx, client, valid1.wantStatusCode, valid1.wantErr); got != nil { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message()) + _, err = client.UnaryCall(ctx, &pb.SimpleRequest{}) + if got := status.Convert(err); got.Code() != valid1.wantStatus.Code() || got.Message() != valid1.wantStatus.Message() { + t.Fatalf("error want:{%v} got:{%v}", valid1.wantStatus.Err(), got.Err()) } // Rewrite the file with a different valid authorization policy. @@ -551,7 +541,7 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Verifying authorization decision. - if got := verifyValidReload(ctx, client, valid2.wantStatusCode, valid2.wantErr); got != nil { - t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message()) + if got := retryUntil(ctx, client, valid2.wantStatus); got != nil { + t.Fatalf("error want:{%v} got:{%v}", valid2.wantStatus.Err(), got) } } diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index d5d57b2ba53..9d5024ab947 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -86,12 +86,11 @@ func (i *StaticInterceptor) StreamInterceptor(srv interface{}, ss grpc.ServerStr // FileWatcherInterceptor contains details used to make authorization decisions // by watching a file path that contains authorization policy in JSON format. type FileWatcherInterceptor struct { - internalInterceptor unsafe.Pointer // *StaticInterceptor - policyFile string - policyContents []byte - latestValidPolicyContents []byte - refreshDuration time.Duration - cancel context.CancelFunc + internalInterceptor unsafe.Pointer // *StaticInterceptor + policyFile string + policyContents []byte + refreshDuration time.Duration + cancel context.CancelFunc } // NewFileWatcher returns a new FileWatcherInterceptor from a policy file @@ -149,7 +148,6 @@ func (i *FileWatcherInterceptor) updateInternalInterceptor() error { return err } atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor)) - i.latestValidPolicyContents = i.policyContents logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString) return nil } From 10b4c620e86e706f4b880111ce842e5d6c8f3a78 Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Tue, 5 Oct 2021 17:09:11 -0700 Subject: [PATCH 14/15] fix tests --- authz/sdk_server_interceptors.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/authz/sdk_server_interceptors.go b/authz/sdk_server_interceptors.go index 9d5024ab947..72dc14ed85e 100644 --- a/authz/sdk_server_interceptors.go +++ b/authz/sdk_server_interceptors.go @@ -20,7 +20,7 @@ import ( "bytes" "context" "fmt" - "os" + "io/ioutil" "sync/atomic" "time" "unsafe" @@ -134,7 +134,7 @@ func (i *FileWatcherInterceptor) run(ctx context.Context) { // constructor, if there is an error in reading the file or parsing the policy, the // previous internalInterceptors will not be replaced. func (i *FileWatcherInterceptor) updateInternalInterceptor() error { - policyContents, err := os.ReadFile(i.policyFile) + policyContents, err := ioutil.ReadFile(i.policyFile) if err != nil { return fmt.Errorf("policyFile(%s) read failed: %v", i.policyFile, err) } From f90b8dbdba4b4e8b01e5f6f46e4209d058dd132b Mon Sep 17 00:00:00 2001 From: Ashitha Santhosh Date: Tue, 5 Oct 2021 21:30:58 -0700 Subject: [PATCH 15/15] use ioutil to fix tests --- authz/sdk_end2end_test.go | 17 +++++++++-------- authz/sdk_server_interceptors_test.go | 9 +++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/authz/sdk_end2end_test.go b/authz/sdk_end2end_test.go index 13653809062..093b2bb437d 100644 --- a/authz/sdk_end2end_test.go +++ b/authz/sdk_end2end_test.go @@ -21,6 +21,7 @@ package authz_test import ( "context" "io" + "io/ioutil" "net" "os" "testing" @@ -423,8 +424,8 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("os.WriteFile(%q) failed: %v", file, err) + if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Verifying authorization decision. @@ -470,8 +471,8 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("os.WriteFile(%q) failed: %v", file, err) + if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 40 ms for background go routine to read updated files. @@ -521,8 +522,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { } // Skips the invalid policy update, and continues to use the valid policy. - if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { - t.Fatalf("os.WriteFile(%q) failed: %v", file, err) + if err := ioutil.WriteFile(file, []byte("{}"), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Wait 120 ms for background go routine to read updated files. @@ -536,8 +537,8 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) { // Rewrite the file with a different valid authorization policy. valid2 := sdkTests["AllowsRpcEmptyDenyMatchInAllow"] - if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { - t.Fatalf("os.WriteFile(%q) failed: %v", file, err) + if err := ioutil.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", file, err) } // Verifying authorization decision. diff --git a/authz/sdk_server_interceptors_test.go b/authz/sdk_server_interceptors_test.go index ae74c896d96..f43f9807612 100644 --- a/authz/sdk_server_interceptors_test.go +++ b/authz/sdk_server_interceptors_test.go @@ -20,6 +20,7 @@ package authz_test import ( "fmt" + "io/ioutil" "os" "path" "testing" @@ -33,15 +34,15 @@ func createTmpPolicyFile(t *testing.T, dirSuffix string, policy []byte) string { // Create a temp directory. Passing an empty string for the first argument // uses the system temp directory. - dir, err := os.MkdirTemp("", dirSuffix) + dir, err := ioutil.TempDir("", dirSuffix) if err != nil { - t.Fatalf("os.MkdirTemp() failed: %v", err) + t.Fatalf("ioutil.TempDir() failed: %v", err) } t.Logf("Using tmpdir: %s", dir) // Write policy into file. filename := path.Join(dir, "policy.json") - if err := os.WriteFile(filename, policy, os.ModePerm); err != nil { - t.Fatalf("os.WriteFile(%q) failed: %v", filename, err) + if err := ioutil.WriteFile(filename, policy, os.ModePerm); err != nil { + t.Fatalf("ioutil.WriteFile(%q) failed: %v", filename, err) } t.Logf("Wrote policy %s to file at %s", string(policy), filename) return filename