Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

authz: create file watcher interceptor for gRPC SDK API #4760

Merged
merged 16 commits into from Oct 9, 2021
Merged
42 changes: 17 additions & 25 deletions authz/sdk_end2end_test.go
Expand Up @@ -382,7 +382,7 @@ func (s) TestSDKFileWatcherEnd2End(t *testing.T) {
func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) {
valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"]
file := createTmpPolicyFile(t, "valid_policy_refresh", []byte(valid1.authzPolicy))
i, _ := authz.NewFileWatcher(file, 10*time.Millisecond)
i, _ := authz.NewFileWatcher(file, 100*time.Millisecond)
defer i.Close()

// Start a gRPC server with SDK unary server interceptor.
Expand Down Expand Up @@ -420,31 +420,29 @@ func (s) TestSDKFileWatcher_ValidPolicyRefresh(t *testing.T) {
if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil {
t.Fatalf("os.WriteFile(%q) failed: %v", file, err)
}
// Wait 30 ms for background go routine to read the updated files.
time.Sleep(30 * time.Millisecond)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
numRetries := 0
reloadSuccess := false
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
got := status.Convert(err)
var gotStatus *status.Status
for numRetries <= 20 {
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr {
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr {
reloadSuccess = true
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
numRetries++
}
if reloadSuccess == false {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message())
t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message())
}
}

func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) {
valid := sdkTests["DeniesRpcMatchInDenyAndAllow"]
file := createTmpPolicyFile(t, "invalid_policy_skip_reload", []byte(valid.authzPolicy))
i, _ := authz.NewFileWatcher(file, 10*time.Millisecond)
i, _ := authz.NewFileWatcher(file, 100*time.Millisecond)
defer i.Close()

// Start a gRPC server with SDK unary server interceptors.
Expand Down Expand Up @@ -481,25 +479,23 @@ func (s) TestSDKFileWatcher_InvalidPolicySkipReload(t *testing.T) {
if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil {
t.Fatalf("os.WriteFile(%q) failed: %v", file, err)
}
// Wait 30 ms for background go routine to read the updated files.
time.Sleep(30 * time.Millisecond)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
numRetries := 0
for numRetries <= 20 {
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid.wantStatusCode || got.Message() != valid.wantErr {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid.wantStatusCode, valid.wantErr, got.Code(), got.Message())
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
numRetries++
}
}

func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) {
valid1 := sdkTests["DeniesRpcMatchInDenyAndAllow"]
file := createTmpPolicyFile(t, "recovers_from_reload_failure", []byte(valid1.authzPolicy))
i, _ := authz.NewFileWatcher(file, 10*time.Millisecond)
i, _ := authz.NewFileWatcher(file, 100*time.Millisecond)
defer i.Close()

// Start a gRPC server with SDK unary server interceptors.
Expand Down Expand Up @@ -536,17 +532,15 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) {
if err := os.WriteFile(file, []byte("{}"), os.ModePerm); err != nil {
t.Fatalf("os.WriteFile(%q) failed: %v", file, err)
}
// Wait 30 ms for background go routine to read the updated files.
time.Sleep(30 * time.Millisecond)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
numRetries := 0
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
for numRetries <= 20 {
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if got := status.Convert(err); got.Code() != valid1.wantStatusCode || got.Message() != valid1.wantErr {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid1.wantStatusCode, valid1.wantErr, got.Code(), got.Message())
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
numRetries++
}

Expand All @@ -555,23 +549,21 @@ func (s) TestSDKFileWatcher_RecoversFromReloadFailure(t *testing.T) {
if err := os.WriteFile(file, []byte(valid2.authzPolicy), os.ModePerm); err != nil {
t.Fatalf("os.WriteFile(%q) failed: %v", file, err)
}
// Wait 30 ms for background go routine to read the updated files.
time.Sleep(30 * time.Millisecond)

// Verifying authorization decision.
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
numRetries = 0
reloadSuccess := false
got := status.Convert(err)
var gotStatus *status.Status
for numRetries <= 20 {
if got.Code() == valid2.wantStatusCode && got.Message() == valid2.wantErr {
_, err = client.UnaryCall(ctx, &pb.SimpleRequest{})
if gotStatus = status.Convert(err); gotStatus.Code() == valid2.wantStatusCode && gotStatus.Message() == valid2.wantErr {
reloadSuccess = true
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
numRetries++
}
if reloadSuccess == false {
t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, got.Code(), got.Message())
t.Fatalf("error want:{%v %v} got:{%v %v}", valid2.wantStatusCode, valid2.wantErr, gotStatus.Code(), gotStatus.Message())
}
}
9 changes: 6 additions & 3 deletions authz/sdk_server_interceptors.go
Expand Up @@ -117,8 +117,9 @@ func NewFileWatcher(file string, duration time.Duration) (*FileWatcherIntercepto
func (i *FileWatcherInterceptor) run(ctx context.Context) {
ticker := time.NewTicker(i.refreshDuration)
for {
err := i.updateInternalInterceptor()
logger.Infof("authorization policy reload status err: %v", err)
if err := i.updateInternalInterceptor(); err != nil {
logger.Warningf("authorization policy reload status err: %v", err)
}
select {
case <-ctx.Done():
ticker.Stop()
Expand All @@ -140,12 +141,14 @@ func (i *FileWatcherInterceptor) updateInternalInterceptor() error {
if bytes.Equal(i.policyContents, policyContents) {
return nil
}
interceptor, err := NewStatic(string(policyContents))
policyContentsString := string(policyContents)
interceptor, err := NewStatic(policyContentsString)
if err != nil {
return err
ashithasantosh marked this conversation as resolved.
Show resolved Hide resolved
}
atomic.StorePointer(&i.internalInterceptor, unsafe.Pointer(interceptor))
i.policyContents = policyContents
logger.Infof("authorization policy reload status: successfully loaded new policy %v", policyContentsString)
return nil
}

Expand Down