Skip to content

Commit

Permalink
Merge pull request #111475 from alculquicondor/clear_pod_disruption
Browse files Browse the repository at this point in the history
Add worker to clean up stale DisruptionTarget condition
  • Loading branch information
k8s-ci-robot committed Aug 2, 2022
2 parents d40bc18 + 4188d9b commit bc4c493
Show file tree
Hide file tree
Showing 4 changed files with 446 additions and 44 deletions.
199 changes: 173 additions & 26 deletions pkg/controller/disruption/disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,30 @@ import (
"k8s.io/client-go/util/workqueue"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/utils/clock"
)

// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const (
DeletionTimeout = 2 * 60 * time.Second
// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
DeletionTimeout = 2 * time.Minute

// stalePodDisruptionTimeout sets the maximum time a pod can have a stale
// DisruptionTarget condition (the condition is present, but the Pod doesn't
// have a DeletionTimestamp).
// Once the timeout is reached, this controller attempts to set the status
// of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute
)

type updater func(context.Context, *policy.PodDisruptionBudget) error
Expand Down Expand Up @@ -99,10 +108,16 @@ type DisruptionController struct {
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface

// pod keys that need to be synced due to a stale DisruptionTarget condition.
stalePodDisruptionQueue workqueue.RateLimitingInterface
stalePodDisruptionTimeout time.Duration

broadcaster record.EventBroadcaster
recorder record.EventRecorder

getUpdater func() updater

clock clock.Clock
}

// controllerAndScale is used to return (controller, scale) pairs from the
Expand All @@ -127,12 +142,46 @@ func NewDisruptionController(
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
return NewDisruptionControllerInternal(
podInformer,
pdbInformer,
rcInformer,
rsInformer,
dInformer,
ssInformer,
kubeClient,
restMapper,
scaleNamespacer,
discoveryClient,
clock.RealClock{},
stalePodDisruptionTimeout)
}

// NewDisruptionControllerInternal allows setting a clock and
// stalePodDisruptionTimeout.
// It is only supposed to be used by tests.
func NewDisruptionControllerInternal(
podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer,
rsInformer appsv1informers.ReplicaSetInformer,
dInformer appsv1informers.DeploymentInformer,
ssInformer appsv1informers.StatefulSetInformer,
kubeClient clientset.Interface,
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
clock clock.WithTicker,
stalePodDisruptionTimeout time.Duration,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
broadcaster: record.NewBroadcaster(),
kubeClient: kubeClient,
queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
stalePodDisruptionQueue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()),
broadcaster: record.NewBroadcaster(),
stalePodDisruptionTimeout: stalePodDisruptionTimeout,
}
dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

Expand Down Expand Up @@ -172,6 +221,8 @@ func NewDisruptionController(
dc.scaleNamespacer = scaleNamespacer
dc.discoveryClient = discoveryClient

dc.clock = clock

return dc
}

Expand Down Expand Up @@ -376,6 +427,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {

defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()
defer dc.stalePodDisruptionQueue.ShutDown()

klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
Expand All @@ -386,6 +438,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {

go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)

<-ctx.Done()
}
Expand Down Expand Up @@ -427,22 +480,28 @@ func (dc *DisruptionController) addPod(obj interface{}) {
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
func (dc *DisruptionController) updatePod(_, cur interface{}) {
pod := cur.(*v1.Pod)
klog.V(4).Infof("updatePod called on pod %q", pod.Name)
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
Expand Down Expand Up @@ -492,6 +551,16 @@ func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBu
dc.recheckQueue.AddAfter(key, delay)
}

// enqueueStalePodDisruptionCleanup schedules the given pod on the
// stalePodDisruptionQueue so that it is picked up after the delay d.
// If no queue key can be derived for the pod, the error is logged and
// nothing is enqueued.
func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) {
	podRef := klog.KObj(pod)

	podKey, keyErr := controller.KeyFunc(pod)
	if keyErr != nil {
		klog.ErrorS(keyErr, "Couldn't get key for Pod object", "pod", podRef)
		return
	}

	// The key is only delivered to the worker once the delay has elapsed.
	dc.stalePodDisruptionQueue.AddAfter(podKey, d)
	klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", podRef)
}

func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the
Expand Down Expand Up @@ -563,10 +632,31 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
return true
}

