/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"errors"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
const (
// Duration the scheduler will wait before expiring an assumed pod.
// See issue #106361 for more details about this parameter and its value.
durationToExpireAssumedPod time.Duration = 0
)
// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
// It is expected that changes made via Cache will be observed
// by NodeLister and Algorithm.
Cache internalcache.Cache
Extenders []framework.Extender
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
// logger *must* be initialized when creating a Scheduler,
// otherwise logging functions will access a nil sink and
// panic.
logger klog.Logger
// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
registeredHandlers []cache.ResourceEventHandlerRegistration
}
//该Go代码定义了一个名为Scheduler的结构体,用于管理Pod的调度过程。
//它包含多个字段,用于配置调度器的行为,
//例如Cache、Extenders、NextPod、FailureHandler、SchedulePod、StopEverything、SchedulingQueue、Profiles、
//client、nodeInfoSnapshot、percentageOfNodesToScore、nextStartNodeIndex、logger和registeredHandlers。
//这些字段的作用如下:
//- Cache:用于存储集群的状态信息, 以便Scheduler能够快速访问。
//- Extenders:是一组扩展程序,可以自定义Pod的调度逻辑。
//- NextPod:是一个函数,用于获取下一个待调度的Pod。 -
//FailureHandler:是一个函数,用于处理调度失败的情况。
//- SchedulePod:是一个函数,用于尝试将给定的Pod调度到节点列表中的一个节点上。
//成功时返回建议的主机名,失败时返回FitError错误。
//- StopEverything:是一个通道,用于关闭Scheduler。
//- SchedulingQueue:用于存储待调度的Pod。
//- Profiles:是一组调度配置文件,用于定义不同的调度策略。
//- client:是用于与Kubernetes API服务器交互的客户端。
//- nodeInfoSnapshot:是集群节点的快照,包含节点的状态信息。
//- percentageOfNodesToScore:用于指定参与评分的节点百分比。
//- nextStartNodeIndex:用于指定下一个开始调度的节点索引。
//- logger:用于记录日志信息。
//- registeredHandlers:包含所有处理器的注册信息,用于检查处理器是否已同步完成。
//总的来说,这个Scheduler结构体定义了一个调度器所需的各种配置和功能,
//包括节点和Pod的缓存、扩展程序、日志记录、失败处理、调度逻辑等。它提供了一种灵活的方式来定制和管理Pod的调度过程。
func (sched *Scheduler) applyDefaultHandlers() {
sched.SchedulePod = sched.schedulePod
sched.FailureHandler = sched.handleSchedulingFailure
}
//该函数为一个go语言函数,作用是为Scheduler结构体实例sched设置默认的处理函数。
//具体操作是将sched.schedulePod赋值给sched.SchedulePod,将sched.handleSchedulingFailure赋值给sched.FailureHandler。
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
// Overridden by profile level percentageOfNodesToScore if set in v1.
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
podMaxInUnschedulablePodsDuration time.Duration
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry frameworkruntime.Registry
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallelism int32
applyDefaultProfile bool
}
//该代码定义了一个名为schedulerOptions的结构体,用于配置调度器的参数。
//其中包括了组件配置版本、kubeconfig配置、节点打分百分比、Pod初始退避时间、Pod最大退避时间、
//Pod在不可调度状态下的最大持续时间、外部插件注册表、调度器配置文件、扩展器、框架捕获器和并行度等参数。
// Option configures a Scheduler
type Option func(*schedulerOptions)
// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
// Name of the selected node.
SuggestedHost string
// The number of nodes the scheduler evaluated the pod against in the filtering
// phase and beyond.
// Note that it contains the number of nodes that filtered out by PreFilterResult.
EvaluatedNodes int
// The number of nodes out of the evaluated ones that fit the pod.
FeasibleNodes int
// The nominating info for scheduling cycle.
nominatingInfo *framework.NominatingInfo
}
//这段代码定义了Go语言中的两个类型和一个函数:
//1. Option 是一个函数类型,其参数为一个 *schedulerOptions 指针,用于配置一个 Scheduler。
//2. ScheduleResult 是一个结构体类型,代表调度 Pod 的结果。它包含以下字段:
//- SuggestedHost:被选中的节点名称。
//- EvaluatedNodes:在过滤阶段及之后对 Pod 进行评估的节点数量。
//- FeasibleNodes:在评估节点中适合 Pod 的节点数量。
//- nominatingInfo:提名信息,用于记录调度周期的提名情况。
//这段代码没有定义函数的具体实现,因此无法对其复杂度进行评估。
// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1")
func WithComponentConfigVersion(apiVersion string) Option {
return func(o *schedulerOptions) {
o.componentConfigVersion = apiVersion
}
}
// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
return func(o *schedulerOptions) {
o.kubeConfig = cfg
}
}
// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
return func(o *schedulerOptions) {
o.profiles = p
o.applyDefaultProfile = false
}
}
// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
return func(o *schedulerOptions) {
o.parallelism = threads
}
}
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler.
// The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125.
func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
return func(o *schedulerOptions) {
if percentageOfNodesToScore != nil {
o.percentageOfNodesToScore = *percentageOfNodesToScore
}
}
}
//这些函数是Go语言中的函数,用于设置调度器(scheduler)的配置选项。
//- WithComponentConfigVersion 函数用于设置组件配置的版本。
//它接受一个字符串参数 apiVersion,该参数应该是外部类型转换而来的完整方案组/版本(例如 "kubescheduler.config.k8s.io/v1")。
//- WithKubeConfig 函数用于设置调度器的kube配置。
//- WithProfiles 函数用于设置调度器的配置文件。默认情况下,有一个名为 "default-scheduler" 的配置文件。
//- WithParallelism 函数用于设置所有调度算法的并行度。默认值为16。
//- WithPercentageOfNodesToScore 函数用于设置Scheduler的 percentageOfNodesToScore。
//默认值为0,使用自适应百分比:50 - (节点数)/125。
// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkOutOfTreeRegistry = registry
}
}
// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podInitialBackoffSeconds = podInitialBackoffSeconds
}
}
// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podMaxBackoffSeconds = podMaxBackoffSeconds
}
}
// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
return func(o *schedulerOptions) {
o.podMaxInUnschedulablePodsDuration = duration
}
}
//这些函数是Go语言中的函数,用于设置调度器的配置选项。
//- WithFrameworkOutOfTreeRegistry 函数用于设置外部插件的注册表,将这些插件追加到默认的注册表中。
//- WithPodInitialBackoffSeconds 函数用于设置 Scheduler 的 podInitialBackoffSeconds,其默认值为 1。
//- WithPodMaxBackoffSeconds 函数用于设置 Scheduler 的 podMaxBackoffSeconds,其默认值为 10。
//- WithPodMaxInUnschedulablePodsDuration 函数用于设置 PriorityQueue 的 podMaxInUnschedulablePodsDuration。
// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
return func(o *schedulerOptions) {
o.extenders = e
}
}
//该函数为Go语言中的函数,名为WithExtenders,接收一个变长参数e,类型为schedulerapi.Extender的切片。
//函数返回一个Option类型的函数,该函数接收一个schedulerOptions类型的指针o,将e赋值给o.extenders。
// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
return func(o *schedulerOptions) {
o.frameworkCapturer = fc
}
}
//该函数是一个名为WithBuildFrameworkCapturer的函数,
//它接收一个FrameworkCapturer类型的参数fc,并返回一个Option类型的函数。
//返回的函数将传入的fc赋值给o.frameworkCapturer。
var defaultSchedulerOptions = schedulerOptions{
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
parallelism: int32(parallelize.DefaultParallelism),
// Ideally we would statically set the default profile here, but we can't because
// creating the default profile may require testing feature gates, which may get
// set dynamically in tests. Therefore, we delay creating it until New is actually
// invoked.
applyDefaultProfile: true,
}
//这段Go代码定义了一个名为defaultSchedulerOptions的变量,它是一个schedulerOptions类型的结构体。
//这个结构体用于设置调度器的默认选项,包括以下字段:
//- percentageOfNodesToScore:表示要进行打分的节点的百分比,默认值为schedulerapi.DefaultPercentageOfNodesToScore。
//- podInitialBackoffSeconds:表示Pod初始退避时间的秒数,默认值为internalqueue.DefaultPodInitialBackoffDuration的秒数。
//- podMaxBackoffSeconds:表示Pod最大退避时间的秒数,默认值为internalqueue.DefaultPodMaxBackoffDuration的秒数。
//- podMaxInUnschedulablePodsDuration:表示Pod在不可调度状态下允许的最大持续时间,
//默认值为internalqueue.DefaultPodMaxInUnschedulablePodsDuration。
//- parallelism:表示并行处理任务的数量,默认值为parallelize.DefaultParallelism。
//- applyDefaultProfile:表示是否应用默认的调度配置文件,默认值为true。
//这些选项用于配置调度器的行为,例如决定多少节点需要进行打分、设置Pod的退避策略等。
// New returns a Scheduler
func New(ctx context.Context,
client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
opts ...Option) (*Scheduler, error) {
logger := klog.FromContext(ctx)
stopEverything := ctx.Done()
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
//该函数名为New,返回一个Scheduler类型指针和一个错误类型。
//函数参数包括上下文ctx、客户端接口client、共享informer工厂informerFactory、动态共享informer工厂dynInformerFactory、
//记录器工厂recorderFactory以及可选参数opts。
//函数首先从上下文中获取日志记录器logger和停止信号stopEverything。
//然后定义默认的调度器选项options,并遍历opts对options进行配置。
//最后返回一个Scheduler实例和错误类型。
if options.applyDefaultProfile {
var versionedCfg configv1.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
//这段Go代码主要功能是应用默认配置到调度器配置中。
//首先,它创建了一个configv1.KubeSchedulerConfiguration类型的变量versionedCfg,
//并使用scheme.Scheme.Default函数为其应用默认值。
//接着,它创建了一个schedulerapi.KubeSchedulerConfiguration类型的变量cfg,
//并将versionedCfg中的值通过scheme.Scheme.Convert函数转换并赋值给cfg。
//最后,将cfg.Profiles赋值给options.profiles。
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
metrics.Register()
extenders, err := buildExtenders(logger, options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
//这段Go代码的功能是初始化一个调度器配置。
//1. 首先创建一个in-tree注册表registry。
//2. 将options.frameworkOutOfTreeRegistry合并到registry中,如果合并失败则返回错误。
//3. 注册指标收集。
//4. 构建扩展程序extenders,如果构建失败则返回错误。
//5. 获取Pod和Node的列表器。
//6. 创建一个空的快照snapshot。
//7. 创建一个异步指标记录器metricsRecorder。
//8. 使用给定的参数初始化调度器配置profiles,如果初始化失败则返回错误。
//其中,buildExtenders函数用于构建扩展程序,informFactory是一个informers工厂,用于创建Pod和Node的列表器。internalcache.NewEmptySnapshot()创建一个空的快照,metrics.NewMetricsAsyncRecorder创建一个异步指标记录器。profile.NewMap用于初始化调度器配置。
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(ctx)
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
}
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}
return sched, nil
}
//该函数主要实现了以下功能:
//1. 检查传入的profiles是否为空,如果为空则返回错误。
//2. 根据profiles创建preEnqueuePluginMap和queueingHintsPerProfile。
//3. 使用profiles中指定的SchedulerName创建一个SchedulingQueue对象,该对象用于管理待调度的Pod。
//4. 为每个profile设置PodNominator。
//5. 创建一个schedulerCache对象,用于缓存节点和Pod的信息。
//6. 创建一个debugger对象,用于调试缓存。
//7. 创建一个Scheduler对象,并设置其属性。
//8. 设置Scheduler的NextPod方法。
//9. 应用默认的处理程序。
//10. 添加所有事件处理程序。
//综上所述,该函数的主要功能是创建一个Scheduler对象,并对其进行初始化。
// defaultQueueingHintFn is the default queueing hint function.
// It always returns Queue as the queueing hint.
var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
return framework.Queue, nil
}
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
queueingHintMap := make(internalqueue.QueueingHintMap)
for _, e := range es {
events := e.EventsToRegister()
//该函数的功能是构建一个队列提示映射(QueueingHintMap),它将事件注册到队列中。
//1. 函数接收一个[]framework.EnqueueExtensions参数,它是一个调度器扩展点的集合,这些扩展点可以注册事件。
//2. 创建一个空的internalqueue.QueueingHintMap用于存储队列提示映射。
//3. 遍历扩展点集合es中的每个扩展点e。
//4. 调用扩展点e的EventsToRegister()方法,获取该扩展点需要注册的事件。
//5. 将事件添加到队列提示映射queueingHintMap中。
//最终,函数返回构建完成的队列提示映射queueingHintMap。
// This will happen when plugin registers with empty events, it's usually the case a pod
// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
// will enter into the activeQ via priorityQueue.Update().
if len(events) == 0 {
continue
}
// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
// cannot be moved by any regular cluster event.
// So, we can just ignore such EventsToRegister here.
registerNodeAdded := false
registerNodeTaintUpdated := false
for _, event := range events {
fn := event.QueueingHintFn
if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
fn = defaultQueueingHintFn
}
if event.Event.Resource == framework.Node {
if event.Event.ActionType&framework.Add != 0 {
registerNodeAdded = true
}
if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
registerNodeTaintUpdated = true
}
}
//这段Go代码中的函数是一个循环,用于遍历一组事件(events),
//并根据这些事件的类型来更新两个布尔变量registerNodeAdded和registerNodeTaintUpdated。
//具体来说,函数首先检查事件是否启用了调度队列提示功能
//(通过utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)来判断),
//如果没有启用,则使用默认的队列提示函数defaultQueueingHintFn
//。然后,如果事件资源类型为framework.Node,并且事件动作类型包含framework.Add或framework.UpdateNodeTaint,
//则分别将registerNodeAdded和registerNodeTaintUpdated设置为true。
//这段代码的主要目的是为了根据事件的类型来决定是否需要注册节点添加或节点污点更新的操作。
queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: fn,
})
}
if registerNodeAdded && !registerNodeTaintUpdated {
// Temporally fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
// NodeAdded QueueingHint isn't always called because of preCheck.
// It's definitely not something expected for plugin developers,
// and registering UpdateNodeTaint event is the only mitigation for now.
//
// So, here registers UpdateNodeTaint event for plugins that has NodeAdded event, but don't have UpdateNodeTaint event.
// It has a bad impact for the requeuing efficiency though, a lot better than some Pods being stuch in the
// unschedulable pod pool.
// This behavior will be removed when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
&internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: defaultQueueingHintFn,
},
)
}
}
return queueingHintMap
}
//这段Go代码是一个函数,它根据传入的事件列表和注册选项生成一个队列提示映射。
//该映射将事件与相应的队列提示函数进行关联。
//如果注册了NodeAdded事件但未注册UpdateNodeTaint事件,则会临时修复一个Kubernetes问题,
//通过为这些插件注册UpdateNodeTaint事件来避免某些Pod被卡在不可调度的Pod池中。
//该函数返回生成的队列提示映射。
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
logger := klog.FromContext(ctx)
sched.SchedulingQueue.Run(logger)
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
// If the plugins satisfy the io.Closer interface, they are closed.
err := sched.Profiles.Close()
if err != nil {
logger.Error(err, "Failed to close plugins")
}
}
//该函数是Scheduler类型的Run方法,用于开始监控和调度。
//它启动调度并阻塞,直到上下文完成。具体步骤如下:
//1. 从上下文中获取日志记录器logger。
//2. 调用SchedulingQueue的Run方法,开始监控调度队列。
//3. 在一个独立的goroutine中启动scheduleOne循环,因为scheduleOne函数会在从SchedulingQueue获取下一个项目时挂起。
//如果没有任何新Pod需要调度,它将一直挂起。
//如果在当前goroutine中执行,它将阻塞关闭SchedulingQueue,从而在关闭时导致死锁。
//4. 等待上下文完成。
//5. 调用SchedulingQueue的Close方法,关闭调度队列。
//6. 尝试关闭满足io.Closer接口的插件。如果关闭失败,记录错误日志。
// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
return informerFactory
}
//该函数创建一个SharedInformerFactory,并为特定的调度程序初始化一个就地podInformer。
//函数接收一个clientset.Interface类型和一个时间周期作为参数,返回一个SharedInformerFactory。
//内部通过调用informers.NewSharedInformerFactory创建一个新的SharedInformerFactory实例,
//并使用InformerFor方法为v1.Pod类型创建一个新的podInformer。
//最后返回创建的SharedInformerFactory实例。
func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
var fExtenders []framework.Extender
if len(extenders) == 0 {
return nil, nil
}
var ignoredExtendedResources []string
var ignorableExtenders []framework.Extender
for i := range extenders {
logger.V(2).Info("Creating extender", "extender", extenders[i])
extender, err := NewHTTPExtender(&extenders[i])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
fExtenders = append(fExtenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range extenders[i].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
// place ignorable extenders to the tail of extenders
fExtenders = append(fExtenders, ignorableExtenders...)
//该函数主要负责根据传入的extenders参数构建一个framework.Extender类型的切片fExtenders。具体流程如下:
//1. 首先判断extenders切片的长度是否为0,如果是则直接返回nil和nil。
//2. 初始化两个切片ignoredExtendedResources和ignorableExtenders,分别用于存储被忽略的扩展资源名称和可忽略的扩展器。
//3. 遍历extenders切片,对每个扩展器进行如下操作:
//- 使用klog.Logger记录日志信息。
//- 调用NewHTTPExtender函数创建一个新的HTTPExtender对象。
//- 如果该扩展器不可忽略,则将其添加到fExtenders切片中。
//- 如果该扩展器可忽略,则将其添加到ignorableExtenders切片中。
//- 遍历该扩展器的ManagedResources字段,将被忽略的资源名称添加到ignoredExtendedResources切片中。
//4. 将ignorableExtenders切片追加到fExtenders切片的末尾。
//最终,函数返回构建好的fExtenders切片和可能出现的错误。
// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
// plugin args (and in which case the extender ignored resources take precedence).
if len(ignoredExtendedResources) == 0 {
return fExtenders, nil
}
for i := range profiles {
prof := &profiles[i]
var found = false
for k := range prof.PluginConfig {
if prof.PluginConfig[k].Name == noderesources.Name {
// Update the existing args
pc := &prof.PluginConfig[k]
args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
}
args.IgnoredResources = ignoredExtendedResources
found = true
break
}
}
if !found {
return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
}
}
return fExtenders, nil
}
//该函数首先检查ignoredExtendedResources是否为空,如果为空则直接返回fExtenders和nil。
//如果不为空,则遍历profiles中的每一个元素,然后遍历该元素的PluginConfig。
//当找到PluginConfig中的Name等于noderesources.Name时,将ignoredExtendedResources赋值给Args的IgnoredResources字段,
//并将found设为true。 如果遍历完所有PluginConfig后仍未找到符合条件的元素,则返回nil和错误信息。
//最后返回fExtenders和nil。
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for _, queueingHints := range queueingHintsPerProfile {
for evt := range queueingHints {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType
} else {
gvkMap[evt.Resource] = evt.ActionType
}
}
}
return gvkMap
}
//该函数用于将一个internalqueue.QueueingHintMapPerProfile类型的参数转换为一个map[framework.GVK]framework.ActionType类型的结果。
//具体实现过程为:遍历输入参数中的每个queueingHints,再遍历queueingHints中的每个事件evt,将evt.Resource作为键,
//evt.ActionType作为值存入gvkMap中。如果gvkMap中已经存在该键,则将该键对应的值与evt.ActionType进行按位或运算后再存入gvkMap中。
//最后返回gvkMap作为结果。
// newPodInformer creates a shared index informer that returns only non-terminal pods.
// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}
informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
// Dropping `.metadata.managedFields` to improve memory usage.
// The Extract workflow (i.e. `ExtractPod`) should be unused.
trim := func(obj interface{}) (interface{}, error) {
if accessor, err := meta.Accessor(obj); err == nil {
accessor.SetManagedFields(nil)
}
return obj, nil
}
informer.SetTransform(trim)
return informer
}
//该函数创建一个共享索引 informer,用于返回非终止状态的 pod。
//它通过设置筛选条件,使得 informer 只能获取到状态不是 "Succeeded" 或 "Failed" 的 pod。
//此外,函数还通过设置 transform 函数来删除 pod 的 .metadata.managedFields 字段,以减少内存使用。
//该函数返回一个经过筛选和转换的 pod informer 实例。