kubelet: revert contextual logging support

It turned out that using testing.T for logging caused a race condition and
a potential panic: the tests keep goroutines running, and testing.T is not
supposed to be used anymore after test completion (kubernetes#110854).

Either the code must be fixed to terminate all goroutines before a test
ends (non-trivial, because the shutdown manager code uses goroutines in a
fairly complex way), or ktesting must handle this case. A solution for the
latter is pending in kubernetes/klog#337.

Either way, solving this will take a bit longer. In the meantime, we should
revert the change to get unit testing stable again.

To keep this change local, ktesting remains a dependency of the test and
thus of Kubernetes.
pohly committed Jun 30, 2022
1 parent 10bea49 commit 8384e72
Showing 5 changed files with 58 additions and 54 deletions.
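
To make the failure mode concrete, here is a minimal reproducer (hypothetical test code, not part of this commit): a goroutine that outlives its test and then logs through testing.T trips the testing package's "Log in goroutine after test has completed" panic, and the late write races with the framework's own cleanup.

package nodeshutdown_test

import (
    "testing"
    "time"
)

// TestLeakyLogger sketches the race behind kubernetes#110854: the test body
// returns while a goroutine still holds t; logging through t afterwards
// makes the testing package panic ("Log in goroutine after TestLeakyLogger
// has completed").
func TestLeakyLogger(t *testing.T) {
    go func() {
        time.Sleep(50 * time.Millisecond)       // outlives the test body
        t.Log("shutdown watcher still running") // panics
    }()
    // The test body returns here; the goroutine above keeps running.
}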
3 changes: 0 additions & 3 deletions pkg/kubelet/kubelet.go
@@ -334,8 +334,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     nodeStatusMaxImages int32,
     seccompDefault bool,
 ) (*Kubelet, error) {
-    logger := klog.TODO()
-
     if rootDirectory == "" {
         return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
     }
@@ -821,7 +819,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 
     // setup node shutdown manager
     shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
-        Logger:       logger,
         ProbeManager: klet.probeManager,
         Recorder:     kubeDeps.Recorder,
         NodeRef:      nodeRef,
4 changes: 0 additions & 4 deletions pkg/kubelet/kubelet_test.go
@@ -44,7 +44,6 @@ import (
     "k8s.io/client-go/kubernetes/fake"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/flowcontrol"
-    "k8s.io/klog/v2/ktesting"
     cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
     "k8s.io/kubernetes/pkg/kubelet/cm"
     "k8s.io/kubernetes/pkg/kubelet/config"
@@ -150,8 +149,6 @@ func newTestKubeletWithImageList(
     imageList []kubecontainer.Image,
     controllerAttachDetachEnabled bool,
     initFakeVolumePlugin bool) *TestKubelet {
-    logger, _ := ktesting.NewTestContext(t)
-
     fakeRuntime := &containertest.FakeRuntime{
         ImageList: imageList,
         // Set ready conditions by default.
@@ -324,7 +321,6 @@ func newTestKubeletWithImageList(
 
     // setup shutdown manager
     shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
-        Logger:       logger,
         ProbeManager: kubelet.probeManager,
         Recorder:     fakeRecorder,
         NodeRef:      nodeRef,
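
For reference, the helper removed above comes from k8s.io/klog/v2/ktesting: NewTestContext returns a logger that routes output through t.Log (plus a context carrying it), which is precisely what becomes unsafe once goroutines outlive the test. A usage sketch of the reverted pattern (hypothetical test, not from this commit):

package nodeshutdown_test

import (
    "testing"

    "k8s.io/klog/v2/ktesting"
)

func TestWithKtesting(t *testing.T) {
    // The returned logger writes through t.Log, so output is attached to
    // this test run; ctx carries the logger for contextual logging.
    logger, ctx := ktesting.NewTestContext(t)
    logger.Info("hello from the per-test logger")
    _ = ctx // would be handed to the code under test
}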
2 changes: 0 additions & 2 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
@@ -21,7 +21,6 @@ import (
 
     v1 "k8s.io/api/core/v1"
     "k8s.io/client-go/tools/record"
-    "k8s.io/klog/v2"
     kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
     "k8s.io/kubernetes/pkg/kubelet/eviction"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -38,7 +37,6 @@ type Manager interface {
 
 // Config represents Manager configuration
 type Config struct {
-    Logger       klog.Logger
     ProbeManager prober.Manager
     Recorder     record.EventRecorder
     NodeRef      *v1.ObjectReference
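
The two styles this Config change toggles between, as a standalone sketch (the manager types here are illustrative, not from the kubelet): contextual logging injects a klog.Logger through the config, which is what let tests substitute a per-test logger; the global style restored by this revert calls package-level klog functions, so tests must hijack the process-wide output instead.

package main

import "k8s.io/klog/v2"

// contextualManager: the style kubernetes#110504 introduced. The caller
// injects the logger, e.g. a per-test one from ktesting.NewTestContext.
type contextualManager struct {
    logger klog.Logger
}

func (m *contextualManager) run() {
    m.logger.Info("Creating node shutdown manager")
}

// globalManager: the style this commit restores. Logging goes through the
// package-level klog API and the process-wide output.
type globalManager struct{}

func (m *globalManager) run() {
    klog.InfoS("Creating node shutdown manager")
}

func main() {
    (&contextualManager{logger: klog.Background()}).run()
    (&globalManager{}).run()
    klog.Flush()
}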
32 changes: 15 additions & 17 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -67,7 +67,6 @@ type dbusInhibiter interface {
 
 // managerImpl has functions that can be used to interact with the Node Shutdown Manager.
 type managerImpl struct {
-    logger       klog.Logger
     recorder     record.EventRecorder
     nodeRef      *v1.ObjectReference
     probeManager prober.Manager
@@ -119,7 +118,6 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
         conf.Clock = clock.RealClock{}
     }
     manager := &managerImpl{
-        logger:       conf.Logger,
         probeManager: conf.ProbeManager,
         recorder:     conf.Recorder,
         nodeRef:      conf.NodeRef,
@@ -133,7 +131,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
             Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
         },
     }
-    manager.logger.Info("Creating node shutdown manager",
+    klog.InfoS("Creating node shutdown manager",
         "shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
         "shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
         "shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
@@ -161,7 +159,7 @@ func (m *managerImpl) setMetrics() {
         sta := state{}
         err := m.storage.Load(&sta)
         if err != nil {
-            m.logger.Error(err, "Failed to load graceful shutdown state")
+            klog.ErrorS(err, "Failed to load graceful shutdown state")
         } else {
             if !sta.StartTime.IsZero() {
                 metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
@@ -186,10 +184,10 @@ func (m *managerImpl) Start() error {
             }
 
             time.Sleep(dbusReconnectPeriod)
-            m.logger.V(1).Info("Restarting watch for node shutdown events")
+            klog.V(1).InfoS("Restarting watch for node shutdown events")
             stop, err = m.start()
             if err != nil {
-                m.logger.Error(err, "Unable to watch the node for shutdown events")
+                klog.ErrorS(err, "Unable to watch the node for shutdown events")
             }
         }
     }()
@@ -257,19 +255,19 @@ func (m *managerImpl) start() (chan struct{}, error) {
         select {
         case isShuttingDown, ok := <-events:
             if !ok {
-                m.logger.Error(err, "Ended to watching the node for shutdown events")
+                klog.ErrorS(err, "Ended to watching the node for shutdown events")
                 close(stop)
                 return
             }
-            m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
+            klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
 
             var shutdownType string
             if isShuttingDown {
                 shutdownType = "shutdown"
             } else {
                 shutdownType = "cancelled"
             }
-            m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType)
+            klog.V(1).InfoS("Shutdown manager detected new shutdown event", "event", shutdownType)
             if isShuttingDown {
                 m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event")
             } else {
@@ -318,12 +316,12 @@ func (m *managerImpl) ShutdownStatus() error {
 }
 
 func (m *managerImpl) processShutdownEvent() error {
-    m.logger.V(1).Info("Shutdown manager processing shutdown event")
+    klog.V(1).InfoS("Shutdown manager processing shutdown event")
     activePods := m.getPods()
 
     defer func() {
         m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
-        m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly")
+        klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")
     }()
 
     if m.enableMetrics && m.storage != nil {
@@ -332,7 +330,7 @@ func (m *managerImpl) processShutdownEvent() error {
             StartTime: startTime,
         })
         if err != nil {
-            m.logger.Error(err, "Failed to store graceful shutdown state")
+            klog.ErrorS(err, "Failed to store graceful shutdown state")
         }
         metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
         metrics.GracefulShutdownEndTime.Set(0)
@@ -344,7 +342,7 @@ func (m *managerImpl) processShutdownEvent() error {
                 EndTime: endTime,
             })
             if err != nil {
-                m.logger.Error(err, "Failed to store graceful shutdown state")
+                klog.ErrorS(err, "Failed to store graceful shutdown state")
             }
             metrics.GracefulShutdownStartTime.Set(timestamp(endTime))
         }()
@@ -371,7 +369,7 @@ func (m *managerImpl) processShutdownEvent() error {
                 gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
             }
 
-            m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
+            klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
 
             if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
                 // set the pod status to failed (unless it was already in a successful terminal phase)
@@ -381,9 +379,9 @@ func (m *managerImpl) processShutdownEvent() error {
                 status.Message = nodeShutdownMessage
                 status.Reason = nodeShutdownReason
             }); err != nil {
-                m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
+                klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
             } else {
-                m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
+                klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
             }
         }(pod, group)
     }
@@ -401,7 +399,7 @@ func (m *managerImpl) processShutdownEvent() error {
         case <-doneCh:
             timer.Stop()
         case <-timer.C():
-            m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+            klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
         }
     }
 
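
One subtlety in the test changes that follow: most of the messages restored above are emitted at klog.V(1), which is dropped at the default verbosity of 0. klog.Level implements flag.Value, and calling Set on any Level variable updates the package-global verbosity state, which is what the restored test relies on. A minimal sketch, assuming only k8s.io/klog/v2:

package main

import "k8s.io/klog/v2"

func main() {
    klog.V(1).InfoS("dropped: default verbosity is 0")

    // Level.Set parses the value and mutates klog's global verbosity,
    // regardless of which Level variable it is called on.
    var l klog.Level
    if err := l.Set("1"); err != nil {
        panic(err)
    }

    klog.V(1).InfoS("emitted: verbosity is now 1")
    klog.Flush()
}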
71 changes: 43 additions & 28 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
@@ -20,6 +20,7 @@ limitations under the License.
 package nodeshutdown
 
 import (
+    "bytes"
     "fmt"
     "os"
     "strings"
@@ -34,8 +35,7 @@ import (
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/tools/record"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
-    "k8s.io/klog/v2/ktesting"
-    _ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
+    "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/apis/scheduling"
     pkgfeatures "k8s.io/kubernetes/pkg/features"
     kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -45,6 +45,14 @@ import (
     probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
     "k8s.io/utils/clock"
     testingclock "k8s.io/utils/clock/testing"
+
+    // https://github.com/kubernetes/kubernetes/pull/110504 started using
+    // ktesting for the first time in Kubernetes and therefore added it to
+    // "vendor". We need to revert that PR for a short while (see
+    // https://github.com/kubernetes/kubernetes/issues/110854) but to
+    // make the revert local to this directory, we keep importing that
+    // package (no change to vendor).
+    _ "k8s.io/klog/v2/ktesting/init"
 )
 
 // lock is to prevent systemDbus from being modified in the case of concurrency.
@@ -211,8 +219,6 @@ func TestManager(t *testing.T) {
 
     for _, tc := range tests {
         t.Run(tc.desc, func(t *testing.T) {
-            logger, _ := ktesting.NewTestContext(t)
-
             activePodsFunc := func() []*v1.Pod {
                 return tc.activePods
             }
@@ -245,7 +251,6 @@ func TestManager(t *testing.T) {
             fakeRecorder := &record.FakeRecorder{}
             nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
             manager, _ := NewManager(&Config{
-                Logger:       logger,
                 ProbeManager: proberManager,
                 Recorder:     fakeRecorder,
                 NodeRef:      nodeRef,
@@ -329,7 +334,6 @@ func TestFeatureEnabled(t *testing.T) {
     }
     for _, tc := range tests {
         t.Run(tc.desc, func(t *testing.T) {
-            logger, _ := ktesting.NewTestContext(t)
             activePodsFunc := func() []*v1.Pod {
                 return nil
             }
@@ -343,7 +347,6 @@ func TestFeatureEnabled(t *testing.T) {
             nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
             manager, _ := NewManager(&Config{
-                Logger:       logger,
                 ProbeManager: proberManager,
                 Recorder:     fakeRecorder,
                 NodeRef:      nodeRef,
@@ -360,7 +363,6 @@ func TestFeatureEnabled(t *testing.T) {
 }
 
 func TestRestart(t *testing.T) {
-    logger, _ := ktesting.NewTestContext(t)
     systemDbusTmp := systemDbus
     defer func() {
         systemDbus = systemDbusTmp
@@ -399,7 +401,6 @@ func TestRestart(t *testing.T) {
     fakeRecorder := &record.FakeRecorder{}
     nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
     manager, _ := NewManager(&Config{
-        Logger:       logger,
         ProbeManager: proberManager,
         Recorder:     fakeRecorder,
        NodeRef:      nodeRef,
@@ -624,6 +625,23 @@ func Test_groupByPriority(t *testing.T) {
     }
 }
 
+type buffer struct {
+    b  bytes.Buffer
+    rw sync.RWMutex
+}
+
+func (b *buffer) String() string {
+    b.rw.RLock()
+    defer b.rw.RUnlock()
+    return b.b.String()
+}
+
+func (b *buffer) Write(p []byte) (n int, err error) {
+    b.rw.Lock()
+    defer b.rw.Unlock()
+    return b.b.Write(p)
+}
+
 func Test_managerImpl_processShutdownEvent(t *testing.T) {
     var (
         probeManager = probetest.FakeManager{}
@@ -647,10 +665,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
         clock clock.Clock
     }
     tests := []struct {
-        name                   string
-        fields                 fields
-        wantErr                bool
-        expectedOutputContains string
+        name                 string
+        fields               fields
+        wantErr              bool
+        exceptOutputContains string
     }{
         {
             name: "kill pod func take too long",
@@ -682,17 +700,20 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
                 clock:   fakeclock,
                 dbusCon: &fakeDbus{},
             },
-            wantErr:                false,
-            expectedOutputContains: "Shutdown manager pod killing time out",
+            wantErr:              false,
+            exceptOutputContains: "Shutdown manager pod killing time out",
         },
     }
 
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            logger, _ := ktesting.NewTestContext(t)
+            l := klog.Level(1)
+            l.Set("1")
+            // hijack the klog output
+            tmpWriteBuffer := new(buffer)
+            klog.SetOutput(tmpWriteBuffer)
+            klog.LogToStderr(false)
 
             m := &managerImpl{
-                logger:       logger,
                 recorder:     tt.fields.recorder,
                 nodeRef:      tt.fields.nodeRef,
                 probeManager: tt.fields.probeManager,
@@ -709,17 +730,11 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
             if err := m.processShutdownEvent(); (err != nil) != tt.wantErr {
                 t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr)
             }
+            klog.Flush()
 
-            underlier, ok := logger.GetSink().(ktesting.Underlier)
-            if !ok {
-                t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
-            }
-
-            log := underlier.GetBuffer().String()
-            if !strings.Contains(log, tt.expectedOutputContains) {
-                // Log will be shown on failure. To see it
-                // during a successful run use "go test -v".
-                t.Errorf("managerImpl.processShutdownEvent() should have logged %s, see actual output above.", tt.expectedOutputContains)
+            log := tmpWriteBuffer.String()
+            if !strings.Contains(log, tt.exceptOutputContains) {
+                t.Errorf("managerImpl.processShutdownEvent() should log %s, got %s", tt.exceptOutputContains, log)
             }
         })
     }
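
Condensed from the restored test above, the complete capture pattern as a self-contained sketch (the buffer type mirrors the one added in the diff; the logged string and test name are illustrative): redirect klog into a lock-protected buffer, run the code under test, flush, then search the buffer.

package nodeshutdown_test

import (
    "bytes"
    "strings"
    "sync"
    "testing"

    "k8s.io/klog/v2"
)

// lockedBuffer guards a bytes.Buffer with a mutex because klog may write
// from multiple goroutines.
type lockedBuffer struct {
    mu sync.Mutex
    b  bytes.Buffer
}

func (b *lockedBuffer) Write(p []byte) (int, error) {
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.b.Write(p)
}

func (b *lockedBuffer) String() string {
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.b.String()
}

func TestCapturesKlogOutput(t *testing.T) {
    buf := new(lockedBuffer)
    klog.SetOutput(buf)     // hijack the process-wide klog output
    klog.LogToStderr(false) // stop mirroring to stderr
    defer klog.LogToStderr(true)

    klog.InfoS("Shutdown manager processing shutdown event") // stand-in for the code under test
    klog.Flush()

    if !strings.Contains(buf.String(), "processing shutdown event") {
        t.Errorf("expected captured log output, got %q", buf.String())
    }
}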