// stalePodDisruptionWorker keeps processing items from the stale pod
// disruption queue until processNextStalePodDisruptionWorkItem reports that
// there is nothing more to process (it returns false).
func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) {
	for {
		if more := dc.processNextStalePodDisruptionWorkItem(ctx); !more {
			return
		}
	}
}

// processNextStalePodDisruptionWorkItem takes one pod key from the
// stalePodDisruptionQueue and runs syncStalePodDisruption for it.
//
// It returns false only when the queue reports that it is shutting down;
// otherwise it returns true so the calling worker loop keeps going.
// On a sync error the key is re-added with rate limiting; on success the
// key's rate-limiting history is cleared.
func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool {
	key, quit := dc.stalePodDisruptionQueue.Get()
	if quit {
		return false
	}
	// Done must always be called for a key obtained through Get.
	defer dc.stalePodDisruptionQueue.Done(key)

	err := dc.syncStalePodDisruption(ctx, key.(string))
	if err == nil {
		// Forget must be called on the queue the key was taken from, so that
		// the retry back-off tracked for this key by that queue is reset.
		dc.stalePodDisruptionQueue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err))
	dc.stalePodDisruptionQueue.AddRateLimited(key)
	return true
}

func (dc *DisruptionController) sync(ctx context.Context, key string) error {
startTime := time.Now()
startTime := dc.clock.Now()
defer func() {
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
Expand Down Expand Up @@ -617,7 +707,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
strings.Join(unmanagedPods, ",'"))
}

currentTime := time.Now()
currentTime := dc.clock.Now()
disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
Expand All @@ -631,6 +721,48 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
return err
}

// syncStalePodDisruption handles one pod key from the stale pod disruption
// queue. The key is split into namespace and name and the pod is read from
// the pod lister.
//
// Outcomes:
//   - the pod no longer exists: nothing to do, returns nil;
//   - the pod has no stale DisruptionTarget condition (as decided by
//     nonTerminatingPodHasStaleDisruptionCondition): returns nil;
//   - the condition is present but the timeout has not elapsed yet: the pod
//     is re-enqueued for the remaining time and nil is returned;
//   - otherwise the condition status is set to False and the pod status is
//     patched through the API.
//
// A non-nil error is returned when the key is malformed, the lister fails
// with anything other than NotFound, or the status patch fails.
func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
	startTime := dc.clock.Now()
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	defer func() {
		klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
	}()
	pod, err := dc.podLister.Pods(namespace).Get(name)
	if errors.IsNotFound(err) {
		// The pod value returned together with an error is not meaningful,
		// so identify the pod by the namespace/name parsed from the key.
		klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod)
	if !hasCond {
		return nil
	}
	if cleanAfter > 0 {
		// Not stale for long enough yet; come back once the remaining time has passed.
		dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
		return nil
	}

	// Work on a copy of the status so the object obtained from the lister is
	// not modified.
	newStatus := pod.Status.DeepCopy()
	updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
		Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
		Status: v1.ConditionFalse,
	})
	if !updated {
		return nil
	}
	if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil {
		return err
	}
	klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
	return nil
}

func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
err = nil
// TODO(davidopp): consider making the way expectedCount and rules about
Expand Down Expand Up @@ -747,7 +879,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr
if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
continue
}
if podutil.IsPodReady(pod) {
if apipod.IsPodReady(pod) {
currentHealthy++
}
}
Expand Down Expand Up @@ -857,3 +989,18 @@ func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.
_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
return err
}

// nonTerminatingPodHasStaleDisruptionCondition reports whether the pod has no
// DeletionTimestamp and carries a DisruptionTarget condition whose status is
// True.
//
// When it does, the second return value is the time left until
// stalePodDisruptionTimeout has elapsed since the condition's last
// transition, measured with the controller's clock; it is never negative and
// is zero once the timeout has already passed. When the first return value is
// false, the duration is always zero.
func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) {
	terminating := pod.DeletionTimestamp != nil
	if terminating {
		return false, 0
	}

	_, target := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget)
	if target != nil && target.Status == v1.ConditionTrue {
		elapsed := dc.clock.Since(target.LastTransitionTime.Time)
		if remaining := dc.stalePodDisruptionTimeout - elapsed; remaining > 0 {
			return true, remaining
		}
		// Timeout already reached: clean up without further delay.
		return true, 0
	}
	return false, 0
}

0 comments on commit bc4c493

Please sign in to comment.