Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: scheduler: contextual logging #110833

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 8 additions & 14 deletions cmd/kube-controller-manager/app/testing/testserver.go
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
Expand All @@ -45,20 +46,14 @@ type TestServer struct {
TmpDir string // Temp Dir used, by the apiserver
}

// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
}

// StartTestServer starts a kube-controller-manager. A rest client config and a tear-down func,
// and location of the tmpdir are returned.
//
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
func StartTestServer(ctx context.Context, customFlags []string) (result TestServer, err error) {
logger := klog.FromContext(ctx)
stopCh := make(chan struct{})
tearDown := func() {
close(stopCh)
Expand Down Expand Up @@ -97,7 +92,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}
s.SecureServing.ServerCert.CertDirectory = result.TmpDir

t.Logf("kube-controller-manager will listen securely on port %d...", s.SecureServing.BindPort)
logger.Info("kube-controller-manager will listen securely", "port", s.SecureServing.BindPort)
}

config, err := s.Config(all, disabled)
Expand All @@ -112,7 +107,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}
}(stopCh)

t.Logf("Waiting for /healthz to be ok...")
logger.Info("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
Expand Down Expand Up @@ -146,14 +141,13 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}

// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, flags []string) *TestServer {
result, err := StartTestServer(t, flags)
func StartTestServerOrDie(ctx context.Context, flags []string) *TestServer {
result, err := StartTestServer(ctx, flags)
if err == nil {
return &result
}

t.Fatalf("failed to launch server: %v", err)
return nil
panic(fmt.Errorf("failed to launch server: %v", err))
}

func createListenerOnFreePort() (net.Listener, int, error) {
Expand Down
12 changes: 6 additions & 6 deletions cmd/kube-scheduler/app/options/configfile.go
Expand Up @@ -80,9 +80,9 @@ func encodeConfig(cfg *config.KubeSchedulerConfiguration) (*bytes.Buffer, error)
}

