Skip to content

Commit

Permalink
statefulsets: MinReadySeconds implementation
Browse files Browse the repository at this point in the history
kubernetes#100842
introduced featuregate. This PR implements the logic
behind it.
  • Loading branch information
ravisantoshgudimetla committed Jun 15, 2021
1 parent bc8acbc commit ceb1dbd
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 49 deletions.
33 changes: 31 additions & 2 deletions pkg/controller/statefulset/stateful_set.go
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
Expand All @@ -39,8 +40,10 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -85,7 +88,6 @@ func NewStatefulSetController(
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})

ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(
Expand Down Expand Up @@ -221,6 +223,15 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
}
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}

Expand Down Expand Up @@ -380,6 +391,16 @@ func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
ssc.queue.Add(key)
}

// enqueueStatefulSet enqueues the given statefulset in the work queue after given time
func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
key, err := controller.KeyFunc(ss)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
return
}
ssc.queue.AddAfter(key, duration)
}

// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (ssc *StatefulSetController) processNextWorkItem() bool {
Expand Down Expand Up @@ -446,10 +467,18 @@ func (ssc *StatefulSetController) sync(key string) error {
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
var status *apps.StatefulSetStatus
var err error
// TODO: investigate where we mutate the set during the update as it is not obvious.
if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
if err != nil {
return err
}
klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
// One more sync to handle the clock skew. This is also helping in requeuing right after status update
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
}

return nil
}
81 changes: 56 additions & 25 deletions pkg/controller/statefulset/stateful_set_control.go
Expand Up @@ -23,9 +23,11 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)

// StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
Expand All @@ -36,7 +38,7 @@ type StatefulSetControlInterface interface {
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
Expand Down Expand Up @@ -71,60 +73,57 @@ type defaultStatefulSetControl struct {
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {

func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
// list all revisions and sort them
revisions, err := ssc.ListRevisions(set)
if err != nil {
return err
return nil, err
}
history.SortControllerRevisions(revisions)

currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions)
currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions)
if err != nil {
return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
}

// maintain the set's revision history limit
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

func (ssc *defaultStatefulSetControl) performUpdate(
set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {

set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}

// perform the main update function and get the status
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}

// update the set's status
err = ssc.updateStatefulSetStatus(set, status)
err = ssc.updateStatefulSetStatus(set, currentStatus)
if err != nil {
return currentRevision, updateRevision, err
return currentRevision, updateRevision, currentStatus, err
}

klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
set.Namespace,
set.Name,
status.Replicas,
status.ReadyReplicas,
status.CurrentReplicas,
status.UpdatedReplicas)
currentStatus.Replicas,
currentStatus.ReadyReplicas,
currentStatus.CurrentReplicas,
currentStatus.UpdatedReplicas)

klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
set.Namespace,
set.Name,
status.CurrentRevision,
status.UpdateRevision)
currentStatus.CurrentRevision,
currentStatus.UpdateRevision)

return currentRevision, updateRevision, nil
return currentRevision, updateRevision, currentStatus, nil
}

func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
Expand Down Expand Up @@ -307,6 +306,15 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// count the number of running and ready replicas
if isRunningAndReady(pods[i]) {
status.ReadyReplicas++
// count the number of running and available replicas
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
status.AvailableReplicas++
}
} else {
// If the featuregate is not enabled, all the ready replicas should be considered as available replicas
status.AvailableReplicas = status.ReadyReplicas
}
}

// count the number of current and update replicas
Expand Down Expand Up @@ -447,6 +455,19 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
replicas[i].Name)
return &status, nil
}
// If we have a Pod that has been created but is not available we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
// TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the
// isRunningAndReady block as only Available pods should be brought down.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Available",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// Enforce the StatefulSet invariants
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
continue
Expand All @@ -458,7 +479,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
}

// At this point, all of the current Replicas are Running and Ready, we can consider termination.
// At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
Expand Down Expand Up @@ -486,6 +507,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
firstUnhealthyPod.Name)
return &status, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
// TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the
// isRunningAndReady block as only Available pods should be brought down.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Available prior to scale down",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
set.Namespace,
set.Name,
Expand Down Expand Up @@ -549,7 +581,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {

// complete any in progress rolling update if necessary
completeRollingUpdate(set, status)

Expand Down

0 comments on commit ceb1dbd

Please sign in to comment.