Skip to content

Commit

Permalink
Merge pull request #107986 from wzshiming/promote/shutdown-based-on-p…
Browse files Browse the repository at this point in the history
…od-priority

Promote graceful shutdown based on pod priority to beta
  • Loading branch information
k8s-ci-robot committed Mar 23, 2022
2 parents 1580b69 + ced991c commit a6e65a2
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ const (

// owner: @wzshiming
// alpha: v1.23
//
// beta: v1.24
// Make the kubelet use shutdown configuration based on pod priority values for graceful shutdown.
GracefulNodeShutdownBasedOnPodPriority featuregate.Feature = "GracefulNodeShutdownBasedOnPodPriority"

Expand Down Expand Up @@ -917,7 +917,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ExecProbeTimeout: {Default: true, PreRelease: featuregate.GA}, // lock to default and remove after v1.22 based on KEP #1972 update
KubeletCredentialProviders: {Default: false, PreRelease: featuregate.Alpha},
GracefulNodeShutdown: {Default: true, PreRelease: featuregate.Beta},
GracefulNodeShutdownBasedOnPodPriority: {Default: false, PreRelease: featuregate.Alpha},
GracefulNodeShutdownBasedOnPodPriority: {Default: true, PreRelease: featuregate.Beta},
ServiceLBNodePortControl: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
MixedProtocolLBService: {Default: false, PreRelease: featuregate.Alpha},
VolumeCapacityPriority: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
ShutdownGracePeriodByPodPriority: kubeCfg.ShutdownGracePeriodByPodPriority,
StateDirectory: rootDirectory,
})
klet.shutdownManager = shutdownManager
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
Expand Down
27 changes: 27 additions & 0 deletions pkg/kubelet/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,26 @@ var (
StabilityLevel: metrics.ALPHA,
},
)

// GracefulShutdownStartTime is a gauge that records the time at which the kubelet started graceful shutdown.
// The value is reported in seconds since the Unix epoch (see Help below).
GracefulShutdownStartTime = metrics.NewGauge(
&metrics.GaugeOpts{
Subsystem: KubeletSubsystem,
Name: "graceful_shutdown_start_time_seconds",
Help: "Last graceful shutdown start time since unix epoch in seconds",
StabilityLevel: metrics.ALPHA,
},
)

// GracefulShutdownEndTime is a gauge that records the time at which the kubelet completed graceful shutdown.
// The value is reported in seconds since the Unix epoch.
GracefulShutdownEndTime = metrics.NewGauge(
	&metrics.GaugeOpts{
		Subsystem: KubeletSubsystem,
		Name:      "graceful_shutdown_end_time_seconds",
		// The help text must describe the END time; it previously repeated
		// the start-time description verbatim.
		Help:           "Last graceful shutdown end time since unix epoch in seconds",
		StabilityLevel: metrics.ALPHA,
	},
)
)

var registerMetrics sync.Once
Expand Down Expand Up @@ -504,6 +524,13 @@ func Register(collectors ...metrics.StableCollector) {
for _, collector := range collectors {
legacyregistry.CustomMustRegister(collector)
}

if utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) &&
utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) {
legacyregistry.MustRegister(GracefulShutdownStartTime)
legacyregistry.MustRegister(GracefulShutdownEndTime)
}

})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
ShutdownGracePeriodRequested time.Duration
ShutdownGracePeriodCriticalPods time.Duration
ShutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
StateDirectory string
Clock clock.Clock
}

Expand Down
62 changes: 59 additions & 3 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package nodeshutdown

