Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubelet: revert contextual logging support #110869

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/kubelet/kubelet.go
Expand Up @@ -334,8 +334,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
logger := klog.TODO()

if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -821,7 +819,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
Expand Down
4 changes: 0 additions & 4 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -44,7 +44,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2/ktesting"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
Expand Down Expand Up @@ -150,8 +149,6 @@ func newTestKubeletWithImageList(
imageList []kubecontainer.Image,
controllerAttachDetachEnabled bool,
initFakeVolumePlugin bool) *TestKubelet {
logger, _ := ktesting.NewTestContext(t)

fakeRuntime := &containertest.FakeRuntime{
ImageList: imageList,
// Set ready conditions by default.
Expand Down Expand Up @@ -324,7 +321,6 @@ func newTestKubeletWithImageList(

// setup shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: kubelet.probeManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
Expand Down
2 changes: 0 additions & 2 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
Expand Up @@ -21,7 +21,6 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand All @@ -38,7 +37,6 @@ type Manager interface {

// Config represents Manager configuration
type Config struct {
Logger klog.Logger
ProbeManager prober.Manager
Recorder record.EventRecorder
NodeRef *v1.ObjectReference
Expand Down
32 changes: 15 additions & 17 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
Expand Up @@ -67,7 +67,6 @@ type dbusInhibiter interface {

// managerImpl has functions that can be used to interact with the Node Shutdown Manager.
type managerImpl struct {
logger klog.Logger
recorder record.EventRecorder
nodeRef *v1.ObjectReference
probeManager prober.Manager
Expand Down Expand Up @@ -119,7 +118,6 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
conf.Clock = clock.RealClock{}
}
manager := &managerImpl{
logger: conf.Logger,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
nodeRef: conf.NodeRef,
Expand All @@ -133,7 +131,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
},
}
manager.logger.Info("Creating node shutdown manager",
klog.InfoS("Creating node shutdown manager",
"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
Expand Down Expand Up @@ -161,7 +159,7 @@ func (m *managerImpl) setMetrics() {
sta := state{}
err := m.storage.Load(&sta)
if err != nil {
m.logger.Error(err, "Failed to load graceful shutdown state")
klog.ErrorS(err, "Failed to load graceful shutdown state")
} else {
if !sta.StartTime.IsZero() {
metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
Expand All @@ -186,10 +184,10 @@ func (m *managerImpl) Start() error {
}

time.Sleep(dbusReconnectPeriod)
m.logger.V(1).Info("Restarting watch for node shutdown events")
klog.V(1).InfoS("Restarting watch for node shutdown events")
stop, err = m.start()
if err != nil {
m.logger.Error(err, "Unable to watch the node for shutdown events")
klog.ErrorS(err, "Unable to watch the node for shutdown events")
}
}
}()
Expand Down Expand Up @@ -257,19 +255,19 @@ func (m *managerImpl) start() (chan struct{}, error) {
select {
case isShuttingDown, ok := <-events:
if !ok {
m.logger.Error(err, "Ended to watching the node for shutdown events")
klog.ErrorS(err, "Ended to watching the node for shutdown events")
close(stop)
return
}
m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)

var shutdownType string
if isShuttingDown {
shutdownType = "shutdown"
} else {
shutdownType = "cancelled"
}
m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType)
klog.V(1).InfoS("Shutdown manager detected new shutdown event", "event", shutdownType)
if isShuttingDown {
m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event")
} else {
Expand Down Expand Up @@ -318,12 +316,12 @@ func (m *managerImpl) ShutdownStatus() error {
}

func (m *managerImpl) processShutdownEvent() error {
m.logger.V(1).Info("Shutdown manager processing shutdown event")
klog.V(1).InfoS("Shutdown manager processing shutdown event")
activePods := m.getPods()

defer func() {
m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly")
klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")
}()

if m.enableMetrics && m.storage != nil {
Expand All @@ -332,7 +330,7 @@ func (m *managerImpl) processShutdownEvent() error {
StartTime: startTime,
})
if err != nil {
m.logger.Error(err, "Failed to store graceful shutdown state")
klog.ErrorS(err, "Failed to store graceful shutdown state")
}
metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
metrics.GracefulShutdownEndTime.Set(0)
Expand All @@ -344,7 +342,7 @@ func (m *managerImpl) processShutdownEvent() error {
EndTime: endTime,
})
if err != nil {
m.logger.Error(err, "Failed to store graceful shutdown state")
klog.ErrorS(err, "Failed to store graceful shutdown state")
}
metrics.GracefulShutdownStartTime.Set(timestamp(endTime))
}()
Expand All @@ -371,7 +369,7 @@ func (m *managerImpl) processShutdownEvent() error {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}

m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)

if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
// set the pod status to failed (unless it was already in a successful terminal phase)
Expand All @@ -381,9 +379,9 @@ func (m *managerImpl) processShutdownEvent() error {
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
}); err != nil {
m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod, group)
}
Expand All @@ -401,7 +399,7 @@ func (m *managerImpl) processShutdownEvent() error {
case <-doneCh:
timer.Stop()
case <-timer.C():
m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}

Expand Down
71 changes: 43 additions & 28 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package nodeshutdown

import (
"bytes"
"fmt"
"os"
"strings"
Expand All @@ -34,8 +35,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/scheduling"
pkgfeatures "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand All @@ -45,6 +45,14 @@ import (
probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"

// https://github.com/kubernetes/kubernetes/pull/110504 started using
// ktesting for the first time in Kubernetes and therefore added it to
// "vendor". We need to revert that PR for a short while (see
// https://github.com/kubernetes/kubernetes/issues/110854) but to
// make the revert local to this directory, we keep importing that
// package (no change to vendor).
_ "k8s.io/klog/v2/ktesting/init"
)

// lock is to prevent systemDbus from being modified in the case of concurrency.
Expand Down Expand Up @@ -211,8 +219,6 @@ func TestManager(t *testing.T) {

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)

activePodsFunc := func() []*v1.Pod {
return tc.activePods
}
Expand Down Expand Up @@ -245,7 +251,6 @@ func TestManager(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
Expand Down Expand Up @@ -329,7 +334,6 @@ func TestFeatureEnabled(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
activePodsFunc := func() []*v1.Pod {
return nil
}
Expand All @@ -343,7 +347,6 @@ func TestFeatureEnabled(t *testing.T) {
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
Expand All @@ -360,7 +363,6 @@ func TestFeatureEnabled(t *testing.T) {
}

func TestRestart(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
systemDbusTmp := systemDbus
defer func() {
systemDbus = systemDbusTmp
Expand Down Expand Up @@ -399,7 +401,6 @@ func TestRestart(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
Expand Down Expand Up @@ -624,6 +625,23 @@ func Test_groupByPriority(t *testing.T) {
}
}

type buffer struct {
b bytes.Buffer
rw sync.RWMutex
}

func (b *buffer) String() string {
b.rw.RLock()
defer b.rw.RUnlock()
return b.b.String()
}

func (b *buffer) Write(p []byte) (n int, err error) {
b.rw.Lock()
defer b.rw.Unlock()
return b.b.Write(p)
}

func Test_managerImpl_processShutdownEvent(t *testing.T) {
var (
probeManager = probetest.FakeManager{}
Expand All @@ -647,10 +665,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
clock clock.Clock
}
tests := []struct {
name string
fields fields
wantErr bool
expectedOutputContains string
name string
fields fields
wantErr bool
exceptOutputContains string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expect here? Or you're on purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am reverting to the code before my changes. It was like that.

}{
{
name: "kill pod func take too long",
Expand Down Expand Up @@ -682,17 +700,20 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
clock: fakeclock,
dbusCon: &fakeDbus{},
},
wantErr: false,
expectedOutputContains: "Shutdown manager pod killing time out",
wantErr: false,
exceptOutputContains: "Shutdown manager pod killing time out",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
l := klog.Level(1)
l.Set("1")
// hijack the klog output
tmpWriteBuffer := new(buffer)
klog.SetOutput(tmpWriteBuffer)
klog.LogToStderr(false)
Copy link
Member

@kerthcet kerthcet Jun 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we defer to revert this with defer LogToStderr(true)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - I am just reverting.


m := &managerImpl{
logger: logger,
recorder: tt.fields.recorder,
nodeRef: tt.fields.nodeRef,
probeManager: tt.fields.probeManager,
Expand All @@ -709,17 +730,11 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
if err := m.processShutdownEvent(); (err != nil) != tt.wantErr {
t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr)
}
klog.Flush()

underlier, ok := logger.GetSink().(ktesting.Underlier)
if !ok {
t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
}

log := underlier.GetBuffer().String()
if !strings.Contains(log, tt.expectedOutputContains) {
// Log will be shown on failure. To see it
// during a successful run use "go test -v".
t.Errorf("managerImpl.processShutdownEvent() should have logged %s, see actual output above.", tt.expectedOutputContains)
log := tmpWriteBuffer.String()
if !strings.Contains(log, tt.exceptOutputContains) {
t.Errorf("managerImpl.processShutdownEvent() should log %s, got %s", tt.exceptOutputContains, log)
}
})
}
Expand Down