Skip to content

Commit

Permalink
Change it to retry only when "connection refused" error
Browse files Browse the repository at this point in the history
  • Loading branch information
sanposhiho committed Jun 3, 2022
1 parent 9a622e4 commit 4bd39bb
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 157 deletions.
17 changes: 7 additions & 10 deletions pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,12 +823,14 @@ func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo
pod := podInfo.Pod
msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
updatePod(sched.client, pod, &v1.PodCondition{
if err := updatePod(sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
}, nominatingInfo)
}, nominatingInfo); err != nil {
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
}
}

// truncateMessage truncates a message if it hits the NoteLengthLimit.
Expand All @@ -841,22 +843,17 @@ func truncateMessage(message string) string {
return message[:max-len(suffix)] + suffix
}

func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) {
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
return
return nil
}
if nnnNeedsUpdate {
podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
go func() {
// update Pod's status in goroutine because there might be retries.
if err := util.PatchPodStatus(client, pod, podStatusCopy); err != nil {
klog.ErrorS(err, "Could not update pod status after scheduling attempt", "pod", klog.KObj(pod))
}
}()
return util.PatchPodStatus(client, pod, podStatusCopy)
}
21 changes: 5 additions & 16 deletions pkg/scheduler/schedule_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,17 +1225,8 @@ func TestUpdatePod(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
actualPatchRequests := 0
var actualPatchData string
done := make(chan struct{}, 1)
if test.expectedPatchRequests == 0 {
done <- struct{}{}
}
cs := &clientsetfake.Clientset{}
cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() {
if actualPatchRequests == test.expectedPatchRequests {
done <- struct{}{}
}
}()
actualPatchRequests++
patch := action.(clienttesting.PatchAction)
actualPatchData = string(patch.GetPatch())
Expand All @@ -1246,14 +1237,12 @@ func TestUpdatePod(t *testing.T) {

pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()

updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo)
if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
t.Fatalf("Error calling update: %v", err)
}

const timeout = 100 * time.Millisecond
select {
case <-done:
// fake clientset received an expected number of patch requests.
case <-time.After(timeout):
t.Fatalf("wait for the expected number of requests to come in, but they were not completed within the time. expected number: %v actual: %v ,the last received patch data: %v", test.expectedPatchRequests, actualPatchRequests, actualPatchData)
if actualPatchRequests != test.expectedPatchRequests {
t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d), actual patch data: %v", actualPatchRequests, test.expectedPatchRequests, actualPatchData)
}

regex, err := regexp.Compile(test.expectedPatchDataPattern)
Expand Down
46 changes: 3 additions & 43 deletions pkg/scheduler/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
Expand Down Expand Up @@ -93,14 +92,6 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
return GetPodStartTime(pod1).Before(GetPodStartTime(pod2))
}

const (
// Parameters for retrying with exponential backoff.
retryBackoffInitialDuration = 100 * time.Millisecond
retryBackoffFactor = 1.3
retryBackoffJitter = 0
retryBackoffSteps = 6
)

// PatchPodStatus calculates the delta bytes change from <old.Status> to <newStatus>,
// and then submit a request to API server to patch the pod changes with retries.
func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
Expand All @@ -126,48 +117,17 @@ func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatu
return nil
}

backoff := wait.Backoff{
Duration: retryBackoffInitialDuration,
Factor: retryBackoffFactor,
Jitter: retryBackoffJitter,
Steps: retryBackoffSteps,
}

patchFn := func() error {
_, err := cs.CoreV1().Pods(old.Namespace).Patch(context.Background(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}

isRetriableFn := func(err error) bool {
// Check if the error is returned from kube-apiserver.
// If so, check retryAfterSeconds to know if kube-apiserver asked us to retry or not.
typedErr, ok := err.(*errors.StatusError)
if !ok {
// unknown error. e.g. api-server is unreachable
// let's retry it and see how it goes.
return true
}

var retryAfterSeconds int32
if typedErr.Status().Details != nil {
retryAfterSeconds = typedErr.Status().Details.RetryAfterSeconds
}
if retryAfterSeconds == 0 {
// we shouldn't retry since this means kube-apiserver rejected our request and didn't ask us to retry.
return false
}
// Otherwise we can retry after retryAfterSeconds.
klog.ErrorS(err, "Server rejected Pod patch (may retry after sleeping)", "pod", klog.KObj(old))
time.Sleep(time.Duration(retryAfterSeconds))
return true
}

return retry.OnError(backoff, isRetriableFn, patchFn)
return retry.OnError(retry.DefaultBackoff, net.IsConnectionRefused, patchFn)
}

// DeletePod deletes the given <pod> from API server
func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
return cs.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}

// ClearNominatedNodeName internally submit a patch request to API server
Expand Down
98 changes: 10 additions & 88 deletions pkg/scheduler/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
"context"
"errors"
"fmt"
"syscall"
"testing"
"time"

"github.com/google/go-cmp/cmp"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
Expand Down Expand Up @@ -236,16 +237,16 @@ func TestPatchPodStatus(t *testing.T) {
},
},
{
name: "retry patch request when an unknown error is returned",
name: "retry patch request when a 'connection refused' error is returned",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount == 0 {
// return an unknown error for the first patch request.
return true, &v1.Pod{}, errors.New("unknown")
// return a connection refused error for the first patch request.
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
}
if reqcount == 1 {
// not return error for the second patch request.
Expand Down Expand Up @@ -277,99 +278,20 @@ func TestPatchPodStatus(t *testing.T) {
},
},
{
name: "doesn't retry patch request when an StatusError, which doesn't contain RetryAfterSeconds, is returned",
name: "only 4 retries at most",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount == 0 {
// return a BadRequest error which doesn't contain RetryAfterSeconds for the first patch request.
return true, &v1.Pod{}, apierrors.NewBadRequest("bad")
}

// return error if requests comes in more than two times.
return true, nil, errors.New("requests comes in more than two times.")
})

return client
}(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "pod1",
},
Spec: v1.PodSpec{
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
},
},
validateErr: apierrors.IsBadRequest,
statusToUpdate: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
{
name: "retry patch request when an StatusError, which contain RetryAfterSeconds, is returned",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount == 0 {
// return a TooManyRequests error which contain RetryAfterSeconds for the first patch request.
return true, &v1.Pod{}, apierrors.NewTooManyRequests("too many", 1)
}
if reqcount == 1 {
// not return error for the second patch request.
return false, &v1.Pod{}, nil
}

// return error if requests comes in more than three times.
return true, nil, errors.New("requests comes in more than two times.")
})

return client
}(),
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "pod1",
},
Spec: v1.PodSpec{
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
},
},
statusToUpdate: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
},
},
{
name: "only 6 retries at most",
client: func() *clientsetfake.Clientset {
client := clientsetfake.NewSimpleClientset()

reqcount := 0
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
defer func() { reqcount++ }()
if reqcount >= 6 {
if reqcount >= 4 {
// return error if requests come in more than four times.
return true, nil, errors.New("requests comes in more than six times.")
}

// return a TooManyRequests error which contain RetryAfterSeconds.
return true, &v1.Pod{}, apierrors.NewTooManyRequests("too many", 1)
// return a connection refused error for each of the first four patch requests.
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
})

return client
Expand All @@ -383,7 +305,7 @@ func TestPatchPodStatus(t *testing.T) {
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
},
},
validateErr: apierrors.IsTooManyRequests,
validateErr: net.IsConnectionRefused,
statusToUpdate: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Expand Down

0 comments on commit 4bd39bb

Please sign in to comment.