import (
"fmt"
"path/filepath"
"sort"
"sync"
"time"
Expand All @@ -36,6 +37,7 @@ import (
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/utils/clock"
Expand All @@ -47,6 +49,7 @@ const (
nodeShutdownNotAdmittedReason = "NodeShutdown"
nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
dbusReconnectPeriod = 1 * time.Second
localStorageStateFile = "graceful_node_shutdown_state"
)

var systemDbus = func() (dbusInhibiter, error) {
Expand Down Expand Up @@ -81,6 +84,9 @@ type managerImpl struct {
nodeShuttingDownNow bool

clock clock.Clock

enableMetrics bool
storage storage
}

// NewManager returns a new node shutdown manager.
Expand Down Expand Up @@ -120,6 +126,10 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
syncNodeStatus: conf.SyncNodeStatusFunc,
shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
clock: conf.Clock,
enableMetrics: utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
storage: localStorage{
Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
},
}
klog.InfoS("Creating node shutdown manager",
"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
Expand All @@ -143,6 +153,24 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
return lifecycle.PodAdmitResult{Admit: true}
}

// setMetrics publishes the graceful shutdown timestamps that were persisted
// to storage. It does nothing when metrics are disabled or no storage is
// configured, and only logs when the persisted state cannot be loaded.
func (m *managerImpl) setMetrics() {
	if !m.enableMetrics || m.storage == nil {
		return
	}

	persisted := state{}
	if loadErr := m.storage.Load(&persisted); loadErr != nil {
		klog.ErrorS(loadErr, "Failed to load graceful shutdown state")
		return
	}

	// Zero timestamps mean "never recorded"; leave the gauges untouched then.
	if started := persisted.StartTime; !started.IsZero() {
		metrics.GracefulShutdownStartTime.Set(timestamp(started))
	}
	if ended := persisted.EndTime; !ended.IsZero() {
		metrics.GracefulShutdownEndTime.Set(timestamp(ended))
	}
}

// Start starts the node shutdown manager and will start watching the node for shutdown events.
func (m *managerImpl) Start() error {
stop, err := m.start()
Expand All @@ -163,6 +191,8 @@ func (m *managerImpl) Start() error {
}
}
}()

m.setMetrics()
return nil
}

Expand Down Expand Up @@ -289,6 +319,35 @@ func (m *managerImpl) processShutdownEvent() error {
klog.V(1).InfoS("Shutdown manager processing shutdown event")
activePods := m.getPods()

defer func() {
m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")
}()

// Record the shutdown start in both the persisted state and the gauges. The
// end-time gauge is reset to 0 while the shutdown is in progress.
if m.enableMetrics && m.storage != nil {
	startTime := time.Now()
	err := m.storage.Store(state{
		StartTime: startTime,
	})
	if err != nil {
		klog.ErrorS(err, "Failed to store graceful shutdown state")
	}
	metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
	metrics.GracefulShutdownEndTime.Set(0)

	// Deferred functions run last-in-first-out, so this runs before the
	// inhibit lock release deferred earlier in this function.
	defer func() {
		endTime := time.Now()
		err := m.storage.Store(state{
			StartTime: startTime,
			EndTime:   endTime,
		})
		if err != nil {
			klog.ErrorS(err, "Failed to store graceful shutdown state")
		}
		// Publish the completion time on the END gauge; writing it to the
		// start gauge would clobber the start time and leave the end at 0.
		metrics.GracefulShutdownEndTime.Set(timestamp(endTime))
	}()
}

groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
for _, group := range groups {
// If there are no pods in a particular range,
Expand Down Expand Up @@ -347,9 +406,6 @@ func (m *managerImpl) processShutdownEvent() error {
}
}

m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")

return nil
}

Expand Down
96 changes: 96 additions & 0 deletions pkg/kubelet/nodeshutdown/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeshutdown

import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
)

// storage abstracts persisting a value and reading it back later.
type storage interface {
// Store persists data and returns any error encountered while doing so.
Store(data interface{}) (err error)
// Load reads previously stored content into data, which callers in this
// package pass as a pointer.
Load(data interface{}) (err error)
}

// localStorage is a storage implementation backed by a single JSON file.
type localStorage struct {
// Path is the location of the file that Store writes and Load reads.
Path string
}

// Store JSON-encodes data and atomically writes the result to l.Path with
// mode 0644. It returns the encoding error or the write error, if any.
func (l localStorage) Store(data interface{}) (err error) {
	encoded, err := json.Marshal(data)
	if err == nil {
		err = atomicWrite(l.Path, encoded, 0644)
	}
	return err
}

// Load reads the file at l.Path and JSON-decodes its content into data.
// A missing file is not an error: data is left unmodified and nil is
// returned. Any other read failure or decoding failure is returned.
func (l localStorage) Load(data interface{}) (err error) {
	raw, readErr := os.ReadFile(l.Path)
	switch {
	case readErr == nil:
		return json.Unmarshal(raw, data)
	case os.IsNotExist(readErr):
		// Nothing has been stored yet.
		return nil
	default:
		return readErr
	}
}

