/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"context"
"sort"
"sync"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
// Realistic value for maximum in-flight requests when processing in parallel mode.
const MaxBatchSize = 500
// StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
// as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
type StatefulSetControlInterface interface {
// UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
// persistent volume creation, update, and deletion.
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
// successful the returned error is nil.
AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
//这段代码定义了一个名为StatefulSetControlInterface的接口,其中包含了三个方法:
//1. UpdateStatefulSet:用于更新StatefulSet及其子Pods的控制逻辑。实现此方法时,如果返回非空错误,调用将使用速率限制策略进行重试。
//实现者应将任何不希望触发重试的错误沉入,并且可以在任何时间点异常退出,只要希望稍后重新运行更新即可。
//2. ListRevisions:返回代表set修订版本的ControllerRevision数组。如果返回的错误为nil,则返回的ControllerRevision切片是有效的。
//3. AdoptOrphanRevisions:采用与set的Selector匹配的任何孤儿ControllerRevision。如果所有采用都成功,则返回的错误为nil。
//注意:代码中还定义了一个常量MaxBatchSize,其值为500。
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface,
recorder record.EventRecorder) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}
// 该函数返回一个实现了StatefulSetControlInterface接口的defaultStatefulSetControl实例。
// 参数包括podControl用于创建、更新和删除Pod以及创建PersistentVolumeClaims;
// statusUpdater用于更新StatefulSets的状态;
// controllerHistory用于管理StatefulSet的控制器历史记录;
// recorder用于记录事件。
// 除了测试场景外,应使用NewRealStatefulPodControl()返回的实例作为podControl参数。
type defaultStatefulSetControl struct {
podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
recorder record.EventRecorder
}
//该代码定义了一个名为defaultStatefulSetControl的结构体,它有四个字段:
//1. podControl:类型为*StatefulPodControl,用于控制Pod的操作。
//2. statusUpdater:类型为StatefulSetStatusUpdaterInterface,用于更新StatefulSet的状态。
//3. controllerHistory:类型为history.Interface,用于维护控制器的历史记录。
//4. recorder:类型为record.EventRecorder,用于记录事件。 这个结构体主要用于管理和控制StatefulSet的生命周期。
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
// consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
// is created while any pod is unhealthy, and pods are terminated in descending order. The burst
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
// list all revisions and sort them
revisions, err := ssc.ListRevisions(set)
//该函数是defaultStatefulSetControl类型的UpdateStatefulSet方法,用于更新StatefulSet对象的状态。
//函数首先对传入的StatefulSet对象进行深拷贝,以避免更新过程中出现突变错误。
//然后通过调用ssc的ListRevisions方法,列出并排序该StatefulSet的所有修订版本。
//接下来,函数会根据传入的Pod对象列表,更新StatefulSet的状态信息,并返回更新后的StatefulSet状态以及可能出现的错误。
if err != nil {
return nil, err
}
history.SortControllerRevisions(revisions)
//该函数首先检查err是否为nil,如果不为nil,则返回nil和err。然后调用history.SortControllerRevisions函数对revisions进行排序。
currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
if err != nil {
errs := []error{err}
if agg, ok := err.(utilerrors.Aggregate); ok {
errs = agg.Errors()
}
return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
}
//该函数调用ssc.performUpdate执行更新操作,并根据返回的结果进行错误处理和历史记录截断。
//如果performUpdate返回错误,函数会将错误添加到一个错误切片中,并判断错误是否为utilerrors.Aggregate类型,
//如果是,则将其中的错误提取出来合并到错误切片中。
//最后,函数会调用ssc.truncateHistory截断历史记录,并将其错误与之前收集的错误合并后返回。
// maintain the set's revision history limit
return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
//该函数调用了ssc.truncateHistory方法,用于维护集合的修订历史记录限制。传入参数包括集合、Pods、修订版本号、当前修订版本号和更新修订版本号。返回值为操作状态。
}
//该函数是用于更新一个StatefulSet的状态的核心逻辑循环。
//它执行默认的、可预测的、一致的单调更新策略:扩展按顺序进行,当任何Pod不健康时不会创建新的Pod,Pods按降序终止。
//突发策略允许放松这些约束——Pod将被积极地创建和删除,且没有特定的顺序。
//使用突发策略的客户端应该小心确保他们了解具有不可预测的Pod数量的一致性影响。
//函数首先对传入的StatefulSet进行深拷贝,然后列出并排序所有修订版本。
//然后执行performUpdate函数进行更新操作,根据更新结果维护StatefulSet的修订版本历史记录限制。
func (ssc *defaultStatefulSetControl) performUpdate(
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
//该函数是一个Go语言函数,它定义了一个名为performUpdate的方法,该方法属于defaultStatefulSetControl类型。
//这个方法接收四个参数:
//ctx是一个上下文对象,
//set是一个指向StatefulSet类型的指针,
//pods是一个指向Pod类型的切片,
//revisions是一个指向ControllerRevision类型的切片。
//该方法返回四个值:一个指向ControllerRevision类型的指针,一个指向ControllerRevision类型的指针,
//一个指向StatefulSetStatus类型的指针,以及一个错误类型。 在函数体内部,定义了一个名为currentStatus的变量,
//它是一个指向StatefulSetStatus类型的指针。
//然后,通过FromContext方法从ctx中获取了一个日志记录器对象,并将其赋值给名为logger的变量。
//这个函数的主要功能是在状态fulSet的更新过程中执行一些操作,并返回更新后的状态fulSet的状态信息和其他相关数据。
//但是,根据给定的代码片段,无法确定该函数的完整逻辑和具体操作。
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return currentRevision, updateRevision, currentStatus, err
}
//该函数用于获取当前状态集的修订版本,并更新修订版本。
//函数接收一个状态集和修订版本作为参数,返回当前修订版本、更新修订版本、冲突计数和错误信息。
//如果发生错误,函数会返回当前修订版本、更新修订版本、当前状态和错误信息。
// perform the main update function and get the status
currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
if err != nil && currentStatus == nil {
return currentRevision, updateRevision, nil, err
}
//该函数主要调用ssc.updateStatefulSet方法来更新StatefulSet的状态,并获取更新后的状态信息。如果更新出错且没有返回新的状态信息,则返回更新前的状态信息和错误。
// make sure to update the latest status even if there is an error with non-nil currentStatus
statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
if statusErr == nil {
logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
"replicas", currentStatus.Replicas,
"readyReplicas", currentStatus.ReadyReplicas,
"currentReplicas", currentStatus.CurrentReplicas,
"updatedReplicas", currentStatus.UpdatedReplicas)
}
//该函数用于更新StatefulSet的状态。
//首先调用ssc.updateStatefulSetStatus方法更新状态,然后根据返回的错误值进行判断。
//如果更新状态时没有错误,就打印日志信息,包括StatefulSet的名称和各种状态值(Replicas、ReadyReplicas、CurrentReplicas、UpdatedReplicas)。
switch {
case err != nil && statusErr != nil:
logger.Error(statusErr, "Could not update status", "statefulSet", klog.KObj(set))
return currentRevision, updateRevision, currentStatus, err
case err != nil:
return currentRevision, updateRevision, currentStatus, err
case statusErr != nil:
return currentRevision, updateRevision, currentStatus, statusErr
}
//这个Go函数通过switch语句根据err和statusErr的值来决定返回的结果。
//如果err和statusErr都不为nil,则记录错误日志并返回当前修订版号、更新修订版号、当前状态和err;
//如果只有err不为nil,则直接返回当前修订版号、更新修订版号、当前状态和err;
//如果只有statusErr不为nil,则直接返回当前修订版号、更新修订版号、当前状态和statusErr。
logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
"currentRevision", currentStatus.CurrentRevision,
"updateRevision", currentStatus.UpdateRevision)
return currentRevision, updateRevision, currentStatus, nil
//该函数是一个日志输出函数,通过调用logger.V(4).Info方法输出一条包含多个参数的日志信息。
//其中,参数StatefulSet revisions是日志的主题,statefulSet、currentRevision、updateRevision是日志的附加信息。
//函数返回currentRevision、updateRevision、currentStatus以及一个nil错误。
}
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return ssc.controllerHistory.ListControllerRevisions(set, selector)
}
// 该函数是defaultStatefulSetControl类型的成员方法,用于列出给定StatefulSet的所有ControllerRevision。
// 1. 首先,函数将StatefulSet的selector转换为LabelSelector类型。
// 2. 然后,调用controllerHistory的ListControllerRevisions方法,传入StatefulSet和selector,返回所有的ControllerRevision。
// 3. 如果转换selector时出现错误,函数将返回nil和错误信息。 返回值为 []*apps.ControllerRevision,即ControllerRevision的切片。
func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) error {
for i := range revisions {
adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
if err != nil {
return err
}
revisions[i] = adopted
}
return nil
}
//该函数是一个Go语言函数,它属于defaultStatefulSetControl类型的一个方法。
//函数名为AdoptOrphanRevisions,它接受一个StatefulSet类型的指针set和一个ControllerRevision类型的切片revisions作为参数,返回一个error类型的值。
//该函数的作用是领养孤儿ControllerRevision(即没有StatefulSet关联的ControllerRevision),
//通过调用ssc.controllerHistory的AdoptControllerRevision方法,将孤儿ControllerRevision与给定的StatefulSet关联起来。
//函数会遍历revisions切片中的每个ControllerRevision,尝试将其领养,并将领养后的ControllerRevision更新到revisions切片中对应的位置。
//如果领养过程中出现错误,则会立即返回错误。如果成功领养所有孤儿ControllerRevision,则函数返回nil。
// truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
// CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
// considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
// only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
// expects that revisions is sorted when supplied.
func (ssc *defaultStatefulSetControl) truncateHistory(
set *apps.StatefulSet,
pods []*v1.Pod,
revisions []*apps.ControllerRevision,
current *apps.ControllerRevision,
update *apps.ControllerRevision) error {
history := make([]*apps.ControllerRevision, 0, len(revisions))
// mark all live revisions
live := map[string]bool{}
//该函数用于截断StatefulSet历史记录中任何非活动的ControllerRevisions。
//函数会从set的历史记录中删除非活动的修订版本,从修订版本号最低的开始,直到只剩下RevisionHistoryLimit个修订版本为止。
//函数会将UpdateRevision和CurrentRevision视为活动的修订版本,同时将与Pods关联的修订版本也视为活动的修订版本。
//函数期望传入的修订版本已经按照顺序排序。
//函数返回一个错误,如果返回的错误为nil,则表示操作成功。
//函数首先创建一个空的ControllerRevision指针切片history,用于存储活动的修订版本。
//然后创建一个map类型的live变量,用于标记活动的修订版本。
//接下来,函数遍历传入的修订版本切片revisions,将活动的修订版本添加到history切片中,并在live变量中标记该修订版本为活动的。
//最后,函数检查history切片的长度是否超过了RevisionHistoryLimit,
//如果是,则删除history切片中修订版本号最低的修订版本,直到history切片的长度等于RevisionHistoryLimit。
//如果在截断历史记录的过程中出现错误,则返回该错误。
if current != nil {
live[current.Name] = true
}
if update != nil {
live[update.Name] = true
}
for i := range pods {
live[getPodRevision(pods[i])] = true
}
// collect live revisions and historic revisions
for i := range revisions {
if !live[revisions[i].Name] {
history = append(history, revisions[i])
}
}
//该Go函数主要实现了以下几个功能:
//1. 将当前非空对象的名称添加到live映射中,并将其值设置为true。
//2. 如果update非空,将update对象的名称添加到live映射中,并将其值设置为true。
//3. 遍历pods切片,将每个Pod的修订版本名称添加到live映射中,并将其值设置为true。
//4. 遍历revisions切片,将不在live映射中的修订版本对象添加到history切片中。
//这段代码的主要目的是为了收集活跃的修订版本和历史修订版本,并将它们分别存储在live和history映射和切片中。
historyLen := len(history)
historyLimit := int(*set.Spec.RevisionHistoryLimit)
if historyLen <= historyLimit {
return nil
}
// delete any non-live history to maintain the revision limit.
history = history[:(historyLen - historyLimit)]
for i := 0; i < len(history); i++ {
if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
return err
}
}
return nil
}
//该函数用于根据设定的修订历史限制,删除非活跃的历史记录,以保持修订限制。
//首先,获取历史记录的长度并将其与修订历史限制进行比较。
//如果历史记录的长度小于等于修订历史限制,则无需进行删除操作,直接返回nil。
//如果历史记录的长度大于修订历史限制,则通过切片操作将超出部分的历史记录删除,并遍历剩余的历史记录,调用DeleteControllerRevision方法将其逐个删除。
//如果删除过程中出现错误,则返回错误。最后,如果删除成功,则返回nil。
// getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also
// returns a collision count that records the number of name collisions set saw when creating
// new ControllerRevisions. This count is incremented on every name collision and is used in
// building the ControllerRevision names for name collision avoidance. This method may create
// a new revision, or modify the Revision of an existing revision if an update to set is detected.
// This method expects that revisions is sorted when supplied.
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
var currentRevision, updateRevision *apps.ControllerRevision
revisionCount := len(revisions)
history.SortControllerRevisions(revisions)
//该函数用于获取StatefulSet的当前和更新的ControllerRevisions,并返回一个记录创建新ControllerRevisions时遇到的名称冲突次数的碰撞计数。
//如果检测到StatefulSet的更新,该函数可能会创建新修订版或修改现有修订版的修订号。函数期望传入的修订版已排序。
// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
var collisionCount int32
if set.Status.CollisionCount != nil {
collisionCount = *set.Status.CollisionCount
}
//这段Go代码定义了一个局部变量collisionCount,并根据set.Status.CollisionCount的值进行初始化。
//其主要作用是避免直接修改set.Status的值,通过返回这个局部变量的值,将其传递给updateStatefulSet函数,以更新set.Status中的CollisionCount字段。
//详细解释如下: 1. var collisionCount int32:定义了一个名为collisionCount的整型变量,用于存储set.Status.CollisionCount的值。
//2. if set.Status.CollisionCount != nil:判断set.Status.CollisionCount是否非空,即是否已经初始化。
//3. collisionCount = *set.Status.CollisionCount:如果set.Status.CollisionCount非空,将其值赋给collisionCount变量。
//这里使用了指针解引用操作*,因为set.Status.CollisionCount可能是一个指针。
//这段代码的主要目的是在不直接修改set.Status的情况下,获取并传递set.Status.CollisionCount的值。
//通过使用局部变量的方式,可以确保在updateStatefulSet函数中更新set.Status时,collisionCount的值不会丢失。
// create a new revision from the current set
updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
if err != nil {
return nil, nil, collisionCount, err
}
//这段Go代码中的函数创建了一个新的修订版本,使用当前的版本集。
//它首先调用nextRevision(revisions)来获取下一个修订版本号,然后调用newRevision(set, revision, &collisionCount)来创建新的修订版本。
//如果创建过程中出现错误,则返回nil、nil、collisionCount和错误信息。
// find any equivalent revisions
equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
equalCount := len(equalRevisions)
//该代码片段是用Go编写的。它定义了一个函数,该函数查找与给定修订版本相等的任何修订版本,并返回相等修订版本的数量。
//具体而言,该函数通过调用history.FindEqualRevisions方法,将给定的修订版本列表revisions和更新的修订版本updateRevision作为参数传递。
//FindEqualRevisions方法返回一个包含与updateRevision相等的所有修订版本的切片。
//然后,代码通过使用len函数计算equalRevisions切片的长度,从而获取相等修订版本的数量。
//总结一下,这段代码的主要功能是查找与指定更新修订版本相等的修订版本的数量。
if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
// if the equivalent revision is immediately prior the update revision has not changed
updateRevision = revisions[revisionCount-1]
} else if equalCount > 0 {
// if the equivalent revision is not immediately prior we will roll back by incrementing the
// Revision of the equivalent revision
updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
equalRevisions[equalCount-1],
updateRevision.Revision)
if err != nil {
return nil, nil, collisionCount, err
}
} else {
//if there is no equivalent revision we create a new one
updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
if err != nil {
return nil, nil, collisionCount, err
}
}
//这段Go代码中的函数片段是一个条件语句,用于根据不同的条件更新或创建控制器修订版本。
//具体功能如下:
//- 首先,它检查是否存在等效的修订版本(equalCount大于0)并且该修订版本是否立即位于更新修订版本之前。
//如果是,则将更新修订版本设置为最新修订版本。
//- 如果存在等效的修订版本但不立即位于更新修订版本之前,则通过调用ssc.controllerHistory.UpdateControllerRevision方法来回滚到该等效修订版本,
//并更新修订版本号。如果回滚过程中出现错误,则返回错误信息。
//- 如果不存在等效的修订版本,则通过调用ssc.controllerHistory.CreateControllerRevision方法创建一个新的控制器修订版本。
//如果创建过程中出现错误,则返回错误信息。 总之,这段代码根据条件选择更新或创建控制器修订版本,并可能涉及到回滚到等效的修订版本或创建新修订版本的操作。
// attempt to find the revision that corresponds to the current revision
for i := range revisions {
if revisions[i].Name == set.Status.CurrentRevision {
currentRevision = revisions[i]
break
}
}
//该函数是一个for循环,通过遍历revisions切片,查找并设置与set.Status.CurrentRevision名称相匹配的修订版本。
// if the current revision is nil we initialize the history by setting it to the update revision
if currentRevision == nil {
currentRevision = updateRevision
}
return currentRevision, updateRevision, collisionCount, nil
}
// 这个函数是一个简单的条件判断语句,判断当前的currentRevision是否为nil,
// 如果是,则将其初始化为updateRevision。
// 最后返回更新后的currentRevision、updateRevision、collisionCount和nil。
func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
successes := 0
j := 0
for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(min(2*batchSize, remaining), MaxBatchSize) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func(k int) {
defer wg.Done()
// Ignore the first parameter - relevant for monotonic only.
if _, err := fn(k); err != nil {
errCh <- err
}
}(j)
j++
}
wg.Wait()
successes += batchSize - len(errCh)
close(errCh)
if len(errCh) > 0 {
errs := make([]error, 0)
for err := range errCh {
errs = append(errs, err)
}
return successes, utilerrors.NewAggregate(errs)
}
remaining -= batchSize
}
return successes, nil
}
// 该函数是一个并发执行函数的批处理工具。它通过调用提供的函数fn来并发执行批处理,并根据执行结果进行错误处理和统计。
// - initialBatchSize:初始批处理大小。
// - remaining:剩余需要处理的数量。
// - fn:一个函数,接收一个整数参数并返回一个布尔值和一个错误。
// 该函数将被并发调用以执行批处理任务。
// 函数主要逻辑如下:
// 1. 初始化成功执行数successes和内部计数器j。
// 2. 使用min函数计算当前批处理大小batchSize,并进入循环,直到batchSize为0或处理完成。
// 3. 创建一个错误通道errCh,用于收集并发执行中的错误。
// 4. 使用sync.WaitGroup来等待所有并发执行完成。
// 5. 并发调用fn函数,将j作为参数,递增j。
// 6. 等待所有并发执行完成,并统计成功执行数。
// 7. 如果存在错误,将错误收集到errs中并返回。
// 8. 更新剩余需要处理的数量。
// 9. 返回成功执行数和错误。
// 该函数通过控制批处理的大小和并发执行,实现了一个自适应的并发批处理逻辑,可以根据执行情况动态调整批处理的大小。
type replicaStatus struct {
replicas int32
readyReplicas int32
availableReplicas int32
currentReplicas int32
updatedReplicas int32
}
// 该代码定义了一个名为replicaStatus的结构体类型,
// 它有5个字段:
// - replicas表示副本的总数;
// - readyReplicas表示准备就绪的副本数量;
// - availableReplicas表示可用的副本数量;
// - currentReplicas表示当前存在的副本数量;
// - updatedReplicas表示已更新的副本数量。
func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
status := replicaStatus{}
for _, pod := range pods {
if isCreated(pod) {
status.replicas++
}
//该函数用于计算副本状态。它接收一个Pod列表、最小就绪秒数、当前修订版和更新修订版作为参数,并返回一个副本状态。
//函数遍历Pod列表,并对每个Pod进行如下操作:
//1. 如果Pod被创建,则将副本数量加1。
//最后,函数返回计算得到的副本状态。
// count the number of running and ready replicas
if isRunningAndReady(pod) {
status.readyReplicas++
// count the number of running and available replicas
if isRunningAndAvailable(pod, minReadySeconds) {
status.availableReplicas++
}
}
//这段Go代码中的函数用于增加就绪副本数和可用副本数的计数。
//具体来说:
//- isRunningAndReady(pod) 函数检查 Pod 是否处于运行且就绪状态,如果是,则将 status.readyReplicas 增加 1。
//- isRunningAndAvailable(pod, minReadySeconds) 函数检查 Pod 是否处于运行且可用状态,
//其中 minReadySeconds 参数表示最小就绪时间(单位:秒)。
//如果 Pod 在最小就绪时间内一直保持就绪状态,则将 status.availableReplicas 增加 1。
//这段代码的主要目的是统计 Kubernetes 中某个 Pod 的就绪副本数和可用副本数,以便于对 Pod 的状态进行管理和监控。
// count the number of current and update replicas
if isCreated(pod) && !isTerminating(pod) {
revision := getPodRevision(pod)
if revision == currentRevision.Name {
status.currentReplicas++
}
if revision == updateRevision.Name {
status.updatedReplicas++
}
}
}
return status
}
// 这段Go代码中的函数逻辑如下:
// - 首先判断Pod是否已经创建并且没有处于终止状态;
// - 如果满足条件,获取Pod的修订版本号;
// - 判断该修订版本号是否与当前修订版本名相同,若相同则当前副本数加一;
// - 判断该修订版本号是否与更新修订版本名相同,若相同则更新副本数加一;
// - 最后返回更新后的状态信息。 这段代码主要是用来统计Pod的当前副本数和更新后的副本数。
func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
status.Replicas = 0
status.ReadyReplicas = 0
status.AvailableReplicas = 0
status.CurrentReplicas = 0
status.UpdatedReplicas = 0
for _, list := range podLists {
replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
status.Replicas += replicaStatus.replicas
status.ReadyReplicas += replicaStatus.readyReplicas
status.AvailableReplicas += replicaStatus.availableReplicas
status.CurrentReplicas += replicaStatus.currentReplicas
status.UpdatedReplicas += replicaStatus.updatedReplicas
}
}
// 该函数用于更新StatefulSet的状态信息。
// 它接收一个指向StatefulSetStatus的指针、minReadySeconds参数以及currentRevision和updateRevision两个ControllerRevision指针,
// 还有可变长度的podLists参数。
// 函数首先将状态信息中的五个计数器重置为0,然后遍历podLists中的每个Pod列表,
// 通过调用computeReplicaStatus函数计算每个列表的副本状态,并将计算结果累加到状态信息中
func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet,
monotonic bool,
replicas []*v1.Pod,
i int) (bool, error) {
logger := klog.FromContext(ctx)
//该函数是一个Go语言函数,它定义了一个用于处理StatefulSet副本的过程。
//函数接受多个参数,包括上下文、StatefulSet对象、两个ControllerRevision对象、两个StatefulSet对象、一个布尔值、一个Pod对象数组和一个整数。
//函数返回一个布尔值和一个错误对象。 Markdown格式输出:
//该函数是一个Go语言函数,定义了一个处理StatefulSet副本的过程。它接受以下参数:
//- ctx:上下文对象。
//- set:StatefulSet对象。
//- currentRevision:当前的ControllerRevision对象。
//- updateRevision:要更新的ControllerRevision对象。
//- currentSet:当前的StatefulSet对象。
//- updateSet:要更新的StatefulSet对象。
//- monotonic:一个布尔值,表示是否为单调递增。
//- replicas:一个Pod对象数组。
//- i:一个整数。
//函数返回一个布尔值和一个错误对象。
// Delete and recreate pods which finished running.
//
// Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
//这段Go代码是一个函数的一部分,它的功能是删除已经运行完成的Pod,并重新创建它们。
//注意,这个函数还会处理那些成功结束的Pod(即状态为Succeeded的Pod),
//因为被驱逐或强制停止的Pod的最终状态是由容器的退出码决定的,而不是由Pod终止的原因决定的。
//因此,无论退出码如何,都应该重新启动Pod。
//函数中使用了isFailed和isSucceeded函数来判断Pod的状态,
//如果状态为Failed或Succeeded,则会使用ssc.recorder.Eventf函数记录一个事件,说明StatefulSet正在重新创建失败或成功的Pod。
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
//这段Go代码是Kubernetes中StatefulSet控制器的一部分,用于处理有状态副本集(StatefulSet)中已终止的Pod的重建事件。
//在else块中,代码通过ssc.recorder.Eventf方法记录了一个事件,
//事件类型为v1.EventTypeNormal,事件名为"RecreatingTerminatedPod",
//事件消息为"StatefulSet %s/%s is recreating terminated Pod %s",
//其中包含了StatefulSet的命名空间、名称以及需要重建的Pod的名称。
//这个函数的作用是通过事件机制通知用户,某个StatefulSet正在重建已终止的Pod。
}
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
}
replicaOrd := i + getStartOrdinal(set)
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
//该函数的功能是在StatefulSet中删除一个Pod,并创建一个新的Pod来替代它。
//函数首先尝试删除指定的Pod,如果删除成功,则根据当前的StatefulSet和更新后的StatefulSet信息创建一个新的Pod,并将其添加到replicas列表中。
}
//该函数用于删除并重新创建已经完成运行的Pods。
//注意,即使Pod的阶段为Succeeded,也会触发此事件。
//这是因为被驱逐或以其他方式强制停止的Pod的最终Pod阶段(例如在节点重新启动时终止)由容器的退出代码决定,而不是由Pod终止的原因决定。
//无论退出代码如何,我们都应该重新启动Pod。
//函数首先检查每个Pod是否失败或成功,如果是,则记录事件并尝试删除该Pod。
//如果删除成功,则创建一个新的Pod来替代它。
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
return true, err
} else if isStale {
// If a pod has a stale PVC, no more work can be done this round.
return true, err
}
}
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
return true, err
}
if monotonic {
// if the set does not allow bursting, return immediately
return true, nil
}
}
//这段Go代码是一个条件判断语句的集合,用于处理Pod的创建。
//具体来说,它首先检查一个Pod是否已经被创建,如果没有,则根据特定的条件进行进一步的处理。
//如果启用了StatefulSetAutoDeletePVC特性,并且Pod的PersistentVolumeClaim(PVC)是陈旧的,则函数会提前返回。
//如果创建Pod失败,函数也会返回。如果该集合不允许突发(monotonic)行为,函数会在创建Pod后立即返回。
// If the Pod is in pending state then trigger PVC creation to create missing PVCs
if isPending(replicas[i]) {
logger.V(4).Info(
"StatefulSet is triggering PVC creation for pending Pod",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return true, err
}
}
//该函数用于检查StatefulSet中的Pod是否处于pending状态,
//如果是,则触发PVC(Persistent Volume Claim)的创建,以创建缺失的PVC。
//具体实现中,通过调用podControl的createMissingPersistentVolumeClaims方法来创建缺失的PVC,
//如果创建失败,则返回true和错误信息。
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
}
//该函数是一个条件判断语句,判断某个Pod是否正在终止状态,并且是否需要等待其优雅删除完成。如果满足条件,则返回true和nil。
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
}
//该函数是Go语言编写的,用于判断一个Pod是否处于运行和就绪状态。
//如果Pod没有运行且未准备就绪,并且设置为单调递增模式,则函数会返回true和nil。
//这段代码主要用于StatefulSet等待Pod达到运行和就绪状态的情况。
// If we have a Pod that has been created but is not available we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
}
//该函数是Go语言编写的,用于判断StatefulSet中的Pod是否可用。
//如果Pod未运行或不可用,并且StatefulSet的monotonic属性为true,则函数会返回true,否则返回false。
//函数通过调用isRunningAndAvailable函数来判断Pod的状态。
// Enforce the StatefulSet invariants
retentionMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
var err error
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
if err != nil {
retentionMatch = true
}
}
//这段Go代码是用于检查StatefulSet的保留策略是否匹配的函数。
//- 首先,它检查是否启用了StatefulSet的自动删除PVC的功能。
//- 如果启用了,它会调用ssc.podControl.ClaimsMatchRetentionPolicy方法来检查当前的Pod是否匹配保留策略。
//- 如果检查过程中出现错误,则将retentionMatch设置为true,表示匹配保留策略。
//- 最后,retentionMatch的值将用于决定是否保留或删除StatefulSet的PVC。
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
return false, nil
}
//该函数是用于判断某个条件是否满足,如果满足则返回false和nil。
//具体判断条件为:identityMatches函数返回值为true,并且storageMatches函数返回值为true,并且retentionMatch的值为true。
//其中,identityMatches函数用于判断某个set是否与replicas[i]相等;
//storageMatches函数用于判断某个set是否与replicas[i]的存储相匹配;
//retentionMatch是一个布尔值,用于判断某个条件是否满足。
//如果以上条件都满足,则函数返回false和nil。
// Make a deep copy so we don't mutate the shared cache
replica := replicas[i].DeepCopy()
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
return true, err
}
//这个函数的功能是通过调用DeepCopy方法创建replicas[i]的深拷贝,并将拷贝赋值给replica变量。
//然后,它调用ssc.podControl.UpdateStatefulPod方法来更新状态fulPod,如果更新失败,则返回true和错误信息。
return false, nil
}
func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
logger := klog.FromContext(ctx)
if isTerminating(condemned[i]) {
// if we are in monotonic mode, block and wait for terminating pods to expire
if monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
return true, nil
}
return false, nil
}
//该函数是defaultStatefulSetControl类型的processCondemned方法,
//用于处理StatefulSet中的废弃Pod。函数根据是否为单调模式,决定是否等待废弃Pod终止。
//如果处于单调模式且当前Pod为终止状态,则函数会等待Pod终止。如果当前Pod不是终止状态,则函数直接返回。
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
}
//该函数片段是一个条件判断语句,
//主要功能是在特定条件下,判断是否需要等待某个Pod变为Running和Ready状态后再进行缩容操作。
//具体来说,它包含了以下几个要点:
//1. 判断当前Pod是否处于运行且就绪状态:通过调用isRunningAndReady(condemned[i])函数来判断当前Pod是否满足运行且就绪的条件。
//2. 判断是否处于单调模式:通过检查monotonic变量的值来判断当前是否处于单调模式。
//3. 判断当前Pod是否为第一个不健康的Pod:通过比较condemned[i]和firstUnhealthyPod的值来判断当前Pod是否为第一个不健康的Pod。
//如果满足上述条件,即当前Pod不运行且不就绪,当前处于单调模式,且当前Pod不是第一个不健康的Pod,则该函数会返回true和nil,
//表示需要等待Pod变为Running和Ready状态后再进行缩容操作。
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
}
//该函数是一段Go语言代码,它是一个条件判断语句。
//该函数主要功能是在特定条件下阻止某个操作的执行。
//具体来说,函数首先判断某个Pod(被谴责的目标)是否处于运行和可用状态,然后判断是否处于单调模式,并且被谴责的目标不是第一个不健康的Pod。
//如果满足这些条件,函数会返回true,表示需要等待Pod变为可用状态后再进行缩容操作。否则,函数返回nil,表示可以进行缩容操作。
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
}
// 该函数是一个日志记录和操作Pod的函数。
// 它首先使用logger.V(2).Info记录一条日志信息,表示某个StatefulSet的Pod正在因为缩容而终止。
// 日志中包含了StatefulSet和Pod的相关信息。
// 然后,函数通过调用ssc.podControl.DeleteStatefulPod方法来删除指定的Pod,并返回true表示删除成功。
func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
if monotonic {
for i := range pods {
if shouldExit, err := fn(i); shouldExit || err != nil {
return true, err
}
}
} else {
if _, err := slowStartBatch(1, len(pods), fn); err != nil {
return true, err
}
}
return false, nil
}
//该函数runForAll接受三个参数:
//一个pods的切片,一个函数fn,和一个布尔值monotonic。
//根据monotonic的值,函数会以不同的方式调用fn函数。
//如果monotonic为true,则会顺序遍历pods切片,并依次调用fn函数,传入当前索引i作为参数。
//如果fn函数返回true或出现错误,则会立即返回true和错误。
//如果monotonic为false,则会调用slowStartBatch函数,以一定的延迟并发地调用fn函数。
//最终,如果所有fn函数调用都成功完成,则返回false和nil。
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
// the set in order to conform the system to the target state for the set. The target state always contains
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
// RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
// If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
// the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
//该函数是用于更新StatefulSet的状态的。
//它会根据目标状态来创建、更新和删除Pod,以使系统符合StatefulSet的目标状态。
//目标状态始终包含具有Ready Condition的set.Spec.Replicas Pods。
//如果StatefulSet的UpdateStrategy.Type为RollingUpdateStatefulSetStrategyType,
//则集合中的所有Pod必须处于set.Status.CurrentRevision。
//如果UpdateStrategy.Type为OnDeleteStatefulSetStrategyType,
//则目标状态与集合中Pod的修订版本无关。
//如果UpdateStrategy.Type为PartitionStatefulSetStrategyType,
//则所有序号小于UpdateStrategy.Partition.Ordinal的Pod必须处于Status.CurrentRevision,
//而所有其他Pod必须处于Status.UpdateRevision。
//如果返回的错误为nil,则返回的StatefulSetStatus是有效的,并且必须记录更新。
//如果错误不为nil,则应重试该方法,直到成功。
// get the current and update revisions of the set.
currentSet, err := ApplyRevision(set, currentRevision)
if err != nil {
return nil, err
}
updateSet, err := ApplyRevision(set, updateRevision)
if err != nil {
return nil, err
}
//这段Go代码中的函数功能是:根据给定的集合和两个修订版本号,分别获取当前修订版本对应的集合内容和更新修订版本对应的集合内容。
//具体描述如下:
//1. 首先,函数使用ApplyRevision函数和给定的集合以及当前修订版本号currentRevision,获取当前修订版本对应的集合内容,并将结果赋值给currentSet变量。
//如果出现错误,则返回nil和错误信息。
//2. 然后,函数使用ApplyRevision函数和给定的集合以及更新修订版本号updateRevision,获取更新修订版本对应的集合内容,并将结果赋值给updateSet变量。
//如果出现错误,则返回nil和错误信息。
//需要注意的是,该函数的返回值类型为(interface{}, error),
//其中interface{}表示返回的内容可以是任意类型,error表示返回的错误信息。
// set the generation, and revisions in the returned status
status := apps.StatefulSetStatus{}
status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
//该Go函数主要实现了设置StatefulSetStatus结构体中ObservedGeneration、CurrentRevision、UpdateRevision和CollisionCount字段的值。
//- 首先,创建了一个空的StatefulSetStatus结构体变量status。
//- 然后,将set.Generation的值赋给status.ObservedGeneration字段。
//- 接着,将currentRevision.Name的值赋给status.CurrentRevision字段。
//- 再将updateRevision.Name的值赋给status.UpdateRevision字段。
//- 最后,创建一个新的int32类型指针变量status.CollisionCount并将其值设为collisionCount。
//总结来说,该函数的作用是通过给StatefulSetStatus结构体的字段赋值,来初始化或更新一个StatefulSet的状态信息。
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
//该函数用于更新状态。根据参数设置的最小就绪秒数、当前修订版本、更新修订版本和Pods,来更新状态对象status。
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set)
condemned := make([]*v1.Pod, 0, len(pods))
unhealthy := 0
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for _, pod := range pods {
if podInOrdinalRange(pod, set) {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
} else if getOrdinal(pod) >= 0 {
// if the ordinal is valid, but not within the range add it to the condemned list
condemned = append(condemned, pod)
}
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}
//该函数根据给定的Pod集合和Deployment规格,
//将Pod分为两类:有效副本和废弃Pod。
//有效副本是指其序号在当前副本数量范围内且序号有效的Pod,
//而废弃Pod是指序号不在当前副本数量范围内但序号有效的Pod。
//函数通过遍历Pod集合,根据Pod的序号与当前副本数量范围的比较,将Pod分别加入到有效副本和废弃Pod的切片中。
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
replicaIdx := ord - getStartOrdinal(set)
if replicas[replicaIdx] == nil {
replicas[replicaIdx] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}
//该Go函数的功能是在有空缺索引的位置上创建新的Pod。
//它根据set的规格,从起始序号到结束序号遍历,如果在replicas数组中对应索引位置上的Pod为空,则创建一个新的有正确修订版本的Pod,并将其赋值给该索引位置。
// sort the condemned Pods by their ordinals
sort.Sort(descendingOrdinal(condemned))
//该函数是一个排序函数,用于将一个名为condemned的Pod切片按照其序号降序排序。
//其中,descendingOrdinal是一个自定义的排序类型,实现了sort.Interface接口的Sort方法,通过比较Pod的序号来实现降序排序。
// find the first unhealthy Pod
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = replicas[i]
}
}
}
//该函数用于遍历一组Pod副本,查找第一个不健康的状态。
//具体操作为循环遍历replicas切片,通过调用isHealthy函数判断每个副本的健康状态。
//如果不健康,则将unhealthy计数加1,并且如果firstUnhealthyPod为空,则将其赋值为当前副本。
// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
for i := len(condemned) - 1; i >= 0; i-- {
if !isHealthy(condemned[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = condemned[i]
}
}
}
//该Go代码片段是一个for循环,遍历一个名为condemned的切片。
//循环从condemned切片的最后一个元素开始,倒序遍历。
//在每次迭代中,它调用isHealthy函数来检查当前Pod的健康状况。
//如果Pod不健康,则将unhealthy计数器加1,并检查是否是第一个不健康的Pod。
//如果是,则将其赋值给firstUnhealthyPod变量。
//该循环的主要目的是找到第一个不健康的Pod,并统计不健康Pod的数量。
if unhealthy > 0 {
logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
}
//这段Go代码是一个条件判断语句,判断变量unhealthy是否大于0,如果大于0,则调用logger.V(4).Info方法输出一条日志信息。
//日志信息的内容包括:"StatefulSet has unhealthy Pods","statefulSet","unhealthyReplicas","pod"。
//其中,"statefulSet"和"pod"是通过klog.KObj方法获取的对象的字符串表示形式。
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return &status, nil
}
//该函数用于判断一个StatefulSet对象是否正在被删除。如果是,则只更新状态信息,不做其他操作。
//函数通过检查StatefulSet对象的DeletionTimestamp字段是否为nil来判断是否正在被删除。
//如果DeletionTimestamp不为nil,则说明该对象正在被删除,函数会返回当前状态信息和nil。
//否则,函数会继续执行其他操作。
monotonic := !allowsBurst(set)
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
}
//这段Go代码定义了一个函数和一个内部函数。
//函数monotonic := !allowsBurst(set)根据set参数的值计算一个布尔值monotonic。
//allowsBurst是一个外部函数,其作用是判断是否允许突发(burst),
//这里通过取反操作得到monotonic,即不允许突发。
//内部函数processReplicaFn := func(i int) (bool, error) {...}定义了一个闭包函数,
//用于处理每个存活的副本。该函数接受一个整数参数i,返回一个布尔值和一个错误。
//在函数体中,调用了ssc.processReplica函数来处理副本,
//该函数接受多个参数,包括上下文、集合、当前修订版本、更新修订版本、当前集合、更新集合、是否单调以及副本列表和副本索引。
//该内部函数的主要作用是遍历副本并逐个处理它们,如果在单调模式下遇到错误或阻塞,则退出处理。
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
}
//该函数中,runForAll()函数执行processReplicaFn函数操作replicas,并返回一个shouldExit标志和一个错误err。
//如果shouldExit为真或err不为nil,则更新状态并返回。
// Fix pod claims for condemned pods, if necessary.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
fixPodClaim := func(i int) (bool, error) {
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return true, err
} else if !matchPolicy {
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return true, err
}
}
return false, nil
}
if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
}
}
//这段Go代码是用于修复有状态副本集(StatefulSet)中被废弃的Pod的声明(Claim)的。
//如果启用了features.StatefulSetAutoDeletePVC特性,并且ssc.podControl.ClaimsMatchRetentionPolicy函数返回错误或者返回的匹配策略为false,
//则会调用ssc.podControl.UpdatePodClaimForRetentionPolicy函数来更新Pod的声明。
//runForAll函数会遍历所有被废弃的Pod,并调用fixPodClaim函数进行修复。
//如果runForAll函数返回需要退出或者发生了错误,则会更新副本集的状态并返回。
// At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
// and we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order.
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
processCondemnedFn := func(i int) (bool, error) {
return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
}
if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
}
//这段Go代码中的函数是一个处理被宣判终止的Pod的函数。
//在函数中,它会按照单调递减的顺序终止Pods,并且在终止Pod之前会等待所有前置Pod都处于Running和Ready状态。
//函数会调用ssc.processCondemned来处理每个被宣判终止的Pod。
//如果函数执行完毕或者出现错误,它会更新状态并返回该状态和错误信息。
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
return updateStatefulSetAfterInvariantEstablished(ctx,
ssc,
set,
replicas,
updateRevision,
status,
)
}
//该函数用于更新状态fulset的状态。
//根据输入参数的不同,函数会有不同的行为。
//如果状态fulset的更新策略为OnDelete,则函数会直接返回当前状态。
//如果启用了MaxUnavailableStatefulSet特性,则会调用updateStatefulSetAfterInvariantEstablished函数进行进一步的状态更新。
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
}
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {
// delete the Pod if it is not already terminating and does not match the update revision.
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
logger.V(2).Info("Pod of StatefulSet is terminating for update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err
}
}
status.CurrentReplicas--
return &status, err
}
//该Go函数用于根据策略计算目标序列的最小序数值,用于破坏性更新。
//接着,函数会终止与更新修订版本不匹配的最大序号的Pod。
//具体流程如下:
//1. 初始化updateMin为0。
//2. 如果set.Spec.UpdateStrategy.RollingUpdate不为nil,则将updateMin设置为*set.Spec.UpdateStrategy.RollingUpdate.Partition的整数值。
//3. 从replicas的最后一个元素开始,迭代到updateMin(包括updateMin)。
//4. 如果当前Pod的修订版本与更新修订版本不匹配且未终止,则记录日志,并尝试删除该Pod。若删除成功,将status.CurrentReplicas减1,
//并返回status和错误信息err。
//5. 若删除Pod时出现错误且错误类型不是NotFound,则返回status和该错误。
//注意:函数中的replicas是Pod的副本列表,updateRevision.Name是更新的修订版本名称,ssc.podControl.DeleteStatefulPod是删除Pod的方法,logger.V(2).Info是记录日志的方法。
// wait for unhealthy Pods on update
if !isHealthy(replicas[target]) {
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
}
}
return &status, nil
}
//这段Go代码是一个条件判断语句,其主要功能是检查StatefulSet中的Pod是否健康。
//具体来说,函数首先检查replicas[target]是否健康,如果不健康,则记录日志并返回当前状态status。
//如果replicas[target]健康,则直接返回当前状态status。
func updateStatefulSetAfterInvariantEstablished(
ctx context.Context,
ssc *defaultStatefulSetControl,
set *apps.StatefulSet,
replicas []*v1.Pod,
updateRevision *apps.ControllerRevision,
status apps.StatefulSetStatus,
) (*apps.StatefulSetStatus, error) {
//该函数是在建立不变性后更新StatefulSet的状态。
//它接收上下文、StatefulSet控制对象、StatefulSet实例、Pod实例数组、控制器修订版本和StatefulSet状态作为参数,
//并返回更新后的StatefulSet状态和错误信息。
//函数的主要逻辑是更新StatefulSet的状态,包括当前修订版本、复制集和条件等,并返回更新后的状态。
logger := klog.FromContext(ctx)
replicaCount := int(*set.Spec.Replicas)
//该代码片段是用Go语言编写的,其中定义了两个变量logger和replicaCount。
//- logger := klog.FromContext(ctx):该行代码从ctx上下文中获取了一个名为logger的变量。
//该变量很可能是用于日志记录的一种日志器对象。
//klog是一个用于日志记录的Go库,FromContext函数可能是用于从上下文中提取特定的日志器对象。
//- replicaCount := int(*set.Spec.Replicas):该行代码定义了一个名为replicaCount的整型变量,
//并将其初始化为set.Spec.Replicas指针指向的值的整型表示。根据变量的命名,该变量可能是用于存储某个集合的副本数量。
//set的类型和Spec的定义没有给出,因此无法确定set.Spec.Replicas的具体类型和含义。
//但是根据常规的命名习惯,Spec通常表示某个对象的规格说明,而Replicas则通常表示副本数量。
//因此,可以推测set.Spec.Replicas可能是一个指针,指向一个表示副本数量的整数值。通过将其转换为int类型,可以将该值用于后续的逻辑处理或计算。
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
maxUnavailable := 1
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
// if the feature was enabled and then later disabled, MaxUnavailable may have a value
// more than 1. Ignore the passed in value and Use maxUnavailable as 1 to enforce
// expected behavior when feature gate is not enabled.
var err error
maxUnavailable, err = getStatefulSetMaxUnavailable(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, replicaCount)
if err != nil {
return &status, err
}
}
//这段Go代码中的函数主要作用是根据给定的策略计算目标序列的最小序数值,用于破坏性更新。
//函数首先将更新的最小序数值初始化为0,将最大不可用数值初始化为1。
//然后,如果设置了更新策略的RollingUpdate属性,将更新的最小序数值设置为RollingUpdate中的Partition属性的值。
//接着,根据RollingUpdate中的MaxUnavailable属性和副本数量,获取最大不可用数值,并返回错误信息。
// Collect all targets in the range between getStartOrdinal(set) and getEndOrdinal(set). Count any targets in that range
// that are unhealthy i.e. terminated or not running and ready as unavailable). Select the
// (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete
// those pods and count the successful deletions. Update the status with the correct number of deletions.
unavailablePods := 0
for target := len(replicas) - 1; target >= 0; target-- {
if !isHealthy(replicas[target]) {
unavailablePods++
}
}
//该函数用于收集在指定范围内的所有目标,并计算其中不健康的目标数量。
//然后,它选择最多不可用的Pods,并按照它们的序号进行终止。
//函数会删除这些Pods,并统计成功删除的数量,最后更新状态信息。
//函数使用一个循环遍历所有的副本,并通过isHealthy函数判断每个副本的健康状况,如果不健康则增加不可用Pods的计数。
if unavailablePods >= maxUnavailable {
logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable",
"statefulSet", klog.KObj(set),
"unavailablePods", unavailablePods,
"maxUnavailable", maxUnavailable)
return &status, nil
}
//该函数是一个条件判断语句,判断unavailablePods是否大于等于maxUnavailable。如果是,则通过logger记录信息并返回status和nil。
// Now we need to delete MaxUnavailable- unavailablePods
// start deleting one by one starting from the highest ordinal first
podsToDelete := maxUnavailable - unavailablePods
//这个Go函数的功能是计算需要删除的Pod数量。它根据MaxUnavailable和unavailablePods的差值,确定需要删除的Pod数量,并将结果存储在podsToDelete变量中。
deletedPods := 0
for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- {
// delete the Pod if it is healthy and the revision doesnt match the target
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
// delete the Pod if it is healthy and the revision doesnt match the target
logger.V(2).Info("StatefulSet terminating Pod for update",
"statefulSet", klog.KObj(set),
"pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err
}
}
deletedPods++
status.CurrentReplicas--
}
}
return &status, nil
}
//该函数用于删除StatefulSet中的一部分Pod。
//它根据指定的条件遍历replicas列表,并删除满足条件的Pod。
//具体而言,它从replicas列表的末尾开始遍历,
//如果Pod的修订版本号与目标修订版本号不匹配且Pod未终止,则删除该Pod。
//删除Pod时,如果出现错误(除找不到Pod的错误外),则返回错误信息。函数返回更新后的状态信息和nil。
// updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
// mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
// returned error is nil, the update is successful.
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
ctx context.Context,
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {
// complete any in progress rolling update if necessary
completeRollingUpdate(set, status)
// if the status is not inconsistent do not perform an update
if !inconsistentStatus(set, status) {
return nil
}
// copy set and update its status
set = set.DeepCopy()
if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil {
return err
}
return nil
}
var _ StatefulSetControlInterface = &defaultStatefulSetControl{}
//该函数用于更新StatefulSet的状态。
//它首先完成任何正在进行的滚动更新,然后检查状态是否一致,如果不一致则进行更新。
//更新时会创建StatefulSet的深拷贝,并调用statusUpdater的UpdateStatefulSetStatus方法进行更新。
//如果更新成功,则返回nil;否则返回错误。