/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"container/heap"
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
utiltrace "k8s.io/utils/trace"
)
const (
// Percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent = 10
// minFeasibleNodesToFind is the minimum number of nodes that would be scored
// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
// certain minimum of nodes are checked for feasibility. This in turn helps
// ensure a minimum level of spreading.
minFeasibleNodesToFind = 100
// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
// would be scored in each scheduling cycle. This is a semi-arbitrary value
// to ensure that a certain minimum of nodes are checked for feasibility.
// This in turn helps ensure a minimum level of spreading.
minFeasibleNodesPercentageToFind = 5
// numberOfHighestScoredNodesToReport is the number of node scores
// to be included in ScheduleResult.
numberOfHighestScoredNodesToReport = 3
)
//这段代码定义了四个常量,分别是:
//- pluginMetricsSamplePercent:插件指标的采样百分比,值为10。
//- minFeasibleNodesToFind:每个调度周期中要评分的最小节点数,值为100。
//- minFeasibleNodesPercentageToFind:每个调度周期中要评分的最小节点百分比,值为5。
//- numberOfHighestScoredNodesToReport:要在调度结果中报告的最高评分节点数,值为3。
// ScheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
logger := klog.FromContext(ctx)
podInfo, err := sched.NextPod(logger)
if err != nil {
logger.Error(err, "Error while retrieving next pod from scheduling queue")
return
}
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
//该函数是一个Go函数,名为ScheduleOne,它为单个pod执行整个调度工作流程。
//它在调度算法的主机适应性上进行序列化。函数首先从上下文中获取logger,然后使用sched.NextPod(logger)方法获取下一个要调度的pod信息。
//如果获取过程中出现错误,则记录错误信息并返回。
//如果获取到的pod信息为空或pod为空,则直接返回。
pod := podInfo.Pod
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
logger.Error(err, "Error occurred")
return
}
if sched.skipPodSchedule(ctx, fwk, pod) {
return
}
//这段Go代码中的函数主要包括两部分逻辑。
//首先,通过klog.LoggerWithValues和klog.NewContext函数为日志记录器logger添加了pod信息,并创建了一个新的上下文ctx。
//然后使用logger.V(4).Info记录了一条日志,表示即将尝试调度Pod。
//接下来,调用sched.frameworkForPod(pod)方法获取与Pod相匹配的调度框架fwk。
//如果获取发生错误,则使用logger.Error记录错误日志并返回。
//如果获取成功,则调用sched.skipPodSchedule(ctx, fwk, pod)判断是否跳过Pod的调度。
//如果返回true,则表示跳过调度,函数直接返回。
logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
return
}
//这段Go代码是一个调度Pod的函数。它首先使用logger.V(3).Info记录尝试调度Pod的日志。
//然后,它同步地尝试为Pod找到一个合适的节点。
//在开始调度之前,它创建了一个新的CycleState对象,并设置了记录插件指标的标志。
//接着,它初始化了一个空的PodsToActivate对象,并将其写入到CycleState中。
//然后,它创建了一个可取消的上下文对象,并在函数结束时取消它。
//最后,它调用schedulingCycle函数进行调度,并根据调度结果处理成功或失败的情况。
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
return
}
// Usually, DonePod is called inside the scheduling queue,
// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
}()
}
//这个go函数是用于将Pod绑定到其宿主机的异步操作。
//它首先创建一个可取消的上下文,然后增加一个goroutine计数器,并在defer语句中减少该计数器。
//接着,它调用sched.bindingCycle方法来执行绑定周期操作,并根据操作结果处理错误。
//如果操作成功,则调用sched.SchedulingQueue.Done方法来标记Pod绑定完成。
var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
logger := klog.FromContext(ctx)
pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
defer func() {
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
}()
if err == ErrNoNodesAvailable {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
}
//该函数是一个Go语言函数,定义在一个名为Scheduler的结构体中,名为schedulingCycle。
//该函数尝试调度一个Pod。
//函数参数包括上下文ctx、状态state、框架fwk、待调度的Pod信息podInfo、开始时间start、待激活的PodspodsToActivate。
//函数返回一个ScheduleResult结构体、一个*framework.QueuedPodInfo指针和一个*framework.Status指针。
//函数首先从上下文中获取日志记录器logger,然后调用sched.SchedulePod方法尝试调度Pod。
//如果调度成功,函数将返回调度结果、podInfo和nil。
//如果调度失败且错误为ErrNoNodesAvailable,函数将记录调度算法的延迟指标,并返回一个带有UnschedulableAndUnresolvable状态和错误信息的ScheduleResult结构体、podInfo和status。
fitError, ok := err.(*framework.FitError)
if !ok {
logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
}
// SchedulePod() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if !fwk.HasPostFilterPlugins() {
logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}
//这段Go代码是处理调度过程中出现错误的逻辑。
//首先,它会判断错误类型是否为*framework.FitError,如果不是,则记录错误日志并返回一个空的ScheduleResult,
//同时将错误包装成framework.AsStatus(err)。
//如果错误类型是*framework.FitError,则会尝试进行预占操作,以期望在下一次调度时,该Pod能够适应。
//然后它会检查是否有注册了PostFilter插件,如果没有,则记录日志并返回一个包含framework.Unschedulable状态的ScheduleResult,
//同时将错误包装成framework.NewStatus(framework.Unschedulable).WithError(err)。
// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
msg := status.Message()
fitError.Diagnosis.PostFilterMsg = msg
if status.Code() == framework.Error {
logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
} else {
logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
}
var nominatingInfo *framework.NominatingInfo
if result != nil {
nominatingInfo = result.NominatingInfo
}
return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}
//该函数主要执行调度过程中的后过滤器插件,以尝试使Pod在未来的调度周期中可调度。具体步骤如下:
//1. 运行后过滤器插件,并获取运行结果和状态。
//2. 设置诊断信息中的后过滤器消息。
//3. 如果状态为错误,则记录错误日志;否则记录信息日志。
//4. 如果运行结果不为空,则获取其中的提名信息。
//5. 返回提名信息和Pod信息,以及一个不可调度的状态。
//总结:该函数主要负责在调度过程中执行后过滤器插件,并处理运行结果和状态,最终返回提名信息和不可调度的状态。
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
}
//这段Go代码主要实现了以下功能:
//1. 记录调度算法的延迟指标。
//2. 假设Pod已经运行在给定的节点上,即使它还没有被绑定。这允许我们在不等待绑定发生的情况下继续调度。
//3. 尝试将Pod的节点名设置为推荐的主机名,并记录错误信息。如果出现错误,则返回一个清除了提名节点信息的ScheduleResult对象和假设的Pod信息,以及将错误转换为状态对象。
//这段代码中的关键函数包括: - metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)):记录调度算法的延迟指标,
//其中start是调度开始的时间点。
//- sched.assume(logger, assumedPod, scheduleResult.SuggestedHost):假设Pod已经运行在给定的节点上,并将Pod的节点名设置为推荐的主机名。如果出现错误,则返回错误信息。
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
}
if sts.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
},
}
fitErr.Diagnosis.AddPluginStatus(sts)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
}
//该Go函数主要执行以下操作:
//1. 调用fwk.RunReservePluginsReserve方法运行reserve插件的Reserve方法。
//2. 如果Reserve方法执行失败,则触发un-reserve操作以清理与预留的Pod相关的状态。
//3. 如果un-reserve操作成功,则从调度器缓存中忘记Pod,并记录忘记操作失败的错误(如果有)。
//4. 如果Reserve方法被拒绝,则创建一个FitError对象并返回。 Markdown格式输出如下:
//1. 调用fwk.RunReservePluginsReserve方法运行reserve插件的Reserve方法。
//2. 如果Reserve方法执行失败,则触发un-reserve操作以清理与预留的Pod相关的状态。
//- 调用fwk.RunReservePluginsUnreserve方法执行un-reserve操作。
//- 如果忘记Pod操作失败,则记录错误。
//3. 如果Reserve方法被拒绝,则创建一个FitError对象并返回。
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
}
//这段Go代码中的函数是用于运行框架中的"permit"插件,并在必要时进行清理操作。
//首先,该函数通过调用fwk.RunPermitPlugins来运行"permit"插件,并获取运行状态。
//如果运行状态不是等待状态且不是成功状态,则需要进行清理操作。
//接下来,该函数调用fwk.RunReservePluginsUnreserve来触发取消预留操作,以清理与预留的Pod相关的状态。
//然后,该函数调用sched.Cache.ForgetPod来从调度器缓存中忘记Pod,并记录错误信息。
//总之,该函数的主要功能是运行"permit"插件,并在必要时进行清理操作,包括取消预留和忘记Pod。
if runPermitStatus.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
},
}
fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
}
//这段Go代码是 Kubernetes 调度器中的片段,用于处理调度结果。
//- 如果 runPermitStatus.IsRejected() 返回 true,表示调度被拒绝,则创建一个 framework.FitError 错误对象,
//记录节点状态和错误信息,并返回一个空的 ScheduleResult 和 assumedPodInfo,以及带有错误信息的 framework.Status。
//- 如果 runPermitStatus.IsRejected() 返回 false,表示调度成功,
//则直接返回一个带有 clearNominatedNode 的 ScheduleResult 和 assumedPodInfo,以及 runPermitStatus。
//这段代码的主要作用是根据 runPermitStatus 的状态来决定调度是否成功,并返回相应的结果。
// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}
return scheduleResult, assumedPodInfo, nil
}
//这段Go代码是调度器在一个成功的调度周期结束时,检查是否有需要激活的Pods,如果有,则将其激活并清空激活列表。
//具体来说:
//1. 如果podsToActivate.Map不为空,即有待激活的Pods,则调用sched.SchedulingQueue.Activate方法将这些Pods激活。
//2. 激活后,清空podsToActivate.Map,即清空待激活Pods的列表。
//3. 返回调度结果scheduleResult、已假设的Pod信息assumedPodInfo和nil错误。
//这段代码的作用是确保在调度周期结束时,所有需要激活的Pods都被正确处理,并为下一个调度周期做准备。
// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *framework.Status {
logger := klog.FromContext(ctx)
assumedPod := assumedPodInfo.Pod
//该函数是Scheduler的一个方法,用于尝试绑定一个假设的Pod。
//它通过传入上下文、状态、框架、调度结果、假设的Pod信息、开始时间和待激活的Pods,返回一个框架状态。
//具体流程包括:从上下文中获取日志记录器;
//使用假设的Pod信息获取Pod;
//调用绑定函数进行绑定操作;根据绑定结果返回相应的框架状态。
// Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}
//该函数用于运行"permit"插件,并根据插件的执行结果进行相应的处理。
//如果插件执行失败并且被拒绝,则创建并返回一个FitError错误;否则返回插件的执行状态。
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}
//该函数的作用是运行"prebind"插件。它首先通过调用fwk.RunPreBindPlugins方法来执行"prebind"插件,
//并将上下文、状态、假设的Pod和建议的主机作为参数传递给该方法。
//如果运行插件后的状态不成功,则函数会直接返回该状态。
// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
if assumedPodInfo.InitialAttemptTimestamp != nil {
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
}
//这段Go代码中的函数主要执行以下操作:
//1. 调用sched.bind()函数运行名为"bind"的插件,并检查返回的状态是否成功。如果不成功,则返回该状态。
//2. 如果日志的详细程度高于等于2,则记录一条成功将Pod绑定到节点的日志,同时记录一些指标,如评估的节点数和可行节点数。
//3. 更新Pod相关的指标,例如记录Pod被调度的次数和调度尝试的持续时间。
//4. 如果Pod有初始尝试时间,则记录Pod调度的持续时间和SLI(服务级别指标)持续时间。
//这段代码的主要目的是在调度Pod后执行一些后续操作,包括记录日志和更新指标。
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}
return nil
}
//这段Go代码是 Kubernetes 调度器中的一个函数片段,主要执行以下两个操作:
//1. 运行 "postbind" 插件:fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
//这行代码会调用所有注册的 "postbind" 插件。这些插件是在绑定周期成功结束后运行的,用于执行一些额外的操作,例如更新 Pod 的状态或记录日志。
//2. 激活就绪的 Pod:如果 podsToActivate.Map 不为空,即有待激活的 Pod,
//则调用 sched.SchedulingQueue.Activate(logger, podsToActivate.Map) 来激活这些 Pod。这会将这些 Pod 加入到调度队列中,
//以便它们可以被调度到合适的节点上运行。与调度周期中的逻辑不同,这里不需要删除条目,因为 podsToActivate.Map 不再被使用。
//这段代码的主要目的是在成功完成绑定周期后,执行必要的后处理操作,并激活就绪的 Pod。
func (sched *Scheduler) handleBindingCycleError(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
scheduleResult ScheduleResult,
status *framework.Status) {
logger := klog.FromContext(ctx)
//该函数是Scheduler的一个方法,用于处理绑定周期错误。
//它通过记录日志来记录错误信息,其中包括上下文、框架状态、队列中的Pod信息、开始时间、调度结果和状态等。
assumedPod := podInfo.Pod
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
logger.Error(forgetErr, "scheduler cache ForgetPod failed")
} else {
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
//
// Avoid moving the assumed Pod itself as it's always Unschedulable.
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// add this event to in-flight events and thus move the assumed pod to backoffQ anyways if the plugins don't have appropriate QueueingHint.
if status.IsRejected() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
}
}
//该函数主要完成以下功能:
//1. 调用fwk.RunReservePluginsUnreserve方法,触发未预留插件清理与预留Pod相关联的状态。
//2. 忘记缓存中的Pod,如果忘记失败,则记录错误日志。
//3. 如果Pod被拒绝,则将除被假设的Pod本身以外的所有Pod移动到活动队列或退避队列;否则,将所有Pod移动到活动队列或退避队列。
sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
}
//第一个函数sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)是一个处理调度失败的函数。
//它接受多个参数,包括上下文、框架、Pod信息、状态、是否清除提名节点以及开始时间,用于处理Pod调度失败的情况。
func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
if !ok {
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
}
return fwk, nil
}
//第二个函数(sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error)是一个根据Pod获取对应调度框架的函数。
//它接受一个Pod作为参数,通过Pod的Spec.SchedulerName来查找对应的调度框架。
//如果找到,则返回该框架;如果找不到,则返回一个错误。
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool {
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
return true
}
//该函数是一个Go语言函数,名为skipPodSchedule,它属于Scheduler类型。函数的主要功能是判断是否可以跳过指定Pod的调度。
//具体来说,函数首先检查Pod的DeletionTimestamp是否为nil,如果是,则记录事件和日志,并返回true,表示可以跳过调度。
//否则,函数不进行任何操作,返回false。
// Case 2: pod that has been assumed could be skipped.
// An assumed pod can be added again to the scheduling queue if it got an update event
// during its previous scheduling cycle but before getting assumed.
isAssumed, err := sched.Cache.IsAssumedPod(pod)
if err != nil {
// TODO(91633): pass ctx into a revised HandleError
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
return isAssumed
}
//该函数用于判断一个Pod是否已被调度器假设(Assumed)。
//- 首先,它调用sched.Cache.IsAssumedPod(pod)方法来检查Pod是否已被假设。
//- 如果检查过程中出现错误,会通过utilruntime.HandleError方法记录错误信息,并返回false。
//- 如果检查没有错误,则直接返回检查结果。
//这个函数的主要作用是在调度Pod时,判断该Pod是否需要重新进入调度队列。
//如果Pod在上一次调度周期中更新了事件,但在被假设之前,它可能会被再次添加到调度队列中。
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
//该函数是一个调度函数,用于尝试将给定的Pod调度到节点列表中的一个节点上。
//如果成功,它将返回节点的名称;如果失败,它将返回一个FitError错误,其中包含原因。
//函数首先创建一个utiltrace对象用于记录跟踪信息,并在函数退出时记录跟踪信息的时长。
//然后,它通过调用sched.Cache.UpdateSnapshot函数更新调度程序的缓存和节点信息快照。如果更新出现错误,函数将返回错误。
//最后,函数通过调用trace.Step记录一个跟踪步骤,表示快照节点信息和调度程序缓存的操作已经完成。
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
//这段Go代码是关于调度器在一个集群中为一个Pod寻找合适节点的逻辑。
//首先,它检查当前集群中是否有可用节点,如果没有则返回错误ErrNoNodesAvailable。
//接下来,它调用sched.findNodesThatFitPod函数来找到能够容纳该Pod的节点。如果该函数返回错误,则直接返回错误。
//如果找到了合适的节点,代码会继续执行
//;如果没有找到合适的节点,则返回一个FitError错误,其中包含了Pod信息、集群中节点的数量以及诊断信息。
// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Node().Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
//该函数是一个Go语言函数片段,用于在满足一定条件时返回一个ScheduleResult结构体实例。
//首先,函数通过判断feasibleNodes切片的长度是否为1来确定是否满足某种条件。
//如果满足条件,即feasibleNodes长度为1,
//则创建并返回一个ScheduleResult结构体实例,
//其中SuggestedHost字段被设置为feasibleNodes[0].Node().Name,EvaluatedNodes字段被设置为1 + len(diagnosis.NodeToStatusMap),
//FeasibleNodes字段被设置为1。
//这个函数的主要作用是在找到唯一一个符合条件的节点时,生成一个调度结果,建议将任务调度到该节点上,并统计评估的节点数和可行节点数。
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
//这段Go代码是一个调度过程中的一部分,用于根据一系列规则对节点进行优先级排序,并最终选择一个最适合运行Pod的节点。
//首先,函数调用prioritizeNodes来对可行节点进行优先级排序,得到一个优先级列表priorityList。如果在这个过程中出现错误,则会返回一个错误结果。
//接下来,函数调用selectHost来从优先级列表中选择一个最适合运行Pod的节点。选择节点的过程会考虑到节点的得分和其他因素。
//选择完成后,会记录一个调度过程的步骤。
//最后,函数返回一个ScheduleResult结构体实例,其中包含了建议的节点主机名host、评估的节点数和可行节点数。
//如果在调度过程中出现错误,
//则会将错误一起返回。
//这个函数的主要作用是在给定的一组可行节点中,根据预定的规则和策略选择一个最优节点来运行Pod,并生成相应的调度结果。
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
logger := klog.FromContext(ctx)
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
}
//这段Go代码定义了一个名为findNodesThatFitPod的函数,它属于Scheduler类型。该函数的功能是筛选出适合运行给定Pod的节点。
//函数的输入参数包括:
//- ctx:上下文对象,用于控制函数执行的生命周期。
//- fwk:一个实现了Framework接口的对象,用于执行筛选插件和扩展程序。
//- state:一个CycleState对象,包含了调度过程中的状态信息。
//- pod:一个指向v1.Pod对象的指针,表示需要调度的Pod。
//函数的输出结果包括:
//- 一个包含所有适合运行Pod的节点信息的切片。
//- 一个Diagnosis对象,包含了在筛选过程中收集到的诊断信息。
//- 一个错误对象,如果在筛选过程中发生错误,则会返回该错误。
//在函数内部,它首先创建了一个logger对象,用于记录日志信息。
//然后创建了一个diagnosis对象,并初始化了它的NodeToStatusMap字段。
//接下来,函数调用fwk.Filter方法来筛选出适合运行Pod的节点。这个方法会根据框架中的筛选插件和扩展程序来判断节点是否适合运行Pod。
//如果节点不适合,则会在diagnosis.NodeToStatusMap中记录下该节点不适合的原因。
//最后,函数返回筛选出的节点信息切片、诊断信息和错误对象(如果有)。
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, diagnosis, err
}
// Run "prefilter" plugins.
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() {
if !s.IsRejected() {
return nil, diagnosis, s.AsError()
}
// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
// Some non trivial refactoring is needed to avoid this copy.
for _, n := range allNodes {
diagnosis.NodeToStatusMap[n.Node().Name] = s
}
//该Go函数主要实现了以下功能:
//1. 调用sched.nodeInfoSnapshot.NodeInfos().List()获取所有节点的信息。
//2. 运行"prefilter"插件,通过fwk.RunPreFilterPlugins(ctx, state, pod)获取插件运行结果。
//3. 如果插件运行结果不成功且未被拒绝,则将所有节点的状态更新为该插件的运行结果状态,并返回错误信息。
//具体分析如下:
//- 首先,函数会尝试获取集群中所有节点的信息,并将结果保存在allNodes变量中。如果获取节点信息时出现错误,函数会立即返回错误信息。
//- 接下来,函数会运行"prefilter"插件,并将运行结果保存在preRes变量中。如果插件运行结果不成功(即未通过插件的验证),
//则会根据插件的运行结果状态进行处理。
//- 如果插件运行结果状态既不是成功也不是被拒绝,则函数会将所有节点的状态更新为该插件的运行结果状态,
//并将该状态保存在diagnosis.NodeToStatusMap中。这样做的目的是为了在后续的预删除操作中,能够统一处理所有节点的状态。
//- 最后,如果插件运行结果状态是被拒绝的,则函数会直接返回错误信息。
//总之,该函数的主要作用是在调度过程中运行"prefilter"插件,并根据插件的运行结果来更新节点的状态或返回错误信息。
// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
msg := s.Message()
diagnosis.PreFilterMsg = msg
logger.V(5).Info("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
diagnosis.AddPluginStatus(s)
return nil, diagnosis, nil
}
//该函数主要功能是记录PreFilter插件运行后的内容。
//1. 首先获取消息msg := s.Message();
//2. 然后将该消息记录到diagnosis.PreFilterMsg中;
//3. 使用logger.V(5).Info记录日志,包括pod信息和状态消息;
//4. 最后将插件状态添加到diagnosis中,并返回nil, diagnosis, nil。
// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(pod.Status.NominatedNodeName) > 0 {
feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
if err != nil {
logger.Error(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
if len(feasibleNodes) != 0 {
return feasibleNodes, diagnosis, nil
}
}
//这段Go代码的功能是在调度Pod之前,优先尝试在上一次调度周期中因抢占而被提名的节点上是否可以放置该Pod。
//如果提名的节点通过了所有的过滤器,则将该节点分配给Pod。
nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for _, n := range allNodes {
if !preRes.NodeNames.Has(n.Node().Name) {
// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
continue
}
nodes = append(nodes, n)
}
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
// this is helpful to make sure that all the nodes have a chance to be searched
processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
if err != nil {
return nil, diagnosis, err
}
//该函数是Go语言编写的,用于在给定节点列表中找到满足特定条件的节点。
//首先,函数会检查preRes.AllNodes()是否为真,如果不为真,则遍历allNodes列表,
//将不在preRes.NodeNames中的节点过滤掉,并将过滤掉的节点标记为"rejected via UnschedulableAndUnresolvable"。
//接下来,函数调用sched.findNodesThatPassFilters方法来找到通过特定过滤器的节点,并将结果保存在feasibleNodes变量中。
//最后,函数更新sched.nextStartNodeIndex属性,并返回结果。
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, diagnosis, err
}
if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
// Extenders filtered out some nodes.
//
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
// When Extenders reject some Nodes and the pod ends up being unschedulable,
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
// by any kind of cluster events.
// https://github.com/kubernetes/kubernetes/issues/122019
if diagnosis.UnschedulablePlugins == nil {
diagnosis.UnschedulablePlugins = sets.New[string]()
}
diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
}
return feasibleNodesAfterExtender, diagnosis, nil
}
//这段Go代码的功能是在给定的节点列表中,通过调用一组扩展程序来找到满足特定条件的节点。
//首先,函数调用findNodesThatPassExtenders方法,该方法会遍历给定的扩展程序列表,
//并将通过扩展程序过滤的节点保存在feasibleNodesAfterExtender变量中。
//如果feasibleNodesAfterExtender的长度与feasibleNodes的长度不相等,则说明有节点被扩展程序过滤掉了。
//在这种情况下,函数会将framework.ExtenderName添加到diagnosis.UnschedulablePlugins集合中,以表示该扩展程序导致了节点被拒绝。
//最后,函数返回feasibleNodesAfterExtender、diagnosis和nil作为结果。
func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*framework.NodeInfo, error) {
nnn := pod.Status.NominatedNodeName
nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
if err != nil {
return nil, err
}
node := []*framework.NodeInfo{nodeInfo}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, node)
if err != nil {
return nil, err
}
feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, err
}
return feasibleNodes, nil
}
//该函数是Scheduler的一个方法,用于评估提名的节点是否适合放置Pod。
//它首先通过NominatedNodeName获取节点信息,然后调用findNodesThatPassFilters方法过滤不满足条件的节点,
//并调用findNodesThatPassExtenders方法进一步过滤节点。最终返回满足条件的节点列表。
// hasScoring checks if scoring nodes is configured.
func (sched *Scheduler) hasScoring(fwk framework.Framework) bool {
if fwk.HasScorePlugins() {
return true
}
for _, extender := range sched.Extenders {
if extender.IsPrioritizer() {
return true
}
}
return false
}
//该函数用于判断调度器中是否配置了评分节点。
//首先检查给定的框架是否有评分插件,如果有,则返回true。
//如果没有,则遍历调度器的扩展程序,如果某个扩展程序是优先级判断器,则返回true。
//最后,如果没有找到评分插件或优先级判断器,则返回false。
// hasExtenderFilters checks if any extenders filter nodes.
func (sched *Scheduler) hasExtenderFilters() bool {
for _, extender := range sched.Extenders {
if extender.IsFilter() {
return true
}
}
return false
}
//该函数用于判断调度器中是否存在扩展程序过滤节点。
//它遍历调度器的扩展程序列表,如果找到任何一个具有过滤功能的扩展程序,则返回true,表示存在扩展程序过滤节点。
//如果没有找到任何具有过滤功能的扩展程序,则返回false。
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(
ctx context.Context,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
diagnosis *framework.Diagnosis,
nodes []*framework.NodeInfo) ([]*framework.NodeInfo, error) {
numAllNodes := len(nodes)
numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
if !sched.hasExtenderFilters() && !sched.hasScoring(fwk) {
numNodesToFind = 1
}
//该函数是Scheduler的一个方法,用于查找符合过滤条件的节点。
//它根据传入的过滤插件、调度框架、调度状态、Pod和诊断信息,从给定的节点列表中找到符合条件的节点。
//函数首先计算出需要查找的节点数量,根据是否有扩展器过滤器和评分器来决定最终的查找数量。
//如果没有扩展器过滤器和评分器,则只需要找到一个符合条件的节点。
//函数返回最终找到的符合条件的节点列表和可能的错误。
// Create feasible list with enough space to avoid growing it
// and allow assigning.
feasibleNodes := make([]*framework.NodeInfo, numNodesToFind)
if !fwk.HasFilterPlugins() {
for i := range feasibleNodes {
feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
}
return feasibleNodes, nil
}
//这段代码是Scheduler的一个方法,用于创建一个可行节点列表,列表的长度为numNodesToFind,以避免增长并允许分配。
//如果fwk没有过滤插件,则将nodes中的节点按顺序填充到feasibleNodes中,并返回该列表。
errCh := parallelize.NewErrorChannel()
var feasibleNodesLen int32
ctx, cancel := context.WithCancel(ctx)
defer cancel()
type nodeStatus struct {
node string
status *framework.Status
}
result := make([]*nodeStatus, numAllNodes)
checkNode := func(i int) {
// We check the nodes starting from where we left off in the previous scheduling cycle,
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
if status.Code() == framework.Error {
errCh.SendErrorWithCancel(status.AsError(), cancel)
return
}
if status.IsSuccess() {
length := atomic.AddInt32(&feasibleNodesLen, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&feasibleNodesLen, -1)
} else {
feasibleNodes[length-1] = nodeInfo
}
} else {
result[i] = &nodeStatus{node: nodeInfo.Node().Name, status: status}
}
}
//这段Go代码中的函数是一个并发执行节点检查的函数。
//它通过调用parallelize.NewErrorChannel()创建了一个错误通道errCh,用于收集并发执行过程中的错误信息。
//函数使用context.WithCancel(ctx)创建了一个可取消的上下文ctx,并在函数结束时通过defer cancel()取消该上下文,
//以确保所有并发操作都被正确终止。 函数内部定义了一个nodeStatus结构体,用于存储节点的名称和状态。
//result是一个用于存储检查结果的切片,其长度为numAllNodes。checkNode是一个闭包函数,用于检查节点是否适合放置一个Pod。
//它通过计算索引,从上一次调度周期中未检查的节点开始检查。
//在检查节点时,它会调用fwk.RunFilterPluginsWithNominatedPods来运行过滤插件,
//并根据插件的返回状态来更新feasibleNodesLen和feasibleNodes,或者将节点的状态存储到result中。
//如果节点检查过程中出现错误,会通过错误通道发送错误信息并取消上下文。
//总之,这个函数的作用是并发地检查一组节点,并收集检查结果和错误信息。
beginCheckNode := time.Now()
statusCode := framework.Success
defer func() {
// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
}()
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)
feasibleNodes = feasibleNodes[:feasibleNodesLen]
for _, item := range result {
if item == nil {
continue
}
diagnosis.NodeToStatusMap[item.node] = item.status
diagnosis.AddPluginStatus(item.status)
}
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
return feasibleNodes, err
}
return feasibleNodes, nil
}
//这段Go代码中的函数是用于在调度周期内并行检查多个节点是否适合放置Pod的函数。
//它首先记录了过滤扩展点的延迟时间,并在函数退出时更新相关指标。
//然后使用fwk.Parallelizer().Until方法并行执行checkNode函数,该函数会对每个节点进行检查,
//并根据检查结果更新feasibleNodes和diagnosis.NodeToStatusMap。
//最后,该函数会从错误通道接收错误信息,并根据错误信息更新状态码并返回结果。
// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {
if numAllNodes < minFeasibleNodesToFind {
return numAllNodes
}
// Use profile percentageOfNodesToScore if it's set. Otherwise, use global percentageOfNodesToScore.
var percentage int32
if percentageOfNodesToScore != nil {
percentage = *percentageOfNodesToScore
} else {
percentage = sched.percentageOfNodesToScore
}
if percentage == 0 {
percentage = int32(50) - numAllNodes/125
if percentage < minFeasibleNodesPercentageToFind {
percentage = minFeasibleNodesPercentageToFind
}
}
numNodes = numAllNodes * percentage / 100
if numNodes < minFeasibleNodesToFind {
return minFeasibleNodesToFind
}
return numNodes
}
//该函数是Go语言编写的,用于计算调度器需要找到的可行节点的数量。
//函数首先判断所有节点的数量是否小于最小可行节点数,如果是,则直接返回所有节点数。
//然后根据传入的节点评分百分比或全局百分比计算需要找到的节点数。
//如果百分比为0,则根据节点总数计算一个默认的百分比。
//最后,返回计算得到的可行节点数,如果小于最小可行节点数,则返回最小可行节点数。
func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*framework.NodeInfo, statuses framework.NodeToStatusMap) ([]*framework.NodeInfo, error) {
logger := klog.FromContext(ctx)
// Extenders are called sequentially.
// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
// extender in a decreasing manner.
for _, extender := range extenders {
if len(feasibleNodes) == 0 {
break
}
if !extender.IsInterested(pod) {
continue
}
//该函数的功能是通过调用一系列的extenders来筛选出能够运行pod的节点。
//它会依次调用每个extender,并将筛选后的节点传递给下一个extender。
//如果某个extender对pod不感兴趣,则会跳过该extender。
// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
// particular nodes, and this may eventually improve preemption efficiency.
// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
// status ahead of others.
feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
logger.Info("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
continue
}
return nil, err
}
//这段Go代码是调用一个名为extender.Filter的函数,该函数用于过滤出适合放置Pod的节点,并更新节点的状态。
//函数的返回值包括可行节点列表feasibleList、失败节点映射failedMap、失败且不可解决节点映射failedAndUnresolvableMap和错误信息err。
//如果extender.Filter函数返回错误,且该错误可被忽略(通过extender.IsIgnorable()判断),则会打印日志信息并继续执行;否则,直接返回错误。
for failedNodeName, failedMsg := range failedAndUnresolvableMap {
var aggregatedReasons []string
if _, found := statuses[failedNodeName]; found {
aggregatedReasons = statuses[failedNodeName].Reasons()
}
aggregatedReasons = append(aggregatedReasons, failedMsg)
statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
}
//这段Go代码遍历failedAndUnresolvableMap,对于每个失败且不可解决的节点,获取其已有的状态信息(如果存在),
//将当前失败消息添加到状态原因列表中,并更新节点的状态为UnschedulableAndUnresolvable。
//这里使用了framework.NewStatus函数创建新的状态对象。
for failedNodeName, failedMsg := range failedMap {
if _, found := failedAndUnresolvableMap[failedNodeName]; found {
// failedAndUnresolvableMap takes precedence over failedMap
// note that this only happens if the extender returns the node in both maps
continue
}
if _, found := statuses[failedNodeName]; !found {
statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
} else {
statuses[failedNodeName].AppendReason(failedMsg)
}
}
feasibleNodes = feasibleList
}
return feasibleNodes, nil
}
//这段Go代码是一个for循环,用于遍历failedMap,并根据条件对failedAndUnresolvableMap和statuses进行操作。
//接着将feasibleList赋值给feasibleNodes,并最终返回feasibleNodes和nil。
//具体来说:
//- 遍历failedMap中的每个元素,其中failedNodeName为键,failedMsg为值。
//- 判断failedAndUnresolvableMap中是否存在键为failedNodeName的元素,如果存在则跳过当前循环。
//- 判断statuses中是否存在键为failedNodeName的元素,如果存在则将failedMsg追加为该元素的reason,
//如果不存在则创建一个新的framework.Status对象,并将其添加到statuses中。
//最后,将feasibleList赋值给feasibleNodes,并返回feasibleNodes和nil。
// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes
func prioritizeNodes(
ctx context.Context,
extenders []framework.Extender,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
nodes []*framework.NodeInfo,
) ([]framework.NodePluginScores, error) {
logger := klog.FromContext(ctx)
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() {
result := make([]framework.NodePluginScores, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodePluginScores{
Name: nodes[i].Node().Name,
TotalScore: 1,
})
}
return result, nil
} //该函数的作用是通过运行评分插件来优先级排序节点。
// 首先,它根据调用RunScorePlugins()方法从每个插件返回的分数计算每个节点的分数
// 然后,它运行任何扩展程序。最后,将所有分数相加得到节点的总权重分数。
//如果未提供优先级配置,则所有节点的得分为1。
// Run PreScore plugins.
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}
//该函数运行PreScore插件,通过fwk.RunPreScorePlugins方法执行。如果运行成功,则继续执行后续代码;如果运行失败,则返回错误信息。
// Run the Score plugins.
nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
if !scoreStatus.IsSuccess() {
return nil, scoreStatus.AsError()
}
// Additional details logged at level 10 if enabled.
loggerVTen := logger.V(10)
if loggerVTen.Enabled() {
for _, nodeScore := range nodesScores {
for _, pluginScore := range nodeScore.Scores {
loggerVTen.Info("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
}
}
}
//该函数运行Score插件,通过fwk.RunScorePlugins方法为每个节点计算一个得分。
//如果运行成功,则继续执行后续代码;如果运行失败,则返回错误信息。如果日志级别10被启用,则会记录每个插件为每个节点打分的详细信息。
if len(extenders) != 0 && nodes != nil {
// allNodeExtendersScores has all extenders scores for all nodes.
// It is keyed with node name.
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex
var wg sync.WaitGroup
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
}
wg.Add(1)
go func(extIndex int) {
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
defer func() {
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done()
}()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
return
}
mu.Lock()
defer mu.Unlock()
for i := range *prioritizedList {
nodename := (*prioritizedList)[i].Host
score := (*prioritizedList)[i].Score
if loggerVTen.Enabled() {
loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
}
//这段Go代码的功能是使用一组扩展器(extendenders)对一组节点(nodes)进行优先级排序。
//首先,它会遍历所有扩展器,并对每个感兴趣的扩展器启动一个goroutine来调用其优先级函数。
//扩展器的优先级函数返回一个加权优先级列表(prioritizedList)和一个权重(weight)。
//然后,这段代码会将每个扩展器对每个节点的得分存储在一个映射(allNodeExtendersScores)中,以便后续使用。
// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
if allNodeExtendersScores[nodename] == nil {
allNodeExtendersScores[nodename] = &framework.NodePluginScores{
Name: nodename,
Scores: make([]framework.PluginScore, 0, len(extenders)),
}
}
allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
Name: extenders[extIndex].Name(),
Score: finalscore,
})
allNodeExtendersScores[nodename].TotalScore += finalscore
}
}(i)
}
// wait for all go routines to finish
wg.Wait()
for i := range nodesScores {
if score, ok := allNodeExtendersScores[nodes[i].Node().Name]; ok {
nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
nodesScores[i].TotalScore += score.TotalScore
}
}
}
if loggerVTen.Enabled() {
for i := range nodesScores {
loggerVTen.Info("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
}
}
return nodesScores, nil
}
var errEmptyPriorityList = errors.New("empty priorityList")
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
// It also returns the top {count} Nodes,
// and the top of the list will be always the selected host.
func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
if len(nodeScoreList) == 0 {
return "", nil, errEmptyPriorityList
}
var h nodeScoreHeap = nodeScoreList
heap.Init(&h)
cntOfMaxScore := 1
selectedIndex := 0
// The top of the heap is the NodeScoreResult with the highest score.
sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))
// This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling,
// and sortedNodeScoreList gets (count - 1) elements.
for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
break
}
if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
cntOfMaxScore++
if rand.Intn(cntOfMaxScore) == 0 {
// Replace the candidate with probability of 1/cntOfMaxScore
selectedIndex = cntOfMaxScore - 1
}
}
sortedNodeScoreList = append(sortedNodeScoreList, ns)
if h.Len() == 0 {
break
}
}
if selectedIndex != 0 {
// replace the first one with selected one
previous := sortedNodeScoreList[0]
sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
sortedNodeScoreList[selectedIndex] = previous
}
if len(sortedNodeScoreList) > count {
sortedNodeScoreList = sortedNodeScoreList[:count]
}
return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
}
// nodeScoreHeap is a heap of framework.NodePluginScores.
type nodeScoreHeap []framework.NodePluginScores
// nodeScoreHeap implements heap.Interface.
var _ heap.Interface = &nodeScoreHeap{}
func (h nodeScoreHeap) Len() int { return len(h) }
func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
func (h nodeScoreHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nodeScoreHeap) Push(x interface{}) {
*h = append(*h, x.(framework.NodePluginScores))
}
func (h *nodeScoreHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
// Optimistically assume that the binding will succeed and send it to apiserver
// in the background.
// If the binding fails, scheduler will release resources allocated to assumed pod
// immediately.
assumed.Spec.NodeName = host
if err := sched.Cache.AssumePod(logger, assumed); err != nil {
logger.Error(err, "Scheduler cache AssumePod failed")
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
return nil
}
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
logger := klog.FromContext(ctx)
defer func() {
sched.finishBinding(logger, fwk, assumed, targetNode, status)
}()
bound, err := sched.extendersBinding(logger, assumed, targetNode)
if bound {
return framework.AsStatus(err)
}
return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}
// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(logger klog.Logger, pod *v1.Pod, node string) (bool, error) {
for _, extender := range sched.Extenders {
if !extender.IsBinder() || !extender.IsInterested(pod) {
continue
}
err := extender.Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
Target: v1.ObjectReference{Kind: "Node", Name: node},
})
if err != nil && extender.IsIgnorable() {
logger.Info("Skipping extender in bind as it returned error and has ignorable flag set", "extender", extender, "err", err)
continue
}
return true, err
}
return false, nil
}
func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framework, assumed *v1.Pod, targetNode string, status *framework.Status) {
if finErr := sched.Cache.FinishBinding(logger, assumed); finErr != nil {
logger.Error(finErr, "Scheduler cache FinishBinding failed")
}
if !status.IsSuccess() {
logger.V(1).Info("Failed to bind pod", "pod", klog.KObj(assumed))
return
}
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}
func getAttemptsLabel(p *framework.QueuedPodInfo) string {
// We breakdown the pod scheduling duration by attempts capped to a limit
// to avoid ending up with a high cardinality metric.
if p.Attempts >= 15 {
return "15+"
}
return strconv.Itoa(p.Attempts)
}
// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
calledDone := false
defer func() {
if !calledDone {
// Basically, AddUnschedulableIfNotPresent calls DonePod internally.
// But, AddUnschedulableIfNotPresent isn't called in some corner cases.
// Here, we call DonePod explicitly to avoid leaking the pod.
sched.SchedulingQueue.Done(podInfo.Pod.UID)
}
}()
//该函数是Go语言编写的,用于处理调度失败的情况。
//它会记录一个表示Pod调度失败的事件,并更新Pod的状态和提名节点名(如果已设置)。函数主要包含以下几点内容:
//1. 定义了一个名为handleSchedulingFailure的函数,它接受多个参数,
//包括上下文ctx、框架fwk、排队的Pod信息podInfo、状态status、提名信息nominatingInfo和开始时间start。
//2. 在函数内部,定义了一个名为calledDone的布尔变量,用于标记是否已经调用了DonePod方法。
//3. 使用defer语句定义了一个匿名函数,该函数会在handleSchedulingFailure函数退出时执行。
//匿名函数中,通过判断calledDone是否已被标记为true来决定是否需要显式调用sched.SchedulingQueue.Done(podInfo.Pod.UID)方法,
//以避免泄露Pod。
//4. 函数的主体部分包括记录调度失败事件和更新Pod状态和提名节点名的逻辑,这部分内容在给定的代码片段中没有展示出来。
logger := klog.FromContext(ctx)
reason := v1.PodReasonSchedulerError
if status.IsRejected() {
reason = v1.PodReasonUnschedulable
}
switch reason {
case v1.PodReasonUnschedulable:
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
case v1.PodReasonSchedulerError:
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
//该函数主要通过klog从上下文ctx中获取logger,根据status的状态来设置Pod失败的原因,然后根据不同的失败原因来记录相应的metrics。
//具体步骤如下:
//1. 从上下文ctx中获取logger。
//2. 初始化reason为SchedulerError。
//3. 如果status被拒绝,则将reason更新为Unschedulable。
//4. 根据reason的不同,记录相应的metrics:
//- 如果reason为Unschedulable,则调用PodUnschedulable方法,并传入fwk的ProfileName和start与当前时间的秒数差作为参数。
//- 如果reason为SchedulerError,则调用PodScheduleError方法,并传入fwk的ProfileName和start与当前时间的秒数差作为参数。
pod := podInfo.Pod
err := status.AsError()
errMsg := status.Message()
if err == ErrNoNodesAvailable {
logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
podInfo.PendingPlugins = fitError.Diagnosis.PendingPlugins
logger.V(2).Info("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
} else {
logger.Error(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
}
//这段Go代码主要处理了三种不同类型的错误,并根据错误类型记录日志或更新podInfo对象的属性。
//首先,如果错误为ErrNoNodesAvailable,则会记录一条日志,指示无法调度pod,因为没有可用的节点,并等待。
//其次,如果错误类型为*framework.FitError,则将该错误的诊断信息(UnschedulablePlugins和PendingPlugins)注入到podInfo对象中,
//以供后续使用,然后记录一条日志,指示无法调度pod,因为没有合适的节点,并等待。
//最后,如果错误类型不是上述两种类型,则记录一条错误日志,指示调度pod时出错,并重试。
//总的来说,这段代码主要是根据不同的错误类型进行错误处理,并通过日志记录相关信息。
// Check if the Pod exists in informer cache.
podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
if e != nil {
logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
} else {
// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
if len(cachedPod.Spec.NodeName) != 0 {
logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
} else {
// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
// ignore this err since apiserver doesn't properly validate affinity terms
// and we can't fix the validation for backwards compatibility.
podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
logger.Error(err, "Error occurred")
}
calledDone = true
}
}
//该函数用于检查Pod是否存在于informer缓存中。
//如果存在,则根据Pod的状态进行不同的处理。如果Pod已经被分配到节点上,则不再将其添加到队列中;
//否则,将Pod信息添加到队列中,以便进行调度。
//如果Pod不存在于缓存中,则需要调用DonePod函数。
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
// Here we check for nil only for tests.
if sched.SchedulingQueue != nil {
logger := klog.FromContext(ctx)
sched.SchedulingQueue.AddNominatedPod(logger, podInfo.PodInfo, nominatingInfo)
}
if err == nil {
// Only tests can reach here.
return
}
msg := truncateMessage(errMsg)
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: errMsg,
}, nominatingInfo); err != nil {
klog.FromContext(ctx).Error(err, "Error updating pod", "pod", klog.KObj(pod))
}
}
//这段Go代码主要功能是更新调度队列中的提名Pod信息,避免调度周期和调度器接收提名Pod更新之间的竞态条件。
//具体来说:
//- 首先检查sched.SchedulingQueue是否为nil,仅在测试中会为nil。
//- 若不为nil,则使用logger记录日志,并调用AddNominatedPod方法将提名Pod信息加入调度队列。
//- 若没有错误发生,仅在测试中会执行,直接返回。
//- 截断错误消息,并使用EventRecorder记录事件,事件类型为FailedScheduling,事件原因为msg。
//- 调用updatePod方法更新Pod的条件,将PodScheduled状态设置为ConditionFalse,原因设置为reason,消息设置为errMsg,
//并将提名信息传递给updatePod方法。
//- 若更新Pod时发生错误,记录错误日志。
// truncateMessage truncates a message if it hits the NoteLengthLimit.
func truncateMessage(message string) string {
max := validation.NoteLengthLimit
if len(message) <= max {
return message
}
suffix := " ..."
return message[:max-len(suffix)] + suffix
}
//该函数用于截断字符串,如果给定的消息长度超过了validation.NoteLengthLimit规定的最大长度,则在末尾添加"..."并返回截断后的消息字符串。
//如果给定的消息长度小于等于最大长度,则直接返回原消息字符串。
//该函数用于截断字符串,如果给定的字符串长度超过了validation.NoteLengthLimit规定的最大长度,则在字符串末尾添加"..."并返回截断后的字符串;
//如果给定的字符串长度不超过最大长度,则直接返回原字符串。
func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
logger := klog.FromContext(ctx)
logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
return nil
}
if nnnNeedsUpdate {
podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
}
//该函数用于更新Pod的条件状态,如果满足一定条件,则更新Pod的NominatedNodeName字段,
//并通过PatchPodStatus函数将更新后的Pod状态应用到实际的Pod对象中。
//- 首先,从上下文中获取日志记录器,并输出相关日志信息。
//- 然后,创建一个Pod状态的深拷贝。
//- 接着,判断是否需要更新NominatedNodeName字段,
//只有当nominatingInfo的模式为ModeOverride且当前NominatedNodeName与nominatingInfo中的NominatedNodeName不相同时,才需要更新。
//- 如果不需要更新Pod的条件状态和NominatedNodeName字段,则直接返回。
//- 如果需要更新NominatedNodeName字段,则将其赋值给podStatusCopy中的NominatedNodeName字段。
//- 最后,调用PatchPodStatus函数将podStatusCopy中的更新应用到实际的Pod对象中,并返回操作结果。
//请注意,该函数中涉及到的一些参数和函数的具体实现和用途可能需要结合具体的上下文和代码来理解。