// timestamp converts t to whole seconds since the Unix epoch as a float64.
// The zero time.Time maps to 0 rather than to its (large negative) Unix value.
func timestamp(t time.Time) float64 {
	if !t.IsZero() {
		return float64(t.Unix())
	}
	return 0
}

// state is the graceful shutdown record that is persisted as JSON.
type state struct {
// StartTime is when processing of the shutdown event began.
StartTime time.Time `json:"startTime"`
// EndTime is when processing of the shutdown event finished; it is the
// zero time while only the start has been recorded.
EndTime time.Time `json:"endTime"`
}

// atomicWrite atomically writes data to a file specified by filename.
//
// The data is written to a temporary file created in the same directory as
// filename, given mode perm, synced and closed, and only then renamed over
// filename. If any step fails, the temporary file is removed so that failed
// writes do not accumulate ".tmp-*" files next to filename, and the error of
// the failing step is returned.
func atomicWrite(filename string, data []byte, perm os.FileMode) (err error) {
	f, err := ioutil.TempFile(filepath.Dir(filename), ".tmp-"+filepath.Base(filename))
	if err != nil {
		return err
	}
	tmpName := f.Name()
	closed := false

	// On any failure below, release the descriptor and discard the temp file.
	defer func() {
		if err == nil {
			return
		}
		if !closed {
			// Best effort: the original error is the one worth reporting.
			_ = f.Close()
		}
		// Best effort: after a failed rename the temp file still exists;
		// if it is already gone there is nothing left to clean up.
		_ = os.Remove(tmpName)
	}()

	if err = os.Chmod(tmpName, perm); err != nil {
		return err
	}
	n, err := f.Write(data)
	if err != nil {
		return err
	}
	if n < len(data) {
		return io.ErrShortWrite
	}
	if err = f.Sync(); err != nil {
		return err
	}
	// Close releases the descriptor even when it reports an error, so do not
	// close a second time in the deferred cleanup.
	closed = true
	if err = f.Close(); err != nil {
		return err
	}
	return os.Rename(tmpName, filename)
}
69 changes: 69 additions & 0 deletions pkg/kubelet/nodeshutdown/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeshutdown

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
)

// TestLocalStorage verifies that a state value survives a Store/Load round
// trip and that the file on disk contains the expected JSON encoding.
func TestLocalStorage(t *testing.T) {
	var localStorageStateFileName = "graceful_node_shutdown_state"
	// Use a per-test directory that the testing package removes afterwards,
	// instead of writing a fixed file name into the shared system temp dir.
	path := filepath.Join(t.TempDir(), localStorageStateFileName)
	l := localStorage{
		Path: path,
	}
	now := time.Now()
	want := state{
		StartTime: now,
		EndTime:   now,
	}
	if err := l.Store(want); err != nil {
		t.Fatal(err)
	}

	got := state{}
	if err := l.Load(&got); err != nil {
		t.Fatal(err)
	}

	if !want.StartTime.Equal(got.StartTime) || !want.EndTime.Equal(got.EndTime) {
		t.Fatalf("got %+v, want %+v", got, want)
	}

	raw, err := os.ReadFile(path)
	if err != nil {
		t.Fatal(err)
	}
	nowStr := now.Format(time.RFC3339Nano)
	// Pass the timestamp as an argument rather than splicing it into the
	// format string, so the format stays constant.
	wantRaw := fmt.Sprintf(`{"startTime":%q,"endTime":%q}`, nowStr, nowStr)
	if string(raw) != wantRaw {
		t.Errorf("got %s, want %s", string(raw), wantRaw)
	}
}
5 changes: 5 additions & 0 deletions test/e2e_node/node_shutdown_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ var _ = SIGDescribe("GracefulNodeShutdown [Serial] [NodeFeature:GracefulNodeShut
return nil
}, podStatusUpdateTimeout, pollInterval).Should(gomega.BeNil())
}

ginkgo.By("should have state file")
stateFile := "/var/lib/kubelet/graceful_node_shutdown_state"
_, err = os.Stat(stateFile)
framework.ExpectNoError(err)
})
})
})
Expand Down

0 comments on commit a6e65a2

Please sign in to comment.