Add conditions to PDB status #98127

Merged (2 commits, Mar 5, 2021)
13 changes: 13 additions & 0 deletions api/openapi-spec/swagger.json

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions pkg/apis/policy/types.go
@@ -77,6 +77,10 @@ type PodDisruptionBudgetStatus struct {

// total number of pods counted by this disruption budget
ExpectedPods int32

// Conditions contain conditions for PDB
// +optional
Conditions []metav1.Condition
Member:
this needs to be validated with a call to metav1validation.ValidateConditions

since this is a new field, it can be validated even for v1beta1 (which means actually connecting v1beta1 status updates to validation specifically for this field)

Member Author:
I have added validation of the conditions to ValidatePodDisruptionBudgetStatus. This isn't actually used yet, but #99290 addresses this. I will make sure this validation gets run for all versions, while the existing fields will only be validated for v1.

Member:
go ahead and wire the validation here... we don't want to merge unvalidated APIs assuming validation will be added in a follow-up before release

Member Author:
Moved the code for wiring up status validation from #99290 into this PR.

}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
2 changes: 2 additions & 0 deletions pkg/apis/policy/v1beta1/zz_generated.conversion.go

(Generated file; diff not rendered.)

13 changes: 10 additions & 3 deletions pkg/apis/policy/validation/validation.go
@@ -23,8 +23,10 @@ import (
"strings"

"k8s.io/api/core/v1"
policyapiv1beta1 "k8s.io/api/policy/v1beta1"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
unversionedvalidation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
appsvalidation "k8s.io/kubernetes/pkg/apis/apps/validation"
@@ -40,7 +42,6 @@ import (
// with any errors.
func ValidatePodDisruptionBudget(pdb *policy.PodDisruptionBudget) field.ErrorList {
allErrs := ValidatePodDisruptionBudgetSpec(pdb.Spec, field.NewPath("spec"))
allErrs = append(allErrs, ValidatePodDisruptionBudgetStatus(pdb.Status, field.NewPath("status"))...)
return allErrs
}

@@ -68,10 +69,16 @@ func ValidatePodDisruptionBudgetSpec(spec policy.PodDisruptionBudgetSpec, fldPat
return allErrs
}

// ValidatePodDisruptionBudgetStatus validates a PodDisruptionBudgetStatus and returns an ErrorList
// ValidatePodDisruptionBudgetStatusUpdate validates a PodDisruptionBudgetStatus and returns an ErrorList
// with any errors.
func ValidatePodDisruptionBudgetStatus(status policy.PodDisruptionBudgetStatus, fldPath *field.Path) field.ErrorList {
func ValidatePodDisruptionBudgetStatusUpdate(status, oldStatus policy.PodDisruptionBudgetStatus, fldPath *field.Path, apiVersion schema.GroupVersion) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, unversionedvalidation.ValidateConditions(status.Conditions, fldPath.Child("conditions"))...)
// Don't run other validations for v1beta1 since we don't want to introduce
// new validations retroactively.
if apiVersion == policyapiv1beta1.SchemeGroupVersion {
return allErrs
}
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.DisruptionsAllowed), fldPath.Child("disruptionsAllowed"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.CurrentHealthy), fldPath.Child("currentHealthy"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.DesiredHealthy), fldPath.Child("desiredHealthy"))...)
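The wiring discussed in the review thread above is not part of this file's diff. As a rough sketch only (the strategy type, its file location, and the requestGroupVersion helper are assumptions; only the ValidatePodDisruptionBudgetStatusUpdate signature comes from this PR), the status strategy could pass the request's API version through like this:

```go
// Hypothetical sketch of wiring the versioned status validation into the
// PDB status REST strategy; names other than the validation function are
// assumptions, not code from this PR.
package poddisruptionbudget

import (
	"context"

	policyv1beta1 "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/validation/field"
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/kubernetes/pkg/apis/policy"
	"k8s.io/kubernetes/pkg/apis/policy/validation"
)

type podDisruptionBudgetStatusStrategy struct{}

// ValidateUpdate validates only the status subresource, passing the request's
// group/version so that condition validation runs for every version while the
// pre-existing numeric fields stay unvalidated for v1beta1.
func (podDisruptionBudgetStatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
	newPdb := obj.(*policy.PodDisruptionBudget)
	oldPdb := old.(*policy.PodDisruptionBudget)
	return validation.ValidatePodDisruptionBudgetStatusUpdate(
		newPdb.Status, oldPdb.Status, field.NewPath("status"), requestGroupVersion(ctx))
}

// requestGroupVersion is an assumed helper that extracts the group/version of
// the incoming request, falling back to v1beta1 when none is available.
func requestGroupVersion(ctx context.Context) schema.GroupVersion {
	if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
		return schema.GroupVersion{Group: info.APIGroup, Version: info.APIVersion}
	}
	return policyv1beta1.SchemeGroupVersion
}
```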
165 changes: 146 additions & 19 deletions pkg/apis/policy/validation/validation_test.go
@@ -19,10 +19,13 @@ package validation
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
api "k8s.io/kubernetes/pkg/apis/core"
@@ -98,26 +101,150 @@ func TestValidateMinAvailablePodAndMaxUnavailableDisruptionBudgetSpec(t *testing
}

func TestValidatePodDisruptionBudgetStatus(t *testing.T) {
successCases := []policy.PodDisruptionBudgetStatus{
{DisruptionsAllowed: 10},
{CurrentHealthy: 5},
{DesiredHealthy: 3},
{ExpectedPods: 2}}
for _, c := range successCases {
errors := ValidatePodDisruptionBudgetStatus(c, field.NewPath("status"))
if len(errors) > 0 {
t.Errorf("unexpected failure %v for %v", errors, c)
}
const expectNoErrors = false
const expectErrors = true
testCases := []struct {
name string
pdbStatus policy.PodDisruptionBudgetStatus
expectErrForVersion map[schema.GroupVersion]bool
}{
{
name: "DisruptionsAllowed: 10",
pdbStatus: policy.PodDisruptionBudgetStatus{
DisruptionsAllowed: 10,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectNoErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "CurrentHealthy: 5",
pdbStatus: policy.PodDisruptionBudgetStatus{
CurrentHealthy: 5,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectNoErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "DesiredHealthy: 3",
pdbStatus: policy.PodDisruptionBudgetStatus{
DesiredHealthy: 3,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectNoErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "ExpectedPods: 2",
pdbStatus: policy.PodDisruptionBudgetStatus{
ExpectedPods: 2,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectNoErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "DisruptionsAllowed: -10",
pdbStatus: policy.PodDisruptionBudgetStatus{
DisruptionsAllowed: -10,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "CurrentHealthy: -5",
pdbStatus: policy.PodDisruptionBudgetStatus{
CurrentHealthy: -5,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "DesiredHealthy: -3",
pdbStatus: policy.PodDisruptionBudgetStatus{
DesiredHealthy: -3,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "ExpectedPods: -2",
pdbStatus: policy.PodDisruptionBudgetStatus{
ExpectedPods: -2,
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "Conditions valid",
pdbStatus: policy.PodDisruptionBudgetStatus{
Conditions: []metav1.Condition{
{
Type: policyv1beta1.DisruptionAllowedCondition,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{
Time: time.Now().Add(-5 * time.Minute),
},
Reason: policyv1beta1.SufficientPodsReason,
Message: "message",
ObservedGeneration: 3,
},
},
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectNoErrors,
policyv1beta1.SchemeGroupVersion: expectNoErrors,
},
},
{
name: "Conditions not valid",
pdbStatus: policy.PodDisruptionBudgetStatus{
Conditions: []metav1.Condition{
{
Type: policyv1beta1.DisruptionAllowedCondition,
Status: metav1.ConditionTrue,
},
{
Type: policyv1beta1.DisruptionAllowedCondition,
Status: metav1.ConditionFalse,
},
},
},
expectErrForVersion: map[schema.GroupVersion]bool{
policy.SchemeGroupVersion: expectErrors,
policyv1beta1.SchemeGroupVersion: expectErrors,
},
},
}
failureCases := []policy.PodDisruptionBudgetStatus{
{DisruptionsAllowed: -10},
{CurrentHealthy: -5},
{DesiredHealthy: -3},
{ExpectedPods: -2}}
for _, c := range failureCases {
errors := ValidatePodDisruptionBudgetStatus(c, field.NewPath("status"))
if len(errors) == 0 {
t.Errorf("unexpected success for %v", c)

for _, tc := range testCases {
for apiVersion, expectErrors := range tc.expectErrForVersion {
t.Run(fmt.Sprintf("apiVersion: %s, %s", apiVersion.String(), tc.name), func(t *testing.T) {
errors := ValidatePodDisruptionBudgetStatusUpdate(tc.pdbStatus, policy.PodDisruptionBudgetStatus{},
field.NewPath("status"), apiVersion)
errCount := len(errors)

if errCount > 0 && !expectErrors {
t.Errorf("unexpected failure %v for %v", errors, tc.pdbStatus)
}

if errCount == 0 && expectErrors {
t.Errorf("expected errors but didn't one for %v", tc.pdbStatus)
}
})
}
}
}
7 changes: 7 additions & 0 deletions pkg/apis/policy/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

26 changes: 22 additions & 4 deletions pkg/controller/disruption/disruption.go
@@ -49,6 +49,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
@@ -63,7 +64,9 @@ import (
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second
const (
DeletionTimeout = 2 * 60 * time.Second
)

type updater func(*policy.PodDisruptionBudget) error

@@ -579,7 +582,7 @@ func (dc *DisruptionController) sync(key string) error {
}
if err != nil {
klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
return dc.failSafe(pdb)
return dc.failSafe(pdb, err)
}

return nil
@@ -774,9 +777,21 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy
// implement the "fail open" part of the design since if we manage to update
// this field correctly, we will prevent the /evict handler from approving an
// eviction when it may be unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error {
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err error) error {
Contributor:
Would it be possible to add another condition to indicate we're hitting failSafe rather than just disruptions allowed. We could consume that info here: https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/registry/core/pod/storage/eviction.go#L208

Otherwise, we might evict pods with DesiredHealthy && CurrentHealthy being stale.

Maybe we can still achieve this by testing Reason == policy.SyncFailedReason if we have a util function, but sync failure seems like it should be its own status condition rather than embedded in DisruptionsAllowed. WDYT?

Member Author:
We could add a separate condition for this, but I wanted to avoid creating more conditions than necessary. So if we did this, it would be something like a SyncFailed condition? I'm not sure what kind of reasons we would use here, but maybe we could enumerate the "likely" errors we might encounter.

And would conditions be the appropriate way to handle this? The pdb.Status.DisruptionsAllowed would be useful here since it is updated by the eviction api, but I guess it doesn't work here since we don't know if a value of zero means we have the minimum number of pods to satisfy the PDB or if we have fewer than we need. Would it be an option to also decrement the pdb.Status.CurrentHealthy count after eviction?

Member:
> Otherwise, we might evict pods with DesiredHealthy && CurrentHealthy being stale.

is that true? failSafe sets DisruptionsAllowed to 0, which prevents evictions, right?

if SyncFailed=True always implies DisruptionAllowed=False, that seems like a good use of the distinct reason field on a DisruptionAllowed=False condition rather than a separate condition

Contributor:
Currently, DisruptionsAllowed is an integer, and this is the 'budget' used to determine how many pods are evicted. If we are hoping to explicitly disallow eviction because the PDB has gone stale, we need another field.

I don't have a strong preference on the field name or type, so long as it's for that specific purpose.

Member:
the same controller that would add the "I am stale because I can't sync" condition is already setting DisruptionsAllowed to 0 when it can't sync... I don't see why another field is required

Member:
Because eviction sometimes doesn't consider DisruptionsAllowed if a pod is unready and there is enough CurrentHealthy: https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/registry/core/pod/storage/eviction.go#L208

We need to know whether CurrentHealthy is stale or not; we want to remove unready pods when DisruptionsAllowed==0 && CurrentHealthy >= DesiredHealthy.

actually, I'm not sure I understand why we need to check CurrentHealthy / DesiredHealthy in this case... removing a not-ready pod should always be ok, right?

Member:
If we do end up needing to determine if the controller is failing to sync, checking for DisruptionAllowed=False, Reason=SyncFailed could be reasonable, but observers cannot depend on this condition being added/removed reliably until clusters are completely upgraded to v1.21. That means code we release in 1.21 has to tolerate the condition not being added/removed reliably:

  • a multi-server cluster where the active controller-manager is still 1.20 will not have this condition added
  • a multi-server cluster where the leader controller-manager changed to 1.21, added the SyncFailed condition, then changed back to 1.20 will not have this condition removed

Contributor:
I added the discussion issue here: #99598

The current patch is fine if we don't like manipulating CurrentHealthy. My suggestion is to add a util function to detect this specific status, and to document the intent for others: 'SyncFailed' means you probably should not rely on the PDB status counts, as they may be stale.

This is somewhat of an edge case that we probably should have accounted for prior to the eviction API change, but I missed it.
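A minimal sketch of the kind of util function being suggested here (the name IsSyncFailed is hypothetical; this code is not part of this PR). An eviction-side caller could use it to avoid trusting CurrentHealthy and DesiredHealthy when the controller failed to sync:

```go
package poddisruptionbudget

import (
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// IsSyncFailed reports whether disruptions are disallowed because the
// disruption controller could not compute an up-to-date status, which means
// the numeric status fields may be stale.
func IsSyncFailed(pdb *policyv1beta1.PodDisruptionBudget) bool {
	cond := apimeta.FindStatusCondition(pdb.Status.Conditions, policyv1beta1.DisruptionAllowedCondition)
	if cond == nil {
		return false
	}
	return cond.Status == metav1.ConditionFalse && cond.Reason == policyv1beta1.SyncFailedReason
}
```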

Member Author:
Replied in #99598

Member:
addressing this in a separate PR sounds good to me. I don't think this change makes the issue worse, and it gives us a potential tool to address the issue in a follow-up

newPdb := pdb.DeepCopy()
newPdb.Status.DisruptionsAllowed = 0

if newPdb.Status.Conditions == nil {
newPdb.Status.Conditions = make([]metav1.Condition, 0)
}
apimeta.SetStatusCondition(&newPdb.Status.Conditions, metav1.Condition{
Type: policy.DisruptionAllowedCondition,
Status: metav1.ConditionFalse,
Reason: policy.SyncFailedReason,
Message: err.Error(),
ObservedGeneration: newPdb.Status.ObservedGeneration,
})

return dc.getUpdater()(newPdb)
}
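For readers unfamiliar with the apimachinery condition helpers: apimeta.SetStatusCondition updates an existing condition of the same Type in place (refreshing LastTransitionTime only when Status changes) and appends otherwise, so repeated failSafe calls do not accumulate duplicate conditions. A small standalone illustration (values are made up):

```go
package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	conditions := []metav1.Condition{}

	// First call appends a new DisruptionAllowed condition.
	apimeta.SetStatusCondition(&conditions, metav1.Condition{
		Type:   "DisruptionAllowed",
		Status: metav1.ConditionFalse,
		Reason: "SyncFailed",
	})

	// A second call with the same Type updates the existing entry in place;
	// LastTransitionTime is refreshed here because Status changed.
	apimeta.SetStatusCondition(&conditions, metav1.Condition{
		Type:   "DisruptionAllowed",
		Status: metav1.ConditionTrue,
		Reason: "SufficientPods",
	})

	fmt.Println(len(conditions)) // 1
}
```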

@@ -797,7 +812,8 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
pdb.Status.ExpectedPods == expectedCount &&
pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
pdb.Status.ObservedGeneration == pdb.Generation {
pdb.Status.ObservedGeneration == pdb.Generation &&
pdbhelper.ConditionsAreUpToDate(pdb) {
return nil
}

Expand All @@ -811,6 +827,8 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
ObservedGeneration: pdb.Generation,
}

pdbhelper.UpdateDisruptionAllowedCondition(newPdb)

return dc.getUpdater()(newPdb)
}

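The pdbhelper package referenced above (k8s.io/component-helpers/apps/poddisruptionbudget) is not shown in this diff. Based only on how it is called here, the two helpers can be expected to behave roughly like the sketch below; the internals and the InsufficientPodsReason constant are assumptions mirroring the SufficientPodsReason/SyncFailedReason constants used elsewhere in this PR:

```go
// Rough sketch of the expected behavior of the pdbhelper functions used above;
// this is a paraphrase, not the actual component-helpers source.
package poddisruptionbudget

import (
	policy "k8s.io/api/policy/v1beta1"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// UpdateDisruptionAllowedCondition derives the DisruptionAllowed condition
// from the DisruptionsAllowed count already computed on the status.
func UpdateDisruptionAllowedCondition(pdb *policy.PodDisruptionBudget) {
	if pdb.Status.Conditions == nil {
		pdb.Status.Conditions = make([]metav1.Condition, 0)
	}
	cond := metav1.Condition{
		Type:               policy.DisruptionAllowedCondition,
		Status:             metav1.ConditionFalse,
		Reason:             policy.InsufficientPodsReason, // assumed constant
		ObservedGeneration: pdb.Status.ObservedGeneration,
	}
	if pdb.Status.DisruptionsAllowed > 0 {
		cond.Status = metav1.ConditionTrue
		cond.Reason = policy.SufficientPodsReason
	}
	apimeta.SetStatusCondition(&pdb.Status.Conditions, cond)
}

// ConditionsAreUpToDate reports whether the DisruptionAllowed condition exists
// and reflects the PDB's current generation, letting updatePdbStatus skip a
// status write when nothing has changed.
func ConditionsAreUpToDate(pdb *policy.PodDisruptionBudget) bool {
	cond := apimeta.FindStatusCondition(pdb.Status.Conditions, policy.DisruptionAllowedCondition)
	return cond != nil &&
		pdb.Status.ObservedGeneration == pdb.Generation &&
		cond.ObservedGeneration == pdb.Generation
}
```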
19 changes: 19 additions & 0 deletions pkg/controller/disruption/disruption_test.go
@@ -32,6 +32,7 @@ import (
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -73,6 +74,8 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32,
disruptedPodMap map[string]metav1.Time) {
actualPDB := ps.Get(key)
actualConditions := actualPDB.Status.Conditions
actualPDB.Status.Conditions = nil
expectedStatus := policy.PodDisruptionBudgetStatus{
DisruptionsAllowed: disruptionsAllowed,
CurrentHealthy: currentHealthy,
@@ -86,6 +89,22 @@ func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowe
debug.PrintStack()
t.Fatalf("PDB %q status mismatch. Expected %+v but got %+v.", key, expectedStatus, actualStatus)
}

cond := apimeta.FindStatusCondition(actualConditions, policy.DisruptionAllowedCondition)
if cond == nil {
t.Fatalf("Expected condition %q, but didn't find it", policy.DisruptionAllowedCondition)
}
if disruptionsAllowed > 0 {
if cond.Status != metav1.ConditionTrue {
t.Fatalf("Expected condition %q to have status %q, but was %q",
policy.DisruptionAllowedCondition, metav1.ConditionTrue, cond.Status)
}
} else {
if cond.Status != metav1.ConditionFalse {
t.Fatalf("Expected condition %q to have status %q, but was %q",
policy.DisruptionAllowedCondition, metav1.ConditionFalse, cond.Status)
}
}
}

func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptionsAllowed int32) {