From 8384e7232d078b71febbde60bef0d75680803a19 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 29 Jun 2022 21:59:08 +0200 Subject: [PATCH] kubelet: revert contextual logging support It turned out that using testing.T for logging had a race condition and potential panic because the tests keep goroutines running and testing.T is not supposed to be used anymore after test completion (https://github.com/kubernetes/kubernetes/issues/110854). Either the code must be fixed to terminate all goroutines before a test ends (seems non-trivial because the usage of goroutines is fairly complex in the shutdown manager code), or ktesting must handle this case. A solution for this is pending in https://github.com/kubernetes/klog/pull/337. Either way, solving this will take a bit longer. In the meantime, we should revert the change to get unit testing stable again. In order to make this a local change, ktesting is kept as a dependency of the test and thus Kubernetes. --- pkg/kubelet/kubelet.go | 3 - pkg/kubelet/kubelet_test.go | 4 -- .../nodeshutdown/nodeshutdown_manager.go | 2 - .../nodeshutdown_manager_linux.go | 32 ++++----- .../nodeshutdown_manager_linux_test.go | 71 +++++++++++-------- 5 files changed, 58 insertions(+), 54 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8482c60db13a..c8640789ba07 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -334,8 +334,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nodeStatusMaxImages int32, seccompDefault bool, ) (*Kubelet, error) { - logger := klog.TODO() - if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -821,7 +819,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, // setup node shutdown manager shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{ - Logger: logger, ProbeManager: klet.probeManager, Recorder: kubeDeps.Recorder, NodeRef: nodeRef, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 82f0564da0da..eb8269be2c20 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -44,7 +44,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" - "k8s.io/klog/v2/ktesting" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" @@ -150,8 +149,6 @@ func newTestKubeletWithImageList( imageList []kubecontainer.Image, controllerAttachDetachEnabled bool, initFakeVolumePlugin bool) *TestKubelet { - logger, _ := ktesting.NewTestContext(t) - fakeRuntime := &containertest.FakeRuntime{ ImageList: imageList, // Set ready conditions by default. @@ -324,7 +321,6 @@ func newTestKubeletWithImageList( // setup shutdown manager shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{ - Logger: logger, ProbeManager: kubelet.probeManager, Recorder: fakeRecorder, NodeRef: nodeRef, diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go index aac590f967da..00406299be1a 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager.go @@ -21,7 +21,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -38,7 +37,6 @@ type Manager interface { // Config represents Manager configuration type Config struct { - Logger klog.Logger ProbeManager prober.Manager Recorder record.EventRecorder NodeRef *v1.ObjectReference diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index fb34f1bfb102..a9733b702101 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -67,7 +67,6 @@ type dbusInhibiter interface { // managerImpl has functions that can be used to interact with the Node Shutdown Manager. type managerImpl struct { - logger klog.Logger recorder record.EventRecorder nodeRef *v1.ObjectReference probeManager prober.Manager @@ -119,7 +118,6 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) { conf.Clock = clock.RealClock{} } manager := &managerImpl{ - logger: conf.Logger, probeManager: conf.ProbeManager, recorder: conf.Recorder, nodeRef: conf.NodeRef, @@ -133,7 +131,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) { Path: filepath.Join(conf.StateDirectory, localStorageStateFile), }, } - manager.logger.Info("Creating node shutdown manager", + klog.InfoS("Creating node shutdown manager", "shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested, "shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods, "shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority, @@ -161,7 +159,7 @@ func (m *managerImpl) setMetrics() { sta := state{} err := m.storage.Load(&sta) if err != nil { - m.logger.Error(err, "Failed to load graceful shutdown state") + klog.ErrorS(err, "Failed to load graceful shutdown state") } else { if !sta.StartTime.IsZero() { metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime)) @@ -186,10 +184,10 @@ func (m *managerImpl) Start() error { } time.Sleep(dbusReconnectPeriod) - m.logger.V(1).Info("Restarting watch for node shutdown events") + klog.V(1).InfoS("Restarting watch for node shutdown events") stop, err = m.start() if err != nil { - m.logger.Error(err, "Unable to watch the node for shutdown events") + klog.ErrorS(err, "Unable to watch the node for shutdown events") } } }() @@ -257,11 +255,11 @@ func (m *managerImpl) start() (chan struct{}, error) { select { case isShuttingDown, ok := <-events: if !ok { - m.logger.Error(err, "Ended to watching the node for shutdown events") + klog.ErrorS(err, "Ended to watching the node for shutdown events") close(stop) return } - m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown) + klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown) var shutdownType string if isShuttingDown { @@ -269,7 +267,7 @@ func (m *managerImpl) start() (chan struct{}, error) { } else { shutdownType = "cancelled" } - m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType) + klog.V(1).InfoS("Shutdown manager detected new shutdown event", "event", shutdownType) if isShuttingDown { m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event") } else { @@ -318,12 +316,12 @@ func (m *managerImpl) ShutdownStatus() error { } func (m *managerImpl) processShutdownEvent() error { - m.logger.V(1).Info("Shutdown manager processing shutdown event") + klog.V(1).InfoS("Shutdown manager processing shutdown event") activePods := m.getPods() defer func() { m.dbusCon.ReleaseInhibitLock(m.inhibitLock) - m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly") + klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly") }() if m.enableMetrics && m.storage != nil { @@ -332,7 +330,7 @@ func (m *managerImpl) processShutdownEvent() error { StartTime: startTime, }) if err != nil { - m.logger.Error(err, "Failed to store graceful shutdown state") + klog.ErrorS(err, "Failed to store graceful shutdown state") } metrics.GracefulShutdownStartTime.Set(timestamp(startTime)) metrics.GracefulShutdownEndTime.Set(0) @@ -344,7 +342,7 @@ func (m *managerImpl) processShutdownEvent() error { EndTime: endTime, }) if err != nil { - m.logger.Error(err, "Failed to store graceful shutdown state") + klog.ErrorS(err, "Failed to store graceful shutdown state") } metrics.GracefulShutdownStartTime.Set(timestamp(endTime)) }() @@ -371,7 +369,7 @@ func (m *managerImpl) processShutdownEvent() error { gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds } - m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride) + klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride) if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) { // set the pod status to failed (unless it was already in a successful terminal phase) @@ -381,9 +379,9 @@ func (m *managerImpl) processShutdownEvent() error { status.Message = nodeShutdownMessage status.Reason = nodeShutdownReason }); err != nil { - m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err) + klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err) } else { - m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod)) + klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod)) } }(pod, group) } @@ -401,7 +399,7 @@ func (m *managerImpl) processShutdownEvent() error { case <-doneCh: timer.Stop() case <-timer.C(): - m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority) + klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority) } } diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go index 652b4a32d8e0..e58f2d4dea18 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go @@ -20,6 +20,7 @@ limitations under the License. package nodeshutdown import ( + "bytes" "fmt" "os" "strings" @@ -34,8 +35,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" - "k8s.io/klog/v2/ktesting" - _ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/scheduling" pkgfeatures "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -45,6 +45,14 @@ import ( probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" + + // https://github.com/kubernetes/kubernetes/pull/110504 started using + // ktesting for the first time in Kubernetes and therefore added it to + // "vendor". We need to revert that PR for a short while (see + // https://github.com/kubernetes/kubernetes/issues/110854) but to + // make the revert local to this directory, we keep importing that + // package (no change to vendor). + _ "k8s.io/klog/v2/ktesting/init" ) // lock is to prevent systemDbus from being modified in the case of concurrency. @@ -211,8 +219,6 @@ func TestManager(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - activePodsFunc := func() []*v1.Pod { return tc.activePods } @@ -245,7 +251,6 @@ func TestManager(t *testing.T) { fakeRecorder := &record.FakeRecorder{} nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} manager, _ := NewManager(&Config{ - Logger: logger, ProbeManager: proberManager, Recorder: fakeRecorder, NodeRef: nodeRef, @@ -329,7 +334,6 @@ func TestFeatureEnabled(t *testing.T) { } for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) activePodsFunc := func() []*v1.Pod { return nil } @@ -343,7 +347,6 @@ func TestFeatureEnabled(t *testing.T) { nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} manager, _ := NewManager(&Config{ - Logger: logger, ProbeManager: proberManager, Recorder: fakeRecorder, NodeRef: nodeRef, @@ -360,7 +363,6 @@ func TestFeatureEnabled(t *testing.T) { } func TestRestart(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) systemDbusTmp := systemDbus defer func() { systemDbus = systemDbusTmp @@ -399,7 +401,6 @@ func TestRestart(t *testing.T) { fakeRecorder := &record.FakeRecorder{} nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} manager, _ := NewManager(&Config{ - Logger: logger, ProbeManager: proberManager, Recorder: fakeRecorder, NodeRef: nodeRef, @@ -624,6 +625,23 @@ func Test_groupByPriority(t *testing.T) { } } +type buffer struct { + b bytes.Buffer + rw sync.RWMutex +} + +func (b *buffer) String() string { + b.rw.RLock() + defer b.rw.RUnlock() + return b.b.String() +} + +func (b *buffer) Write(p []byte) (n int, err error) { + b.rw.Lock() + defer b.rw.Unlock() + return b.b.Write(p) +} + func Test_managerImpl_processShutdownEvent(t *testing.T) { var ( probeManager = probetest.FakeManager{} @@ -647,10 +665,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) { clock clock.Clock } tests := []struct { - name string - fields fields - wantErr bool - expectedOutputContains string + name string + fields fields + wantErr bool + exceptOutputContains string }{ { name: "kill pod func take too long", @@ -682,17 +700,20 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) { clock: fakeclock, dbusCon: &fakeDbus{}, }, - wantErr: false, - expectedOutputContains: "Shutdown manager pod killing time out", + wantErr: false, + exceptOutputContains: "Shutdown manager pod killing time out", }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + l := klog.Level(1) + l.Set("1") + // hijack the klog output + tmpWriteBuffer := new(buffer) + klog.SetOutput(tmpWriteBuffer) + klog.LogToStderr(false) m := &managerImpl{ - logger: logger, recorder: tt.fields.recorder, nodeRef: tt.fields.nodeRef, probeManager: tt.fields.probeManager, @@ -709,17 +730,11 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) { if err := m.processShutdownEvent(); (err != nil) != tt.wantErr { t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr) } + klog.Flush() - underlier, ok := logger.GetSink().(ktesting.Underlier) - if !ok { - t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink()) - } - - log := underlier.GetBuffer().String() - if !strings.Contains(log, tt.expectedOutputContains) { - // Log will be shown on failure. To see it - // during a successful run use "go test -v". - t.Errorf("managerImpl.processShutdownEvent() should have logged %s, see actual output above.", tt.expectedOutputContains) + log := tmpWriteBuffer.String() + if !strings.Contains(log, tt.exceptOutputContains) { + t.Errorf("managerImpl.processShutdownEvent() should log %s, got %s", tt.exceptOutputContains, log) } }) }