Skip to content

Commit

Permalink
Merge pull request #101343 from neolit123/automated-cherry-pick-of-#9…
Browse files Browse the repository at this point in the history
…9336-origin-release-1.18

Automated cherry pick of #99336: pkg/kubelet: improve the node informer sync check
  • Loading branch information
k8s-ci-robot committed Apr 28, 2021
2 parents 54377e0 + 0703229 commit 461c190
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 47 deletions.
27 changes: 24 additions & 3 deletions pkg/kubelet/config/apiserver.go
Expand Up @@ -17,21 +17,42 @@ limitations under the License.
package config

import (
"k8s.io/api/core/v1"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
api "k8s.io/kubernetes/pkg/apis/core"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync
const WaitForAPIServerSyncPeriod = 1 * time.Second

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)

// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.Info("Waiting for node sync before watching apiserver pods")
go func() {
for {
if nodeHasSynced() {
klog.V(4).Info("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).Info("node sync has not completed yet")
}
klog.Info("Watching apiserver")
newSourceApiserverFromLW(lw, updates)
}()
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
Expand Down
59 changes: 27 additions & 32 deletions pkg/kubelet/kubelet.go
Expand Up @@ -128,9 +128,6 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 30 * time.Second

// Max amount of time to wait for node list/watch to initially sync
maxWaitForAPIServerSync = 10 * time.Second

// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5

Expand Down Expand Up @@ -273,7 +270,7 @@ type Dependencies struct {

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
Expand Down Expand Up @@ -313,11 +310,11 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
}

if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
klog.Info("Adding apiserver pod source")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, updatechannel)
}
return cfg, nil
}
Expand Down Expand Up @@ -469,9 +466,32 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}

var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister

// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.Info("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.Info("Kubelet is running in standalone mode, will skip API server sync")
}

if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath, nodeHasSynced)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -518,31 +538,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)

var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister

if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
return true
}
klog.Infof("kubelet nodes not sync")
return false
}
kubeInformers.Start(wait.NeverStop)
klog.Info("Kubelet client is not nil")
} else {
// we dont have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.Info("Kubelet client is nil")
}

// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
Expand Down
13 changes: 1 addition & 12 deletions pkg/kubelet/kubelet_getters.go
Expand Up @@ -22,7 +22,6 @@ import (
"io/ioutil"
"net"
"path/filepath"
"time"

cadvisorapiv1 "github.com/google/cadvisor/info/v1"
"k8s.io/klog"
Expand All @@ -31,7 +30,6 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -234,15 +232,6 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
if kl.kubeClient == nil {
return kl.initialNode(context.TODO())
}
// if we have a valid kube client, we wait for initial lister to sync
if !kl.nodeHasSynced() {
err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) {
return kl.nodeHasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object")
}
}
return kl.nodeLister.Get(string(kl.nodeName))
}

Expand All @@ -253,7 +242,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
// zero capacity, and the default labels.
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
if kl.kubeClient != nil {
if n, err := kl.GetNode(); err == nil {
if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
return n, nil
}
}
Expand Down

0 comments on commit 461c190

Please sign in to comment.