// LogOrWriteConfig logs the completed component config and writes it into the given file name as YAML, if either is enabled
func LogOrWriteConfig(fileName string, cfg *config.KubeSchedulerConfiguration, completedProfiles []config.KubeSchedulerProfile) error {
klogV := klog.V(2)
if !klogV.Enabled() && len(fileName) == 0 {
func LogOrWriteConfig(logger klog.Logger, fileName string, cfg *config.KubeSchedulerConfiguration, completedProfiles []config.KubeSchedulerProfile) error {
loggerV := logger.V(2)
if !loggerV.Enabled() && len(fileName) == 0 {
return nil
}
cfg.Profiles = completedProfiles
Expand All @@ -92,8 +92,8 @@ func LogOrWriteConfig(fileName string, cfg *config.KubeSchedulerConfiguration, c
return err
}

if klogV.Enabled() {
klogV.InfoS("Using component config", "config", buf.String())
if loggerV.Enabled() {
loggerV.Info("Using component config", "config", buf.String())
}

if len(fileName) > 0 {
Expand All @@ -105,7 +105,7 @@ func LogOrWriteConfig(fileName string, cfg *config.KubeSchedulerConfiguration, c
if _, err := io.Copy(configFile, buf); err != nil {
return err
}
klog.InfoS("Wrote configuration", "file", fileName)
logger.Info("Wrote configuration", "file", fileName)
os.Exit(0)
}
return nil
Expand Down
18 changes: 11 additions & 7 deletions cmd/kube-scheduler/app/server.go
Expand Up @@ -108,6 +108,7 @@ for more information about scheduling and the kube-scheduler component.`,
cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)

if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
// nolint:logcheck // TODO (?): return error instead of logging it here
klog.ErrorS(err, "Failed to mark flag filename")
}

Expand Down Expand Up @@ -144,10 +145,12 @@ func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Op

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
logger := klog.FromContext(ctx)

// To help debugging, immediately log version
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
logger.Info("Starting Kubernetes Scheduler", "version", version.Get())

klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
Expand Down Expand Up @@ -213,11 +216,11 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
logger.Info("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "Leaderelection lost")
logger.Error(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
Expand Down Expand Up @@ -326,11 +329,12 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
sched, err := scheduler.New(
ctx,
cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
Expand All @@ -349,7 +353,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}

Expand Down
25 changes: 9 additions & 16 deletions cmd/kube-scheduler/app/testing/testserver.go
Expand Up @@ -48,21 +48,15 @@ type TestServer struct {
TmpDir string // Temp Dir used, by the apiserver
}

// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
}

// StartTestServer starts a kube-scheduler. A rest client config and a tear-down func,
// and location of the tmpdir are returned.
//
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
ctx, cancel := context.WithCancel(context.Background())
func StartTestServer(ctx context.Context, customFlags []string) (result TestServer, err error) {
logger := klog.FromContext(ctx)
ctx, cancel := context.WithCancel(ctx)

var errCh chan error
tearDown := func() {
Expand All @@ -73,7 +67,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.ErrorS(err, "Failed to shutdown test server clearly")
logger.Error(err, "Failed to shutdown test server clearly")
}
}
if len(result.TmpDir) != 0 {
Expand Down Expand Up @@ -108,7 +102,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}
opts.SecureServing.ServerCert.CertDirectory = result.TmpDir

t.Logf("kube-scheduler will listen securely on port %d...", opts.SecureServing.BindPort)
logger.Info("kube-scheduler will listen securely", "port", opts.SecureServing.BindPort)
}

cc, sched, err := app.Setup(ctx, opts)
Expand All @@ -124,7 +118,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}
}(ctx)

t.Logf("Waiting for /healthz to be ok...")
logger.Info("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(cc.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
Expand Down Expand Up @@ -158,14 +152,13 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}

// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, flags []string) *TestServer {
result, err := StartTestServer(t, flags)
func StartTestServerOrDie(ctx context.Context, flags []string) *TestServer {
result, err := StartTestServer(ctx, flags)
if err == nil {
return &result
}

t.Fatalf("failed to launch server: %v", err)
return nil
panic(fmt.Errorf("failed to launch server: %v", err))
}

func createListenerOnFreePort() (net.Listener, int, error) {
Expand Down
5 changes: 2 additions & 3 deletions hack/logcheck.conf
Expand Up @@ -15,16 +15,15 @@

# Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kube-proxy/.*
structured k8s.io/kubernetes/cmd/kube-scheduler/.*
structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.*
structured k8s.io/kubernetes/pkg/scheduler/.*

# The following packages have been migrated to contextual logging.
# Packages matched here do not have to be listed above because
# "contextual" implies "structured".
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
Expand Down
34 changes: 17 additions & 17 deletions pkg/scheduler/apis/config/v1beta2/default_plugins.go
Expand Up @@ -27,7 +27,7 @@ import (
)

// getDefaultPlugins returns the default set of plugins.
func getDefaultPlugins() *v1beta2.Plugins {
func getDefaultPlugins(logger klog.Logger) *v1beta2.Plugins {
plugins := &v1beta2.Plugins{
QueueSort: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
Expand Down Expand Up @@ -107,34 +107,34 @@ func getDefaultPlugins() *v1beta2.Plugins {
},
},
}
applyFeatureGates(plugins)
applyFeatureGates(logger, plugins)

return plugins
}

func applyFeatureGates(config *v1beta2.Plugins) {
func applyFeatureGates(logger klog.Logger, config *v1beta2.Plugins) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
config.Score.Enabled = append(config.Score.Enabled, v1beta2.Plugin{Name: names.VolumeBinding, Weight: pointer.Int32Ptr(1)})
}
}

// mergePlugins merges the custom set into the given default one, handling disabled sets.
func mergePlugins(defaultPlugins, customPlugins *v1beta2.Plugins) *v1beta2.Plugins {
func mergePlugins(logger klog.Logger, defaultPlugins, customPlugins *v1beta2.Plugins) *v1beta2.Plugins {
if customPlugins == nil {
return defaultPlugins
}

defaultPlugins.QueueSort = mergePluginSet(defaultPlugins.QueueSort, customPlugins.QueueSort)
defaultPlugins.PreFilter = mergePluginSet(defaultPlugins.PreFilter, customPlugins.PreFilter)
defaultPlugins.Filter = mergePluginSet(defaultPlugins.Filter, customPlugins.Filter)
defaultPlugins.PostFilter = mergePluginSet(defaultPlugins.PostFilter, customPlugins.PostFilter)
defaultPlugins.PreScore = mergePluginSet(defaultPlugins.PreScore, customPlugins.PreScore)
defaultPlugins.Score = mergePluginSet(defaultPlugins.Score, customPlugins.Score)
defaultPlugins.Reserve = mergePluginSet(defaultPlugins.Reserve, customPlugins.Reserve)
defaultPlugins.Permit = mergePluginSet(defaultPlugins.Permit, customPlugins.Permit)
defaultPlugins.PreBind = mergePluginSet(defaultPlugins.PreBind, customPlugins.PreBind)
defaultPlugins.Bind = mergePluginSet(defaultPlugins.Bind, customPlugins.Bind)
defaultPlugins.PostBind = mergePluginSet(defaultPlugins.PostBind, customPlugins.PostBind)
defaultPlugins.QueueSort = mergePluginSet(logger, defaultPlugins.QueueSort, customPlugins.QueueSort)
defaultPlugins.PreFilter = mergePluginSet(logger, defaultPlugins.PreFilter, customPlugins.PreFilter)
defaultPlugins.Filter = mergePluginSet(logger, defaultPlugins.Filter, customPlugins.Filter)
defaultPlugins.PostFilter = mergePluginSet(logger, defaultPlugins.PostFilter, customPlugins.PostFilter)
defaultPlugins.PreScore = mergePluginSet(logger, defaultPlugins.PreScore, customPlugins.PreScore)
defaultPlugins.Score = mergePluginSet(logger, defaultPlugins.Score, customPlugins.Score)
defaultPlugins.Reserve = mergePluginSet(logger, defaultPlugins.Reserve, customPlugins.Reserve)
defaultPlugins.Permit = mergePluginSet(logger, defaultPlugins.Permit, customPlugins.Permit)
defaultPlugins.PreBind = mergePluginSet(logger, defaultPlugins.PreBind, customPlugins.PreBind)
defaultPlugins.Bind = mergePluginSet(logger, defaultPlugins.Bind, customPlugins.Bind)
defaultPlugins.PostBind = mergePluginSet(logger, defaultPlugins.PostBind, customPlugins.PostBind)
return defaultPlugins
}

Expand All @@ -143,7 +143,7 @@ type pluginIndex struct {
plugin v1beta2.Plugin
}

func mergePluginSet(defaultPluginSet, customPluginSet v1beta2.PluginSet) v1beta2.PluginSet {
func mergePluginSet(logger klog.Logger, defaultPluginSet, customPluginSet v1beta2.PluginSet) v1beta2.PluginSet {
disabledPlugins := sets.NewString()
enabledCustomPlugins := make(map[string]pluginIndex)
// replacedPluginIndex is a set of index of plugins, which have replaced the default plugins.
Expand All @@ -162,7 +162,7 @@ func mergePluginSet(defaultPluginSet, customPluginSet v1beta2.PluginSet) v1beta2
}
// The default plugin is explicitly re-configured, update the default plugin accordingly.
if customPlugin, ok := enabledCustomPlugins[defaultEnabledPlugin.Name]; ok {
klog.InfoS("Default plugin is explicitly re-configured; overriding", "plugin", defaultEnabledPlugin.Name)
logger.Info("Default plugin is explicitly re-configured; overriding", "plugin", defaultEnabledPlugin.Name)
// Update the default plugin in place to preserve order.
defaultEnabledPlugin = customPlugin.plugin
replacedPluginIndex.Insert(customPlugin.index)
Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/apis/config/v1beta2/default_plugins_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kube-scheduler/config/v1beta2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -117,11 +118,12 @@ func TestApplyFeatureGates(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
for k, v := range test.features {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, k, v)()
}

gotConfig := getDefaultPlugins()
gotConfig := getDefaultPlugins(logger)
if diff := cmp.Diff(test.wantConfig, gotConfig); diff != "" {
t.Errorf("unexpected config diff (-want, +got): %s", diff)
}
Expand Down Expand Up @@ -370,7 +372,8 @@ func TestMergePlugins(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
test.defaultPlugins = mergePlugins(test.defaultPlugins, test.customPlugins)
logger, _ := ktesting.NewTestContext(t)
test.defaultPlugins = mergePlugins(logger, test.defaultPlugins, test.customPlugins)
if d := cmp.Diff(test.expectedPlugins, test.defaultPlugins); d != "" {
t.Fatalf("plugins mismatch (-want +got):\n%s", d)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/apis/config/v1beta2/defaults.go
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
"k8s.io/klog/v2"
"k8s.io/kube-scheduler/config/v1beta2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -63,9 +64,9 @@ func pluginsNames(p *v1beta2.Plugins) []string {
return n.List()
}

func setDefaults_KubeSchedulerProfile(prof *v1beta2.KubeSchedulerProfile) {
func setDefaults_KubeSchedulerProfile(logger klog.Logger, prof *v1beta2.KubeSchedulerProfile) {
// Set default plugins.
prof.Plugins = mergePlugins(getDefaultPlugins(), prof.Plugins)
prof.Plugins = mergePlugins(logger, getDefaultPlugins(logger), prof.Plugins)

// Set default plugin configs.
scheme := GetPluginArgConversionScheme()
Expand Down Expand Up @@ -101,6 +102,7 @@ func setDefaults_KubeSchedulerProfile(prof *v1beta2.KubeSchedulerProfile) {

// SetDefaults_KubeSchedulerConfiguration sets additional defaults
func SetDefaults_KubeSchedulerConfiguration(obj *v1beta2.KubeSchedulerConfiguration) {
logger := klog.TODO() // called by generated code that doesn't pass a logger
if obj.Parallelism == nil {
obj.Parallelism = pointer.Int32Ptr(16)
}
Expand All @@ -117,7 +119,7 @@ func SetDefaults_KubeSchedulerConfiguration(obj *v1beta2.KubeSchedulerConfigurat
// Add the default set of plugins and apply the configuration.
for i := range obj.Profiles {
prof := &obj.Profiles[i]
setDefaults_KubeSchedulerProfile(prof)
setDefaults_KubeSchedulerProfile(logger, prof)
}

if obj.PercentageOfNodesToScore == nil {
Expand Down