Skip to content

Commit

Permalink
fix: remove list and watch on secrets. Fixes argoproj#8534 (argoproj#…
Browse files Browse the repository at this point in the history
…8555)

Signed-off-by: bjenuhb <Basanth_JenuHB@intuit.com>
  • Loading branch information
basanthjenuhb authored and terrytangyuan committed Sep 7, 2022
1 parent 9c9efa6 commit 7e966fb
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 73 deletions.
26 changes: 7 additions & 19 deletions cmd/argo/commands/server.go
Expand Up @@ -48,7 +48,6 @@ func NewServerCommand() *cobra.Command {
htst bool
namespaced bool // --namespaced
managedNamespace string // --managed-namespace
ssoNamespace string
enableOpenBrowser bool
eventOperationQueueSize int
eventWorkerCount int
Expand Down Expand Up @@ -97,10 +96,16 @@ See %s`, help.ArgoServer),
managedNamespace = namespace
}

ssoNamespace := namespace
if managedNamespace != "" {
ssoNamespace = managedNamespace
}

log.WithFields(log.Fields{
"authModes": authModes,
"namespace": namespace,
"managedNamespace": managedNamespace,
"ssoNamespace": ssoNamespace,
"baseHRef": baseHRef,
"secure": secure,
}).Info()
Expand Down Expand Up @@ -142,33 +147,17 @@ See %s`, help.ArgoServer),
log.Warn("You are running without client authentication. Learn how to enable client authentication: https://argoproj.github.io/argo-workflows/argo-server-auth-mode/")
}

