Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job controller: set job conditions to false explicitly #100701

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 78 additions & 42 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
}
jobConditionsChanged := false
manageJobCalled := false
var isUpdated bool
if jobFailed {
// TODO(#28486): Account for pod failures in status once we can track
// completions without lingering pods.
Expand All @@ -543,10 +544,12 @@ func (jm *Controller) syncJob(key string) (bool, error) {
// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage))
jobConditionsChanged = true
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage)
jobConditionsChanged = jobConditionsChanged || isUpdated
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobFailed, v1.ConditionFalse, "", "")
jobConditionsChanged = jobConditionsChanged || isUpdated
if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
manageJobCalled = true
Expand Down Expand Up @@ -579,39 +582,50 @@ func (jm *Controller) syncJob(key string) (bool, error) {
}
}
if complete {
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, v1.ConditionTrue, "", ""))
jobConditionsChanged = true
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobComplete, v1.ConditionTrue, "Job completed", "Job completed")
jobConditionsChanged = jobConditionsChanged || isUpdated
now := metav1.Now()
job.Status.CompletionTime = &now
jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
// Update the conditions / emit events only if manageJob was called in
// this syncJob. Otherwise wait for the right syncJob call to make
// updates.
if job.Spec.Suspend != nil && *job.Spec.Suspend {
// Job can be in the suspended state only if it is NOT completed.
var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
if isUpdated {
jobConditionsChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
}
} else {
// Job not suspended.
var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
if isUpdated {
jobConditionsChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
// Resumed jobs will always reset StartTime to current time. This is
// done because the ActiveDeadlineSeconds timer shouldn't go off
// whilst the Job is still suspended and resetting StartTime is
// consistent with resuming a Job created in the suspended state.
// (ActiveDeadlineSeconds is interpreted as the number of seconds a
// Job is continuously active.)
now := metav1.Now()
job.Status.StartTime = &now
}
} else {
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobComplete, v1.ConditionFalse, "", "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?
Wouldn't it be overridden below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is JobComplete, not JobSuspended

jobConditionsChanged = jobConditionsChanged || isUpdated
}

if !utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider putting all this logic in a function.

For example, you could just return after removing the condition if the gate is disabled, simplifying the conditionals below.

job.Status.Conditions, isUpdated = removeJobCondition(job.Status.Conditions, batch.JobSuspended)
jobConditionsChanged = jobConditionsChanged || isUpdated
} else if !hasCondition(job.Status.Conditions, batch.JobSuspended) {
// The feature gate is enabled, but the condition doesn't exist. This
// should never happen, insert the default placeholder with status false.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this happen in the first sync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/This should never happen/This should not be the case/ :)

Yes, this will happen in the first sync.

// This will be executed exactly once.
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "", "")
jobConditionsChanged = jobConditionsChanged || isUpdated
}
if !utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) || complete || !manageJobCalled {
// Do nothing. We only update from suspended -> resumed or vice versa
// only if manageJob was called in this syncJob. Otherwise wait for the
// right syncJob call to make updates. Also, completed Jobs cannot be
// suspended.
} else if job.Spec.Suspend != nil && *job.Spec.Suspend {
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
jobConditionsChanged = jobConditionsChanged || isUpdated
if isUpdated {
jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
}
} else {
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
jobConditionsChanged = jobConditionsChanged || isUpdated
if isUpdated {
jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
// Resumed jobs will always reset StartTime to current time. This is
// done because the ActiveDeadlineSeconds timer shouldn't go off
// whilst the Job is still suspended and resetting StartTime is
// consistent with resuming a Job created in the suspended state.
// (ActiveDeadlineSeconds is interpreted as the number of seconds a
// Job is continuously active.)
now := metav1.Now()
job.Status.StartTime = &now
}
}
}
Expand Down Expand Up @@ -972,11 +986,8 @@ func errorFromChannel(errCh <-chan error) error {
}

// ensureJobConditionStatus appends or updates an existing job condition of the
// given type with the given status value. Note that this function will not
// append to the conditions list if the new condition's status is false
// (because going from nothing to false is meaningless); it can, however,
// update the status condition to false. The function returns a bool to let the
// caller know if the list was changed (either appended or updated).
// given type with the given status value. The function also returns a bool to
// let the caller know if the list was updated.
func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string) ([]batch.JobCondition, bool) {
for i := range list {
if list[i].Type == cType {
Expand All @@ -986,13 +997,38 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
list[i].Reason = reason
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't forget to update the Probe timestamp

Copy link
Member Author

@adtac adtac Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I did begin to change LastProbeTime, but there's a problem with that. By definition, LastProbeTime must be updated every time ensureJobConditionStatus is called regardless of whether the status/reason/message match (because it's just a probe). That means that jobConditionsChanged is always true every time syncJob is called. That means that we'll do a Jobs.UpdateStatus() every time syncJob is called and that forget is always true. These are significantly more far-reaching consequences than I had originally anticipated.

The performance impact of syncing the Job with the API server every time syncJob is called could be disastrous for long running jobs that see very infrequent updates.

So I'd like to hold off on making the LastProbeTime change in this PR. Updating it only when the job condition's status value changes (i.e. the if part of this if-else) would also send the wrong message. In fact, AFAICT no other k8s controllers do anything like LastProbeTime, so I think removing that field might even be the most consistent approach. There is some value in knowing LastProbeTime from the user's perspective, but I don't know whether it's possible to update it accurately without performance concerns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that if the block in line 643 is reached, you can update all LastProbeTimes

list[i].Message = message
return list, true
} else {
return list, false
}
return list, false
}
}
// A condition with that type doesn't exist in the list.
if status != v1.ConditionFalse {
return append(list, newCondition(cType, status, reason, message)), true
return append(list, newCondition(cType, status, reason, message)), true
}

// hasCondition reports whether list contains at least one condition whose
// Type equals cType. Only the Type field is compared; the condition's Status
// is not consulted.
func hasCondition(list []batch.JobCondition, cType batch.JobConditionType) bool {
	found := false
	// Stop scanning as soon as a matching type has been seen.
	for i := 0; i < len(list) && !found; i++ {
		found = list[i].Type == cType
	}
	return found
}

// removeJobCondition returns a copy of list with every condition of type
// cType filtered out, together with a bool that is true when at least one
// condition was dropped.
//
// You must not use this function to signal that a condition is false; instead
// use the JobCondition.Status field to do that. This should only be used when
// a JobCondition should never be present (e.g. when a feature gate is
// disabled).
//
// The input slice is not modified. When no conditions remain (including when
// list is empty) the returned slice is nil.
func removeJobCondition(list []batch.JobCondition, cType batch.JobConditionType) ([]batch.JobCondition, bool) {
	var result []batch.JobCondition
	for _, jc := range list {
		if jc.Type == cType {
			continue
		}
		result = append(result, jc)
	}
	// A length difference means at least one matching condition was skipped.
	return result, len(result) != len(list)
}