package scheduler
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
configv1 "k8s.io/kube-scheduler/config/v1"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
const (
	// durationToExpireAssumedPod is the duration the scheduler will wait before
	// expiring an assumed pod. It is currently 0.
	// See issue #106361 for more details about this parameter and its value.
	durationToExpireAssumedPod time.Duration = 0
)
// ErrNoNodesAvailable is the error returned when there are no nodes available
// to schedule pods.
var ErrNoNodesAvailable = errors.New("no nodes available to schedule pods")
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
// It is expected that changes made via Cache will be observed
// by NodeLister and Algorithm.
Cache internalcache.Cache
Extenders []framework.Extender
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
// logger *must* be initialized when creating a Scheduler,
// otherwise logging functions will access a nil sink and
// panic.
logger klog.Logger
// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
registeredHandlers []cache.ResourceEventHandlerRegistration
//- Cache:用于存储集群的状态信息, 以便Scheduler能够快速访问。
//- Extenders:是一组扩展程序,可以自定义Pod的调度逻辑。
//- NextPod:是一个函数,用于获取下一个待调度的Pod。 -
//- SchedulePod:是一个函数,用于尝试将给定的Pod调度到节点列表中的一个节点上。
//- StopEverything:是一个通道,用于关闭Scheduler。
//- SchedulingQueue:用于存储待调度的Pod。
//- Profiles:是一组调度配置文件,用于定义不同的调度策略。
//- client:是用于与Kubernetes API服务器交互的客户端。
//- nodeInfoSnapshot:是集群节点的快照,包含节点的状态信息。
//- percentageOfNodesToScore:用于指定参与评分的节点百分比。
//- nextStartNodeIndex:用于指定下一个开始调度的节点索引。
//- logger:用于记录日志信息。
//- registeredHandlers:包含所有处理器的注册信息,用于检查处理器是否已同步完成。
func (sched *Scheduler) applyDefaultHandlers() {
sched.SchedulePod = sched.schedulePod
sched.FailureHandler = sched.handleSchedulingFailure
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
// Overridden by profile level percentageOfNodesToScore if set in v1.
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
podMaxInUnschedulablePodsDuration time.Duration
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry frameworkruntime.Registry
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallelism int32
applyDefaultProfile bool
// Option configures a Scheduler
type Option func(*schedulerOptions)
// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
// Name of the selected node.
SuggestedHost string
// The number of nodes the scheduler evaluated the pod against in the filtering
// phase and beyond.
// Note that it contains the number of nodes that filtered out by PreFilterResult.
EvaluatedNodes int
// The number of nodes out of the evaluated ones that fit the pod.
FeasibleNodes int
// The nominating info for scheduling cycle.
nominatingInfo *framework.NominatingInfo
//1. Option 是一个函数类型,其参数为一个 *schedulerOptions 指针,用于配置一个 Scheduler。
//2. ScheduleResult 是一个结构体类型,代表调度 Pod 的结果。它包含以下字段:
//- SuggestedHost:被选中的节点名称。
//- EvaluatedNodes:在过滤阶段及之后对 Pod 进行评估的节点数量。
//- FeasibleNodes:在评估节点中适合 Pod 的节点数量。
//- nominatingInfo:提名信息,用于记录调度周期的提名情况。
// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1")
func WithComponentConfigVersion(apiVersion string) Option {
return func(o *schedulerOptions) {
o.componentConfigVersion = apiVersion
// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
return func(o *schedulerOptions) {
o.kubeConfig = cfg
// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
return func(o *schedulerOptions) {
o.profiles = p
o.applyDefaultProfile = false
// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
return func(o *schedulerOptions) {
o.parallelism = threads
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler.
// The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125.
func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
return func(o *schedulerOptions) {
if percentageOfNodesToScore != nil {
o.percentageOfNodesToScore = *percentageOfNodesToScore
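// - WithComponentConfigVersion sets the component config version. Its apiVersion
//   argument should be the full scheme group/version of the external type that was
//   converted from (for example "kubescheduler.config.k8s.io/v1").
// - WithKubeConfig sets the kube config for the Scheduler.
// - WithProfiles sets the Scheduler's profiles. By default there is a single profile
//   named "default-scheduler".
// - WithParallelism sets the parallelism for all scheduling algorithms. The default is 16.
// - WithPercentageOfNodesToScore sets the Scheduler's percentageOfNodesToScore.
//   The default value of 0 uses an adaptive percentage: 50 - (number of nodes)/125.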
// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
return func(o *schedulerOptions) {
o.frameworkOutOfTreeRegistry = registry
// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podInitialBackoffSeconds = podInitialBackoffSeconds
// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podMaxBackoffSeconds = podMaxBackoffSeconds
// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
return func(o *schedulerOptions) {
o.podMaxInUnschedulablePodsDuration = duration
// - WithFrameworkOutOfTreeRegistry sets the registry of out-of-tree plugins; these
//   plugins are merged into (appended to) the default in-tree registry.
// - WithPodInitialBackoffSeconds sets the Scheduler's podInitialBackoffSeconds; the default is 1.
// - WithPodMaxBackoffSeconds sets the Scheduler's podMaxBackoffSeconds; the default is 10.
// - WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for the PriorityQueue.
// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
return func(o *schedulerOptions) {
o.extenders = e
// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
return func(o *schedulerOptions) {
o.frameworkCapturer = fc
var defaultSchedulerOptions = schedulerOptions{
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
parallelism: int32(parallelize.DefaultParallelism),
// Ideally we would statically set the default profile here, but we can't because
// creating the default profile may require testing feature gates, which may get
// set dynamically in tests. Therefore, we delay creating it until New is actually
// invoked.
applyDefaultProfile: true,
//- percentageOfNodesToScore:表示要进行打分的节点的百分比,默认值为schedulerapi.DefaultPercentageOfNodesToScore。
//- podInitialBackoffSeconds:表示Pod初始退避时间的秒数,默认值为internalqueue.DefaultPodInitialBackoffDuration的秒数。
//- podMaxBackoffSeconds:表示Pod最大退避时间的秒数,默认值为internalqueue.DefaultPodMaxBackoffDuration的秒数。
//- podMaxInUnschedulablePodsDuration:表示Pod在不可调度状态下允许的最大持续时间,
//- parallelism:表示并行处理任务的数量,默认值为parallelize.DefaultParallelism。
//- applyDefaultProfile:表示是否应用默认的调度配置文件,默认值为true。
// New returns a Scheduler.
//
// It starts from defaultSchedulerOptions plus the supplied opts. If no profiles
// were supplied, it resolves them from a v1 KubeSchedulerConfiguration converted
// to the internal type. It then merges the out-of-tree plugin registry into the
// in-tree registry and builds the extenders and the profiles. Finally, it creates
// the scheduling queue, the cache and the Scheduler, and registers the informer
// event handlers.
//
// NOTE(review): this excerpt of New is incomplete. The body of the opts loop,
// the remaining arguments to profile.NewMap and internalqueue.NewSchedulingQueue,
// and the body of the per-profile loop are not visible here. Confirm against the
// full source.
func New(ctx context.Context,
client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
opts ...Option) (*Scheduler, error) {
logger := klog.FromContext(ctx)
// The scheduler's stop channel is the creating context's Done channel.
stopEverything := ctx.Done()
// Start from a copy of the package defaults; caller options are applied on top.
options := defaultSchedulerOptions
for _, opt := range opts {
// No profiles were supplied via WithProfiles: take them from a v1
// KubeSchedulerConfiguration converted to the internal type.
// NOTE(review): no defaulting call on versionedCfg is visible before the
// conversion in this excerpt; confirm.
if options.applyDefaultProfile {
var versionedCfg configv1.KubeSchedulerConfiguration
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
options.profiles = cfg.Profiles
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
extenders, err := buildExtenders(logger, options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
if err != nil {
// NOTE(review): this wrap uses %v while the other wraps in New use %w, so
// callers cannot errors.Is/As through it; consider %w.
return nil, fmt.Errorf("initializing profiles: %v", err)
// Steps performed so far:
// 1. Create the in-tree plugin registry.
// 2. Merge options.frameworkOutOfTreeRegistry into it, returning an error if the merge fails.
// 3. Register metrics. NOTE(review): no metrics registration call is visible in this excerpt.
// 4. Build the extenders, returning an error if that fails.
// 5. Obtain the Pod and Node listers.
// 6. Create an empty snapshot.
// 7. Create the asynchronous metrics recorder.
// 8. Initialize the profiles from the given arguments, returning an error if that fails.
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
// Collect, per profile name, the PreEnqueue plugins and the queueing hints
// derived from the profile's EnqueueExtensions.
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
podQueue := internalqueue.NewSchedulingQueue(
for _, fwk := range profiles {
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
// Setup cache debugger.
// NOTE(review): debugger is not used anywhere in the visible code.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
sched.NextPod = podQueue.Pop
// Register informer event handlers for the union of resources/actions that
// any profile's queueing hints care about.
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
return sched, nil
// Steps performed after the profiles are built:
// 1. Return an error if there are no profiles.
// 2. Build preEnqueuePluginMap and queueingHintsPerProfile from the profiles.
// 3. Create the SchedulingQueue that manages pods waiting to be scheduled.
// 4. Set the PodNominator for each profile. NOTE(review): the loop body doing this is not visible here.
// 5. Create schedulerCache, which caches node and pod information.
// 6. Create the cache debugger.
// 7. Create the Scheduler and set its fields.
// 8. Set the Scheduler's NextPod to podQueue.Pop.
// 9. Apply the default handlers. NOTE(review): no applyDefaultHandlers call is visible in this excerpt.
// 10. Add all event handlers.
// defaultQueueingHintFn is the default queueing hint function.
// It always returns Queue as the queueing hint.
var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
return framework.Queue, nil
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
queueingHintMap := make(internalqueue.QueueingHintMap)
for _, e := range es {
events := e.EventsToRegister()
//1. 函数接收一个[]framework.EnqueueExtensions参数,它是一个调度器扩展点的集合,这些扩展点可以注册事件。
//2. 创建一个空的internalqueue.QueueingHintMap用于存储队列提示映射。
//3. 遍历扩展点集合es中的每个扩展点e。
//4. 调用扩展点e的EventsToRegister()方法,获取该扩展点需要注册的事件。
//5. 将事件添加到队列提示映射queueingHintMap中。
// This will happen when plugin registers with empty events, it's usually the case a pod
// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
// will enter into the activeQ via priorityQueue.Update().
if len(events) == 0 {
// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
// cannot be moved by any regular cluster event.
// So, we can just ignore such EventsToRegister here.
registerNodeAdded := false
registerNodeTaintUpdated := false
for _, event := range events {
fn := event.QueueingHintFn
if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
fn = defaultQueueingHintFn
if event.Event.Resource == framework.Node {
if event.Event.ActionType&framework.Add != 0 {
registerNodeAdded = true
if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
registerNodeTaintUpdated = true
queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: fn,
if registerNodeAdded && !registerNodeTaintUpdated {
// Temporally fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
// NodeAdded QueueingHint isn't always called because of preCheck.
// It's definitely not something expected for plugin developers,
// and registering UpdateNodeTaint event is the only mitigation for now.
// So, here registers UpdateNodeTaint event for plugins that has NodeAdded event, but don't have UpdateNodeTaint event.
// It has a bad impact for the requeuing efficiency though, a lot better than some Pods being stuch in the
// unschedulable pod pool.
// This behavior will be removed when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
PluginName: e.Name(),
QueueingHintFn: defaultQueueingHintFn,
return queueingHintMap
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
logger := klog.FromContext(ctx)
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
// If the plugins satisfy the io.Closer interface, they are closed.
err := sched.Profiles.Close()
if err != nil {
logger.Error(err, "Failed to close plugins")
//1. 从上下文中获取日志记录器logger。
//2. 调用SchedulingQueue的Run方法,开始监控调度队列。
//3. 在一个独立的goroutine中启动scheduleOne循环,因为scheduleOne函数会在从SchedulingQueue获取下一个项目时挂起。
//4. 等待上下文完成。
//5. 调用SchedulingQueue的Close方法,关闭调度队列。
//6. 尝试关闭满足io.Closer接口的插件。如果关闭失败,记录错误日志。
// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
return informerFactory
// buildExtenders creates one HTTP extender per entry in extenders. It returns them
// with every non-ignorable extender first and the ignorable ones at the tail.
// It returns (nil, nil) when no extenders are configured.
//
// The names of extender-managed resources marked IgnoredByScheduler are collected.
// When there are any, they are written into NodeResourcesFitArgs.IgnoredResources
// of each profile in profiles; the profiles are mutated in place through pointers.
// Two cases are errors:
//   - a profile without a NodeResourcesFit plugin config;
//   - a NodeResourcesFit plugin config whose args have another type.
//
// NOTE(review): this excerpt has lost its closing braces. From here it cannot be
// told whether the ManagedResources loop runs for every extender or only inside the
// ignorable (else) branch; confirm against the full source.
func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
var fExtenders []framework.Extender
if len(extenders) == 0 {
return nil, nil
// ignoredExtendedResources: names of resources the scheduler should ignore.
// ignorableExtenders: extenders reporting IsIgnorable(), appended last.
var ignoredExtendedResources []string
var ignorableExtenders []framework.Extender
for i := range extenders {
logger.V(2).Info("Creating extender", "extender", extenders[i])
extender, err := NewHTTPExtender(&extenders[i])
if err != nil {
return nil, err
if !extender.IsIgnorable() {
fExtenders = append(fExtenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
for _, r := range extenders[i].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
// place ignorable extenders to the tail of extenders
fExtenders = append(fExtenders, ignorableExtenders...)
// Summary of the steps above:
// 1. If the extenders slice is empty, return nil, nil immediately.
// 2. Initialize ignoredExtendedResources (names of ignored extended resources)
//    and ignorableExtenders (ignorable extenders).
// 3. For each extender:
//    - log it at verbosity 2;
//    - create an HTTP extender with NewHTTPExtender;
//    - if it is not ignorable, append it to fExtenders;
//    - if it is ignorable, append it to ignorableExtenders;
//    - append the names of its ManagedResources marked IgnoredByScheduler to
//      ignoredExtendedResources.
// 4. Append ignorableExtenders to the end of fExtenders.
// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
// plugin args (and in which case the extender ignored resources take precedence).
if len(ignoredExtendedResources) == 0 {
return fExtenders, nil
for i := range profiles {
prof := &profiles[i]
var found = false
for k := range prof.PluginConfig {
if prof.PluginConfig[k].Name == noderesources.Name {
// Update the existing args
pc := &prof.PluginConfig[k]
args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
args.IgnoredResources = ignoredExtendedResources
found = true
// NOTE(review): no `break` follows in this excerpt, so every matching PluginConfig
// entry of the profile is updated; confirm that is intended.
if !found {
return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
return fExtenders, nil
// For each profile, the NodeResourcesFit plugin config's args get IgnoredResources set to
// the collected names, and found is set to true. If no matching PluginConfig entry is
// found after scanning them all, nil and an error are returned.
// FailureHandlerFn is the type of Scheduler.FailureHandler, which is called upon a
// scheduling failure of the pod described by podInfo. applyDefaultHandlers
// installs sched.handleSchedulingFailure as the default implementation.
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for _, queueingHints := range queueingHintsPerProfile {
for evt := range queueingHints {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType
} else {
gvkMap[evt.Resource] = evt.ActionType
return gvkMap
// newPodInformer creates a shared index informer that returns only non-terminal pods.
// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
// Dropping `.metadata.managedFields` to improve memory usage.
// The Extract workflow (i.e. `ExtractPod`) should be unused.
trim := func(obj interface{}) (interface{}, error) {
if accessor, err := meta.Accessor(obj); err == nil {
return obj, nil
return informer
//该函数创建一个共享索引 informer,用于返回非终止状态的 pod。
//它通过设置筛选条件,使得 informer 只能获取到状态不是 "Succeeded" 或 "Failed" 的 pod。
//此外,函数还通过设置 transform 函数来删除 pod 的 .metadata.managedFields 字段,以减少内存使用。
//该函数返回一个经过筛选和转换的 pod informer 实例。