if namespaced {
// Case 1: If ssoNamespace is not specified, default it to installation namespace
if ssoNamespace == "" {
ssoNamespace = namespace
}
// Case 2: If ssoNamespace is not equal to installation or managed namespace, default it to installation namespace
if ssoNamespace != namespace && ssoNamespace != managedNamespace {
log.Warn("--sso-namespace should be equal to --managed-namespace or the installation namespace")
ssoNamespace = namespace
}
} else {
if ssoNamespace != "" {
log.Warn("ignoring --sso-namespace because --namespaced is false")
}
ssoNamespace = namespace
}
opts := apiserver.ArgoServerOpts{
BaseHRef: baseHRef,
TLSConfig: tlsConfig,
HSTS: htst,
Namespaced: namespaced,
Namespace: namespace,
SSONameSpace: ssoNamespace,
Clients: clients,
RestConfig: config,
AuthModes: modes,
ManagedNamespace: managedNamespace,
SSONamespace: ssoNamespace,
ConfigName: configMap,
EventOperationQueueSize: eventOperationQueueSize,
EventWorkerCount: eventWorkerCount,
Expand Down Expand Up @@ -223,7 +212,6 @@ See %s`, help.ArgoServer),
command.Flags().StringVar(&configMap, "configmap", "workflow-controller-configmap", "Name of K8s configmap to retrieve workflow controller configuration")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run as namespaced mode")
command.Flags().StringVar(&managedNamespace, "managed-namespace", "", "namespace that watches, default to the installation namespace")
command.Flags().StringVar(&ssoNamespace, "sso-namespace", "", "namespace that will be used for SSO RBAC. Defaults to installation namespace. Used only in namespaced mode")
command.Flags().BoolVarP(&enableOpenBrowser, "browser", "b", false, "enable automatic launching of the browser [local mode]")
command.Flags().IntVar(&eventOperationQueueSize, "event-operation-queue-size", 16, "how many events operations that can be queued at once")
command.Flags().IntVar(&eventWorkerCount, "event-worker-count", 4, "how many event workers to run")
Expand Down
1 change: 0 additions & 1 deletion docs/cli/argo_server.md
Expand Up @@ -30,7 +30,6 @@ See https://argoproj.github.io/argo-workflows/argo-server/
--managed-namespace string namespace that watches, default to the installation namespace
--namespaced run as namespaced mode
-p, --port int Port to listen on (default 2746)
--sso-namespace string namespace that will be used for SSO RBAC. Defaults to installation namespace. Used only in namespaced mode
--x-frame-options string Set X-Frame-Options header in HTTP responses. (default "DENY")
```

Expand Down
Expand Up @@ -18,8 +18,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
2 changes: 0 additions & 2 deletions manifests/install.yaml
Expand Up @@ -913,8 +913,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
2 changes: 0 additions & 2 deletions manifests/namespace-install.yaml
Expand Up @@ -822,8 +822,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
Expand Up @@ -18,8 +18,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
2 changes: 0 additions & 2 deletions manifests/quick-start-minimal.yaml
Expand Up @@ -851,8 +851,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
2 changes: 0 additions & 2 deletions manifests/quick-start-mysql.yaml
Expand Up @@ -851,8 +851,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
2 changes: 0 additions & 2 deletions manifests/quick-start-postgres.yaml
Expand Up @@ -851,8 +851,6 @@ rules:
verbs:
- get
- create
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
11 changes: 6 additions & 5 deletions server/apiserver/argoserver.go
Expand Up @@ -98,7 +98,7 @@ type ArgoServerOpts struct {
// config map name
ConfigName string
ManagedNamespace string
SSONameSpace string
SSONamespace string
HSTS bool
EventOperationQueueSize int
EventWorkerCount int
Expand All @@ -116,8 +116,8 @@ func init() {
}

func getResourceCacheNamespace(opts ArgoServerOpts) string {
if opts.Namespaced {
return opts.SSONameSpace
if opts.ManagedNamespace != "" {
return opts.ManagedNamespace
}
return v1.NamespaceAll
}
Expand All @@ -135,12 +135,13 @@ func NewArgoServer(ctx context.Context, opts ArgoServerOpts) (*argoServer, error
if err != nil {
return nil, err
}
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, ctx, getResourceCacheNamespace(opts))
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, getResourceCacheNamespace(opts))
resourceCache.Run(ctx.Done())
log.Info("SSO enabled")
} else {
log.Info("SSO disabled")
}
gatekeeper, err := auth.NewGatekeeper(opts.AuthModes, opts.Clients, opts.RestConfig, ssoIf, auth.DefaultClientForAuthorization, opts.Namespace, opts.SSONameSpace, opts.Namespaced, resourceCache)
gatekeeper, err := auth.NewGatekeeper(opts.AuthModes, opts.Clients, opts.RestConfig, ssoIf, auth.DefaultClientForAuthorization, opts.Namespace, opts.SSONamespace, opts.Namespaced, resourceCache)
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions server/auth/gatekeeper.go
Expand Up @@ -207,7 +207,7 @@ func (s gatekeeper) getClients(ctx context.Context, req interface{}) (*servertyp
return nil, nil, status.Error(codes.Unauthenticated, err.Error())
}
if s.ssoIf.IsRBACEnabled() {
clients, err := s.rbacAuthorization(claims, req)
clients, err := s.rbacAuthorization(ctx, claims, req)
if err != nil {
log.WithError(err).Error("failed to perform RBAC authorization")
return nil, nil, status.Error(codes.PermissionDenied, "not allowed")
Expand Down Expand Up @@ -279,8 +279,8 @@ func (s *gatekeeper) canDelegateRBACToRequestNamespace(req interface{}) bool {
return len(namespace) != 0 && s.ssoNamespace != namespace
}

func (s *gatekeeper) getClientsForServiceAccount(claims *types.Claims, serviceAccount *corev1.ServiceAccount) (*servertypes.Clients, error) {
authorization, err := s.authorizationForServiceAccount(serviceAccount)
func (s *gatekeeper) getClientsForServiceAccount(ctx context.Context, claims *types.Claims, serviceAccount *corev1.ServiceAccount) (*servertypes.Clients, error) {
authorization, err := s.authorizationForServiceAccount(ctx, serviceAccount)
if err != nil {
return nil, err
}
Expand All @@ -292,7 +292,7 @@ func (s *gatekeeper) getClientsForServiceAccount(claims *types.Claims, serviceAc
return clients, nil
}

func (s *gatekeeper) rbacAuthorization(claims *types.Claims, req interface{}) (*servertypes.Clients, error) {
func (s *gatekeeper) rbacAuthorization(ctx context.Context, claims *types.Claims, req interface{}) (*servertypes.Clients, error) {
ssoDelegationAllowed, ssoDelegated := false, false
loginAccount, err := s.getServiceAccount(claims, s.ssoNamespace)
if err != nil {
Expand All @@ -311,14 +311,14 @@ func (s *gatekeeper) rbacAuthorization(claims *types.Claims, req interface{}) (*
}
// important! write an audit entry (i.e. log entry) so we know which user performed an operation
log.WithFields(log.Fields{"serviceAccount": delegatedAccount.Name, "loginServiceAccount": loginAccount.Name, "subject": claims.Subject, "email": claims.Email, "ssoDelegationAllowed": ssoDelegationAllowed, "ssoDelegated": ssoDelegated}).Info("selected SSO RBAC service account for user")
return s.getClientsForServiceAccount(claims, delegatedAccount)
return s.getClientsForServiceAccount(ctx, claims, delegatedAccount)
}

func (s *gatekeeper) authorizationForServiceAccount(serviceAccount *corev1.ServiceAccount) (string, error) {
func (s *gatekeeper) authorizationForServiceAccount(ctx context.Context, serviceAccount *corev1.ServiceAccount) (string, error) {
if len(serviceAccount.Secrets) == 0 {
return "", fmt.Errorf("expected at least one secret for SSO RBAC service account: %s", serviceAccount.GetName())
}
secret, err := s.cache.SecretLister.Secrets(serviceAccount.GetNamespace()).Get(serviceAccount.Secrets[0].Name)
secret, err := s.cache.GetSecret(ctx, serviceAccount.GetNamespace(), serviceAccount.Secrets[0].Name)
if err != nil {
return "", fmt.Errorf("failed to get service account secret: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion server/auth/gatekeeper_test.go
Expand Up @@ -105,7 +105,8 @@ func TestServer_GetWFClient(t *testing.T) {
},
},
)
resourceCache := cache.NewResourceCache(kubeClient, context.TODO(), corev1.NamespaceAll)
resourceCache := cache.NewResourceCache(kubeClient, corev1.NamespaceAll)
resourceCache.Run(context.TODO().Done())
var clientForAuthorization ClientForAuthorization = func(authorization string) (*rest.Config, *servertypes.Clients, error) {
return &rest.Config{}, &servertypes.Clients{Workflow: &fakewfclientset.Clientset{}, Kubernetes: &kubefake.Clientset{}}, nil
}
Expand Down
26 changes: 3 additions & 23 deletions server/cache/cache.go
@@ -1,26 +1,6 @@
package cache

import (
"context"
"time"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
)

type ResourceCache struct {
v1.ServiceAccountLister
v1.SecretLister
}

func NewResourceCache(client kubernetes.Interface, ctx context.Context, namespace string) *ResourceCache {
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, time.Minute*20, informers.WithNamespace(namespace))
cache := &ResourceCache{
ServiceAccountLister: informerFactory.Core().V1().ServiceAccounts().Lister(),
SecretLister: informerFactory.Core().V1().Secrets().Lister(),
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return cache
type Interface interface {
Get(key string) (any, bool)
Add(key string, value any)
}
44 changes: 44 additions & 0 deletions server/cache/lru_ttl_cache.go
@@ -0,0 +1,44 @@
package cache

import (
"time"

"k8s.io/utils/lru"
)

var currentTime = time.Now

type lruTtlCache struct {
timeout time.Duration
cache *lru.Cache
}

type item struct {
expiryTime time.Time
value any
}

func NewLRUTtlCache(timeout time.Duration, size int) *lruTtlCache {
return &lruTtlCache{
timeout: timeout,
cache: lru.New(size),
}
}

func (c *lruTtlCache) Get(key string) (any, bool) {
if data, ok := c.cache.Get(key); ok {
item := data.(*item)
if currentTime().Before(item.expiryTime) {
return item.value, true
}
c.cache.Remove(key)
}
return nil, false
}

func (c *lruTtlCache) Add(key string, value any) {
c.cache.Add(key, &item{
expiryTime: currentTime().Add(c.timeout),
value: value,
})
}
64 changes: 64 additions & 0 deletions server/cache/lru_ttl_cache_test.go
@@ -0,0 +1,64 @@
package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewTimedCache(t *testing.T) {

t.Run("NewLRUTtlCache should return a new instance", func(t *testing.T) {
cache := NewLRUTtlCache(time.Second, 1)
assert.NotNil(t, cache)
})

t.Run("TimedCache should cache based on LRU size", func(t *testing.T) {
cache := NewLRUTtlCache(time.Second*10, 2)
cache.Add("one", "one")
cache.Add("two", "two")

// Both "one" and "two" should be available since maxSize is 2
_, ok := cache.Get("one")
assert.True(t, ok)

_, ok = cache.Get("two")
assert.True(t, ok)

// "three" should be available since its newly added
cache.Add("three", "three")
_, ok = cache.Get("three")
assert.True(t, ok)

// "one" should not be available since maxSize is 2
_, ok = cache.Get("one")
assert.False(t, ok)
})

t.Run("TimedCache should cache based on timeout", func(t *testing.T) {
tempCurrentTime := currentTime

cache := NewLRUTtlCache(time.Minute*1, 2)

currentTime = getTimeFunc(0, 0)
cache.Add("one", "one")

currentTime = getTimeFunc(0, 30)
_, ok := cache.Get("one")
assert.True(t, ok)

currentTime = getTimeFunc(1, 30)
// "one" should not be available since timeout is 1 min
_, ok = cache.Get("one")
assert.False(t, ok)
currentTime = tempCurrentTime
})

}

func getTimeFunc(min int, sec int) func() time.Time {
return func() time.Time {
return time.Date(0, 0, 0, 0, min, sec, 0, time.UTC)
}
}

0 comments on commit 7e966fb

Please sign in to comment.