From 076ac90d81493554ff5e3fb54d58fd9b84df9cf8 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Sat, 23 Apr 2022 16:02:19 -0700 Subject: [PATCH 1/2] Add parallelization for processing policies / authorities. Signed-off-by: Ville Aikas --- pkg/cosign/kubernetes/webhook/validator.go | 163 +++++++++++++++------ 1 file changed, 115 insertions(+), 48 deletions(-) diff --git a/pkg/cosign/kubernetes/webhook/validator.go b/pkg/cosign/kubernetes/webhook/validator.go index 7590892a589..a055acaa967 100644 --- a/pkg/cosign/kubernetes/webhook/validator.go +++ b/pkg/cosign/kubernetes/webhook/validator.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "sync" "github.com/google/go-containerregistry/pkg/authn/k8schain" "github.com/google/go-containerregistry/pkg/name" @@ -236,11 +237,15 @@ func (v *Validator) validatePodSpec(ctx context.Context, ps *corev1.PodSpec, opt // reasonable that the return value is 0, nil since there were no errors, but // the image was not validated against any matching policy and hence authority. func validatePolicies(ctx context.Context, ref name.Reference, policies map[string]webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (map[string]*PolicyResult, map[string][]error) { - // Gather all validated policies here. - policyResults := make(map[string]*PolicyResult) - // For a policy that does not pass at least one authority, gather errors - // here so that we can give meaningful errors to the user. - ret := map[string][]error{} + wg := sync.WaitGroup{} + + type retChannelType struct { + name string + policyResult *PolicyResult + errors []error + } + results := make(chan retChannelType, len(policies)) + // For each matching policy it must validate at least one Authority within // it. // From the Design document, the part about multiple Policies matching: @@ -249,33 +254,59 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri // If none of the Authorities for a given policy pass the checks, gather // the errors here. If one passes, do not return the errors. for cipName, cip := range policies { + // Due to running in gofunc + cipName := cipName + cip := cip logging.FromContext(ctx).Debugf("Checking Policy: %s", cipName) - policyResult, errs := ValidatePolicy(ctx, ref, cip, remoteOpts...) - if len(errs) > 0 { - ret[cipName] = append(ret[cipName], errs...) - } else { - // Ok, at least one Authority on the policy passed. If there's a CIP level - // policy, apply it against the results of the successful Authorities - // outputs. - if cip.Policy != nil { - logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName) - policyJSON, err := json.Marshal(policyResult) - if err != nil { - ret[cipName] = append(ret[cipName], errors.Wrap(err, "marshaling policyresult")) - } else { - logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON)) - err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON) + wg.Add(1) + go func() { + defer wg.Done() + result := retChannelType{name: cipName} + + result.policyResult, result.errors = ValidatePolicy(ctx, ref, cip, remoteOpts...) + if len(result.errors) == 0 { + // Ok, at least one Authority on the policy passed. If there's a CIP level + // policy, apply it against the results of the successful Authorities + // outputs. + if cip.Policy != nil { + logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName) + policyJSON, err := json.Marshal(result.policyResult) if err != nil { - ret[cipName] = append(ret[cipName], err) + results <- result } else { - policyResults[cipName] = policyResult + logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON)) + err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON) + if err != nil { + result.errors = append(result.errors, err) + } } } - } else { - policyResults[cipName] = policyResult } + results <- result + }() + } + // Gather all validated policies here. + policyResults := make(map[string]*PolicyResult) + // For a policy that does not pass at least one authority, gather errors + // here so that we can give meaningful errors to the user. + ret := map[string][]error{} + + for i := 0; i < len(policies); i++ { + result, ok := <-results + if !ok { + break + } + switch { + case len(result.errors) > 0: + ret[result.name] = append(ret[result.name], result.errors...) + case len(result.policyResult.AuthorityMatches) > 0: + policyResults[result.name] = result.policyResult + default: + ret[result.name] = append(ret[result.name], fmt.Errorf("failed to process policy: %s", result.name)) } } + + wg.Wait() return policyResults, ret } @@ -285,40 +316,76 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri // Returns PolicyResult, or errors encountered if none of the authorities // passed. func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (*PolicyResult, []error) { - // If none of the Authorities for a given policy pass the checks, gather - // the errors here. If one passes, do not return the errors. - authorityErrors := []error{} - // We collect all the successfully satisfied Authorities into this and - // return it. - policyResult := PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)} + wg := sync.WaitGroup{} + // Each gofunc creates and puts one of these into a results channel. + // Once each gofunc finishes, we go through the channel and pull out + // the results. + type retChannelType struct { + name string + attestations map[string][]PolicySignature + signatures []PolicySignature + err error + } + results := make(chan retChannelType, len(cip.Authorities)) for _, authority := range cip.Authorities { + authority := authority // due to gofunc logging.FromContext(ctx).Debugf("Checking Authority: %s", authority.Name) - // Assignment for appendAssign lint error - authorityRemoteOpts := remoteOpts - authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...) - if len(authority.Attestations) > 0 { - // We're doing the verify-attestations path, so validate (.att) - validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...) - if err != nil { - authorityErrors = append(authorityErrors, err) - } else { - policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Attestations: validatedAttestations} - } - } else { - // We're doing the verify path, so validate image signatures (.sig) - validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...) - if err != nil { - authorityErrors = append(authorityErrors, err) + wg.Add(1) + go func() { + defer wg.Done() + result := retChannelType{name: authority.Name} + // Assignment for appendAssign lint error + authorityRemoteOpts := remoteOpts + authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...) + + if len(authority.Attestations) > 0 { + // We're doing the verify-attestations path, so validate (.att) + validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...) + if err != nil { + result.err = err + } else { + result.attestations = validatedAttestations + } } else { - policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Signatures: validatedSignatures} + validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...) + if err != nil { + result.err = err + } else { + result.signatures = validatedSignatures + } } + results <- result + }() + } + // If none of the Authorities for a given policy pass the checks, gather + // the errors here. If one passes, do not return the errors. + authorityErrors := []error{} + // We collect all the successfully satisfied Authorities into this and + // return it. + policyResult := &PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)} + for i := 0; i < len(cip.Authorities); i++ { + result, ok := <-results + if !ok { + break + } + switch { + case result.err != nil: + authorityErrors = append(authorityErrors, result.err) + case len(result.signatures) > 0: + policyResult.AuthorityMatches[result.name] = AuthorityMatch{Signatures: result.signatures} + case len(result.attestations) > 0: + policyResult.AuthorityMatches[result.name] = AuthorityMatch{Attestations: result.attestations} + default: + authorityErrors = append(authorityErrors, fmt.Errorf("failed to process authority: %s", result.name)) } } + wg.Wait() + if len(authorityErrors) > 0 { return nil, authorityErrors } - return &policyResult, authorityErrors + return policyResult, authorityErrors } func ociSignatureToPolicySignature(ctx context.Context, sigs []oci.Signature) []PolicySignature { From f5cca3622ce1c4123db2a35a18c7c22a3ec086fc Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Sat, 23 Apr 2022 18:37:37 -0700 Subject: [PATCH 2/2] Simplify by removing the wg. If error encountered, create an internal error. Signed-off-by: Ville Aikas --- pkg/cosign/kubernetes/webhook/validator.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/pkg/cosign/kubernetes/webhook/validator.go b/pkg/cosign/kubernetes/webhook/validator.go index a055acaa967..e9578ba4aa5 100644 --- a/pkg/cosign/kubernetes/webhook/validator.go +++ b/pkg/cosign/kubernetes/webhook/validator.go @@ -21,7 +21,6 @@ import ( "crypto/x509" "encoding/json" "fmt" - "sync" "github.com/google/go-containerregistry/pkg/authn/k8schain" "github.com/google/go-containerregistry/pkg/name" @@ -237,8 +236,6 @@ func (v *Validator) validatePodSpec(ctx context.Context, ps *corev1.PodSpec, opt // reasonable that the return value is 0, nil since there were no errors, but // the image was not validated against any matching policy and hence authority. func validatePolicies(ctx context.Context, ref name.Reference, policies map[string]webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (map[string]*PolicyResult, map[string][]error) { - wg := sync.WaitGroup{} - type retChannelType struct { name string policyResult *PolicyResult @@ -258,9 +255,7 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri cipName := cipName cip := cip logging.FromContext(ctx).Debugf("Checking Policy: %s", cipName) - wg.Add(1) go func() { - defer wg.Done() result := retChannelType{name: cipName} result.policyResult, result.errors = ValidatePolicy(ctx, ref, cip, remoteOpts...) @@ -294,7 +289,7 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri for i := 0; i < len(policies); i++ { result, ok := <-results if !ok { - break + ret["internalerror"] = append(ret["internalerror"], fmt.Errorf("results channel failed to produce a result")) } switch { case len(result.errors) > 0: @@ -305,8 +300,6 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri ret[result.name] = append(ret[result.name], fmt.Errorf("failed to process policy: %s", result.name)) } } - - wg.Wait() return policyResults, ret } @@ -316,7 +309,6 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri // Returns PolicyResult, or errors encountered if none of the authorities // passed. func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (*PolicyResult, []error) { - wg := sync.WaitGroup{} // Each gofunc creates and puts one of these into a results channel. // Once each gofunc finishes, we go through the channel and pull out // the results. @@ -331,9 +323,7 @@ func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.Clus authority := authority // due to gofunc logging.FromContext(ctx).Debugf("Checking Authority: %s", authority.Name) - wg.Add(1) go func() { - defer wg.Done() result := retChannelType{name: authority.Name} // Assignment for appendAssign lint error authorityRemoteOpts := remoteOpts @@ -367,7 +357,7 @@ func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.Clus for i := 0; i < len(cip.Authorities); i++ { result, ok := <-results if !ok { - break + authorityErrors = append(authorityErrors, fmt.Errorf("results channel failed to produce a result")) } switch { case result.err != nil: @@ -380,8 +370,6 @@ func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.Clus authorityErrors = append(authorityErrors, fmt.Errorf("failed to process authority: %s", result.name)) } } - wg.Wait() - if len(authorityErrors) > 0 { return nil, authorityErrors }