Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallelization for processing policies / authorities. #1795

Merged
merged 2 commits into from Apr 24, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
151 changes: 103 additions & 48 deletions pkg/cosign/kubernetes/webhook/validator.go
Expand Up @@ -236,11 +236,13 @@ func (v *Validator) validatePodSpec(ctx context.Context, ps *corev1.PodSpec, opt
// reasonable that the return value is 0, nil since there were no errors, but
// the image was not validated against any matching policy and hence authority.
func validatePolicies(ctx context.Context, ref name.Reference, policies map[string]webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (map[string]*PolicyResult, map[string][]error) {
// Gather all validated policies here.
policyResults := make(map[string]*PolicyResult)
// For a policy that does not pass at least one authority, gather errors
// here so that we can give meaningful errors to the user.
ret := map[string][]error{}
type retChannelType struct {
name string
policyResult *PolicyResult
errors []error
}
results := make(chan retChannelType, len(policies))

// For each matching policy it must validate at least one Authority within
// it.
// From the Design document, the part about multiple Policies matching:
Expand All @@ -249,31 +251,53 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
for cipName, cip := range policies {
// Due to running in gofunc
cipName := cipName
cip := cip
logging.FromContext(ctx).Debugf("Checking Policy: %s", cipName)
policyResult, errs := ValidatePolicy(ctx, ref, cip, remoteOpts...)
if len(errs) > 0 {
ret[cipName] = append(ret[cipName], errs...)
} else {
// Ok, at least one Authority on the policy passed. If there's a CIP level
// policy, apply it against the results of the successful Authorities
// outputs.
if cip.Policy != nil {
logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName)
policyJSON, err := json.Marshal(policyResult)
if err != nil {
ret[cipName] = append(ret[cipName], errors.Wrap(err, "marshaling policyresult"))
} else {
logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON))
err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON)
go func() {
result := retChannelType{name: cipName}

result.policyResult, result.errors = ValidatePolicy(ctx, ref, cip, remoteOpts...)
if len(result.errors) == 0 {
// Ok, at least one Authority on the policy passed. If there's a CIP level
// policy, apply it against the results of the successful Authorities
// outputs.
if cip.Policy != nil {
logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName)
policyJSON, err := json.Marshal(result.policyResult)
if err != nil {
ret[cipName] = append(ret[cipName], err)
results <- result
} else {
policyResults[cipName] = policyResult
logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON))
err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON)
if err != nil {
result.errors = append(result.errors, err)
}
}
}
} else {
policyResults[cipName] = policyResult
}
results <- result
}()
}
// Gather all validated policies here.
policyResults := make(map[string]*PolicyResult)
// For a policy that does not pass at least one authority, gather errors
// here so that we can give meaningful errors to the user.
ret := map[string][]error{}

for i := 0; i < len(policies); i++ {
result, ok := <-results
if !ok {
ret["internalerror"] = append(ret["internalerror"], fmt.Errorf("results channel failed to produce a result"))
}
switch {
case len(result.errors) > 0:
ret[result.name] = append(ret[result.name], result.errors...)
case len(result.policyResult.AuthorityMatches) > 0:
policyResults[result.name] = result.policyResult
default:
ret[result.name] = append(ret[result.name], fmt.Errorf("failed to process policy: %s", result.name))
}
}
return policyResults, ret
Expand All @@ -285,40 +309,71 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri
// Returns PolicyResult, or errors encountered if none of the authorities
// passed.
func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (*PolicyResult, []error) {
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
authorityErrors := []error{}
// We collect all the successfully satisfied Authorities into this and
// return it.
policyResult := PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)}
// Each gofunc creates and puts one of these into a results channel.
// Once each gofunc finishes, we go through the channel and pull out
// the results.
type retChannelType struct {
name string
attestations map[string][]PolicySignature
signatures []PolicySignature
err error
}
results := make(chan retChannelType, len(cip.Authorities))
for _, authority := range cip.Authorities {
authority := authority // due to gofunc
logging.FromContext(ctx).Debugf("Checking Authority: %s", authority.Name)
// Assignment for appendAssign lint error
authorityRemoteOpts := remoteOpts
authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...)

if len(authority.Attestations) > 0 {
// We're doing the verify-attestations path, so validate (.att)
validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
authorityErrors = append(authorityErrors, err)
} else {
policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Attestations: validatedAttestations}
}
} else {
// We're doing the verify path, so validate image signatures (.sig)
validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
authorityErrors = append(authorityErrors, err)
go func() {
result := retChannelType{name: authority.Name}
// Assignment for appendAssign lint error
authorityRemoteOpts := remoteOpts
authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...)

if len(authority.Attestations) > 0 {
// We're doing the verify-attestations path, so validate (.att)
validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
result.err = err
} else {
result.attestations = validatedAttestations
}
} else {
policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Signatures: validatedSignatures}
validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
result.err = err
} else {
result.signatures = validatedSignatures
}
}
results <- result
}()
}
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
authorityErrors := []error{}
// We collect all the successfully satisfied Authorities into this and
// return it.
policyResult := &PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)}
for i := 0; i < len(cip.Authorities); i++ {
result, ok := <-results
if !ok {
authorityErrors = append(authorityErrors, fmt.Errorf("results channel failed to produce a result"))
}
switch {
case result.err != nil:
authorityErrors = append(authorityErrors, result.err)
case len(result.signatures) > 0:
policyResult.AuthorityMatches[result.name] = AuthorityMatch{Signatures: result.signatures}
case len(result.attestations) > 0:
policyResult.AuthorityMatches[result.name] = AuthorityMatch{Attestations: result.attestations}
default:
authorityErrors = append(authorityErrors, fmt.Errorf("failed to process authority: %s", result.name))
}
}
if len(authorityErrors) > 0 {
return nil, authorityErrors
}
return &policyResult, authorityErrors
return policyResult, authorityErrors
}

func ociSignatureToPolicySignature(ctx context.Context, sigs []oci.Signature) []PolicySignature {
Expand Down