/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
// MaxBatchSize is a realistic value for the maximum number of in-flight requests when
// pods are processed in parallel (burst) mode; slowStartBatch never grows a batch
// beyond this size.
const (
	MaxBatchSize = 500
)
// StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
// as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
type StatefulSetControlInterface interface {
// UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
// persistent volume creation, update, and deletion.
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
// successful the returned error is nil.
AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
//1. UpdateStatefulSet:用于更新StatefulSet及其子Pods的控制逻辑。实现此方法时,如果返回非空错误,调用将使用速率限制策略进行重试。
//2. ListRevisions:返回代表set修订版本的ControllerRevision数组。如果返回的错误为nil,则返回的ControllerRevision切片是有效的。
//3. AdoptOrphanRevisions:采用与set的Selector匹配的任何孤儿ControllerRevision。如果所有采用都成功,则返回的错误为nil。
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface,
recorder record.EventRecorder) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
// 该函数返回一个实现了StatefulSetControlInterface接口的defaultStatefulSetControl实例。
// 参数包括podControl用于创建、更新和删除Pod以及创建PersistentVolumeClaims;
// statusUpdater用于更新StatefulSets的状态;
// controllerHistory用于管理StatefulSet的控制器历史记录;
// recorder用于记录事件。
// 除了测试场景外,应使用NewRealStatefulPodControl()返回的实例作为podControl参数。
type defaultStatefulSetControl struct {
podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
recorder record.EventRecorder
//1. podControl:类型为*StatefulPodControl,用于控制Pod的操作。
//2. statusUpdater:类型为StatefulSetStatusUpdaterInterface,用于更新StatefulSet的状态。
//3. controllerHistory:类型为history.Interface,用于维护控制器的历史记录。
//4. recorder:类型为record.EventRecorder,用于记录事件。 这个结构体主要用于管理和控制StatefulSet的生命周期。
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
// consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
// is created while any pod is unhealthy, and pods are terminated in descending order. The burst
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
// list all revisions and sort them
revisions, err := ssc.ListRevisions(set)
if err != nil {
return nil, err
currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
if err != nil {
errs := []error{err}
if agg, ok := err.(utilerrors.Aggregate); ok {
errs = agg.Errors()
return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
// maintain the set's revision history limit
return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
func (ssc *defaultStatefulSetControl) performUpdate(
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
//一个指向StatefulSetStatus类型的指针,以及一个错误类型。 在函数体内部,定义了一个名为currentStatus的变量,
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return currentRevision, updateRevision, currentStatus, err
// perform the main update function and get the status
currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
if err != nil && currentStatus == nil {
return currentRevision, updateRevision, nil, err
// make sure to update the latest status even if there is an error with non-nil currentStatus
statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
if statusErr == nil {
logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
"replicas", currentStatus.Replicas,
"readyReplicas", currentStatus.ReadyReplicas,
"currentReplicas", currentStatus.CurrentReplicas,
"updatedReplicas", currentStatus.UpdatedReplicas)
switch {
case err != nil && statusErr != nil:
logger.Error(statusErr, "Could not update status", "statefulSet", klog.KObj(set))
return currentRevision, updateRevision, currentStatus, err
case err != nil:
return currentRevision, updateRevision, currentStatus, err
case statusErr != nil:
return currentRevision, updateRevision, currentStatus, statusErr
logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
"currentRevision", currentStatus.CurrentRevision,
"updateRevision", currentStatus.UpdateRevision)
return currentRevision, updateRevision, currentStatus, nil
//其中,参数StatefulSet revisions是日志的主题,statefulSet、currentRevision、updateRevision是日志的附加信息。
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
return ssc.controllerHistory.ListControllerRevisions(set, selector)
// 该函数是defaultStatefulSetControl类型的成员方法,用于列出给定StatefulSet的所有ControllerRevision。
// 1. 首先,函数将StatefulSet的selector转换为LabelSelector类型。
// 2. 然后,调用controllerHistory的ListControllerRevisions方法,传入StatefulSet和selector,返回所有的ControllerRevision。
// 3. 如果转换selector时出现错误,函数将返回nil和错误信息。 返回值为 []*apps.ControllerRevision,即ControllerRevision的切片。
func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) error {
for i := range revisions {
adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
if err != nil {
return err
revisions[i] = adopted
return nil
// truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
// CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
// considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
// only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
// expects that revisions is sorted when supplied.
func (ssc *defaultStatefulSetControl) truncateHistory(
set *apps.StatefulSet,
pods []*v1.Pod,
revisions []*apps.ControllerRevision,
current *apps.ControllerRevision,
update *apps.ControllerRevision) error {
history := make([]*apps.ControllerRevision, 0, len(revisions))
// mark all live revisions
live := map[string]bool{}
if current != nil {
live[current.Name] = true
if update != nil {
live[update.Name] = true
for i := range pods {
live[getPodRevision(pods[i])] = true
// collect live revisions and historic revisions
for i := range revisions {
if !live[revisions[i].Name] {
history = append(history, revisions[i])
//1. 将当前非空对象的名称添加到live映射中,并将其值设置为true。
//2. 如果update非空,将update对象的名称添加到live映射中,并将其值设置为true。
//3. 遍历pods切片,将每个Pod的修订版本名称添加到live映射中,并将其值设置为true。
//4. 遍历revisions切片,将不在live映射中的修订版本对象添加到history切片中。
historyLen := len(history)
historyLimit := int(*set.Spec.RevisionHistoryLimit)
if historyLen <= historyLimit {
return nil
// delete any non-live history to maintain the revision limit.
history = history[:(historyLen - historyLimit)]
for i := 0; i < len(history); i++ {
if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
return err
return nil
// getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also
// returns a collision count that records the number of name collisions set saw when creating
// new ControllerRevisions. This count is incremented on every name collision and is used in
// building the ControllerRevision names for name collision avoidance. This method may create
// a new revision, or modify the Revision of an existing revision if an update to set is detected.
// This method expects that revisions is sorted when supplied.
//
// The update revision is chosen as follows:
//   - if the last (highest-Revision, given sorted input) element of revisions is equivalent to
//     the candidate built from set, that existing revision is reused unchanged;
//   - else if an older equivalent revision exists, it is rolled forward with
//     UpdateControllerRevision;
//   - otherwise the candidate is persisted with CreateControllerRevision.
//
// The current revision is the element of revisions whose Name equals set.Status.CurrentRevision;
// when none matches, the update revision is used. On error both returned revisions are nil and
// the local collision count is still returned.
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
var currentRevision, updateRevision *apps.ControllerRevision
revisionCount := len(revisions)
// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
var collisionCount int32
if set.Status.CollisionCount != nil {
collisionCount = *set.Status.CollisionCount
// When set.Status.CollisionCount is nil the local count stays at its zero value.
// create a new revision from the current set
updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
if err != nil {
return nil, nil, collisionCount, err
// The candidate is numbered nextRevision(revisions); it is only passed to
// CreateControllerRevision below when no equivalent revision already exists.
// find any equivalent revisions
equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
equalCount := len(equalRevisions)
// NOTE(review): indexing revisions[revisionCount-1] relies on equalCount > 0 implying that
// revisions is non-empty (presumably FindEqualRevisions returns a subset of revisions) — confirm.
if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
// if the equivalent revision is immediately prior the update revision has not changed
updateRevision = revisions[revisionCount-1]
} else if equalCount > 0 {
// if the equivalent revision is not immediately prior we will roll back by incrementing the
// Revision of the equivalent revision
// NOTE(review): the argument lines of the UpdateControllerRevision call below appear to be
// missing (the call is never closed before the next statement). Presumably they pass the most
// recent equivalent revision and updateRevision.Revision — confirm and restore.
updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
if err != nil {
return nil, nil, collisionCount, err
} else {
//if there is no equivalent revision we create a new one
updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
if err != nil {
return nil, nil, collisionCount, err
// CreateControllerRevision receives &collisionCount, so a name collision during creation is
// reflected in the count returned to the caller.
// attempt to find the revision that corresponds to the current revision
for i := range revisions {
if revisions[i].Name == set.Status.CurrentRevision {
currentRevision = revisions[i]
// if the current revision is nil we initialize the history by setting it to the update revision
if currentRevision == nil {
currentRevision = updateRevision
return currentRevision, updateRevision, collisionCount, nil
// Returns the (possibly initialized) current revision, the update revision, the collision
// count, and a nil error.
func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
successes := 0
j := 0
for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(min(2*batchSize, remaining), MaxBatchSize) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
for i := 0; i < batchSize; i++ {
go func(k int) {
defer wg.Done()
// Ignore the first parameter - relevant for monotonic only.
if _, err := fn(k); err != nil {
errCh <- err
successes += batchSize - len(errCh)
if len(errCh) > 0 {
errs := make([]error, 0)
for err := range errCh {
errs = append(errs, err)
return successes, utilerrors.NewAggregate(errs)
remaining -= batchSize
return successes, nil
// 该函数是一个并发执行函数的批处理工具。它通过调用提供的函数fn来并发执行批处理,并根据执行结果进行错误处理和统计。
// - initialBatchSize:初始批处理大小。
// - remaining:剩余需要处理的数量。
// - fn:一个函数,接收一个整数参数并返回一个布尔值和一个错误。
// 该函数将被并发调用以执行批处理任务。
// 函数主要逻辑如下:
// 1. 初始化成功执行数successes和内部计数器j。
// 2. 使用min函数计算当前批处理大小batchSize,并进入循环,直到batchSize为0或处理完成。
// 3. 创建一个错误通道errCh,用于收集并发执行中的错误。
// 4. 使用sync.WaitGroup来等待所有并发执行完成。
// 5. 并发调用fn函数,将j作为参数,递增j。
// 6. 等待所有并发执行完成,并统计成功执行数。
// 7. 如果存在错误,将错误收集到errs中并返回。
// 8. 更新剩余需要处理的数量。
// 9. 返回成功执行数和错误。
// 该函数通过控制批处理的大小和并发执行,实现了一个自适应的并发批处理逻辑,可以根据执行情况动态调整批处理的大小。
// replicaStatus holds the per-category Pod counts produced by computeReplicaStatus.
type replicaStatus struct {
	replicas          int32 // Pods that have been created
	readyReplicas     int32 // Pods that are running and ready
	availableReplicas int32 // Pods that are running and available, honoring minReadySeconds
	currentReplicas   int32 // created, non-terminating Pods at the current revision
	updatedReplicas   int32 // created, non-terminating Pods at the update revision
}
func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
status := replicaStatus{}
for _, pod := range pods {
if isCreated(pod) {
//1. 如果Pod被创建,则将副本数量加1。
// count the number of running and ready replicas
if isRunningAndReady(pod) {
// count the number of running and available replicas
if isRunningAndAvailable(pod, minReadySeconds) {
//- isRunningAndReady(pod) 函数检查 Pod 是否处于运行且就绪状态,如果是,则将 status.readyReplicas 增加 1。
//- isRunningAndAvailable(pod, minReadySeconds) 函数检查 Pod 是否处于运行且可用状态,
//其中 minReadySeconds 参数表示最小就绪时间(单位:秒)。
//如果 Pod 在最小就绪时间内一直保持就绪状态,则将 status.availableReplicas 增加 1。
//这段代码的主要目的是统计 Kubernetes 中某个 Pod 的就绪副本数和可用副本数,以便于对 Pod 的状态进行管理和监控。
// count the number of current and update replicas
if isCreated(pod) && !isTerminating(pod) {
revision := getPodRevision(pod)
if revision == currentRevision.Name {
if revision == updateRevision.Name {
return status
// 这段Go代码中的函数逻辑如下:
// - 首先判断Pod是否已经创建并且没有处于终止状态;
// - 如果满足条件,获取Pod的修订版本号;
// - 判断该修订版本号是否与当前修订版本名相同,若相同则当前副本数加一;
// - 判断该修订版本号是否与更新修订版本名相同,若相同则更新副本数加一;
// - 最后返回更新后的状态信息。 这段代码主要是用来统计Pod的当前副本数和更新后的副本数。
func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
status.Replicas = 0
status.ReadyReplicas = 0
status.AvailableReplicas = 0
status.CurrentReplicas = 0
status.UpdatedReplicas = 0
for _, list := range podLists {
replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
status.Replicas += replicaStatus.replicas
status.ReadyReplicas += replicaStatus.readyReplicas
status.AvailableReplicas += replicaStatus.availableReplicas
status.CurrentReplicas += replicaStatus.currentReplicas
status.UpdatedReplicas += replicaStatus.updatedReplicas
// 该函数用于更新StatefulSet的状态信息。
// 它接收一个指向StatefulSetStatus的指针、minReadySeconds参数以及currentRevision和updateRevision两个ControllerRevision指针,
// 还有可变长度的podLists参数。
// 函数首先将状态信息中的五个计数器重置为0,然后遍历podLists中的每个Pod列表,
// 通过调用computeReplicaStatus函数计算每个列表的副本状态,并将计算结果累加到状态信息中
// processReplica reconciles replicas[i], the Pod slot for ordinal i + getStartOrdinal(set).
// The boolean result tells runForAll whether to stop processing further replicas in this pass
// (runForAll honors it only in monotonic mode); every non-nil error is returned together with true.
//
// Steps, in order:
//   - a Failed or Succeeded Pod is deleted (after recording an event) and its slot is replaced by
//     a fresh Pod object from newVersionedStatefulSetPod;
//   - a Pod that is not created yet is created, unless StatefulSetAutoDeletePVC is enabled and its
//     claim is stale; in monotonic mode processing then stops for this pass;
//   - a Pending Pod triggers creation of any missing PersistentVolumeClaims;
//   - in monotonic mode, a Pod that is terminating, not running-and-ready, or not available blocks;
//   - finally, if identityMatches/storageMatches against set, or (with StatefulSetAutoDeletePVC)
//     the retention-policy check against updateSet, fails, a deep copy of the Pod is passed to
//     UpdateStatefulPod together with updateSet.
//
// currentSet/updateSet are set with currentRevision/updateRevision applied (see updateStatefulSet),
// and monotonic is true when the set does not allow burst processing.
func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet,
monotonic bool,
replicas []*v1.Pod,
i int) (bool, error) {
logger := klog.FromContext(ctx)
// Delete and recreate pods which finished running.
// Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) {
// NOTE(review): the format arguments and closing parenthesis of both Eventf calls appear to be
// missing; presumably set.Namespace, set.Name and the Pod name — confirm and restore.
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
// A Succeeded Pod is reported as a Normal "RecreatingTerminatedPod" event rather than a Warning.
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
replicaOrd := i + getStartOrdinal(set)
// NOTE(review): the arguments of newVersionedStatefulSetPod appear to be missing; presumably
// currentSet, updateSet, currentRevision.Name, updateRevision.Name and replicaOrd — confirm.
replicas[i] = newVersionedStatefulSetPod(
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
return true, err
} else if isStale {
// If a pod has a stale PVC, no more work can be done this round.
// err is nil here, so this stops the pass without reporting an error.
return true, err
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
return true, err
if monotonic {
// if the set does not allow bursting, return immediately
return true, nil
// If the Pod is in pending state then trigger PVC creation to create missing PVCs
if isPending(replicas[i]) {
// NOTE(review): the logging call these arguments belong to (likely logger.V(4).Info) appears to
// be missing — confirm and restore.
"StatefulSet is triggering PVC creation for pending Pod",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return true, err
// Unlike the checks below, the pending-Pod PVC step does not block progress on success.
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
// If we have a Pod that has been created but is not available we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
// Enforce the StatefulSet invariants
retentionMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
var err error
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
if err != nil {
retentionMatch = true
// With the feature gate disabled, retentionMatch stays true and only identity and storage are checked.
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
return false, nil
// Make a deep copy so we don't mutate the shared cache
replica := replicas[i].DeepCopy()
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
return true, err
return false, nil
func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
logger := klog.FromContext(ctx)
if isTerminating(condemned[i]) {
// if we are in monotonic mode, block and wait for terminating pods to expire
if monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
return true, nil
return false, nil
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
//1. 判断当前Pod是否处于运行且就绪状态:通过调用isRunningAndReady(condemned[i])函数来判断当前Pod是否满足运行且就绪的条件。
//2. 判断是否处于单调模式:通过检查monotonic变量的值来判断当前是否处于单调模式。
//3. 判断当前Pod是否为第一个不健康的Pod:通过比较condemned[i]和firstUnhealthyPod的值来判断当前Pod是否为第一个不健康的Pod。
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
// 该函数是一个日志记录和操作Pod的函数。
// 它首先使用logger.V(2).Info记录一条日志信息,表示某个StatefulSet的Pod正在因为缩容而终止。
// 日志中包含了StatefulSet和Pod的相关信息。
// 然后,函数通过调用ssc.podControl.DeleteStatefulPod方法来删除指定的Pod,并返回true表示删除成功。
func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
if monotonic {
for i := range pods {
if shouldExit, err := fn(i); shouldExit || err != nil {
return true, err
} else {
if _, err := slowStartBatch(1, len(pods), fn); err != nil {
return true, err
return false, nil
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
// the set in order to conform the system to the target state for the set. The target state always contains
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
// RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
// If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
// the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
//
// Phases: compute the initial status; partition pods into in-range replicas and condemned Pods
// (creating placeholder Pod objects for missing ordinals); return early if the set is being
// deleted; process replicas; fix PVC retention on condemned Pods (StatefulSetAutoDeletePVC);
// scale down condemned Pods; then, unless the strategy is OnDelete, roll out updateRevision
// (delegating to updateStatefulSetAfterInvariantEstablished when MaxUnavailableStatefulSet is enabled).
func (ssc *defaultStatefulSetControl) updateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
// get the current and update revisions of the set.
currentSet, err := ApplyRevision(set, currentRevision)
if err != nil {
return nil, err
updateSet, err := ApplyRevision(set, updateRevision)
if err != nil {
return nil, err
// currentSet and updateSet are the StatefulSets obtained by applying currentRevision and
// updateRevision to set; they are later passed to processReplica as *apps.StatefulSet values.
// set the generation, and revisions in the returned status
status := apps.StatefulSetStatus{}
status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
// Initial replica counters reflect the Pods as passed in; they are recomputed below after processing.
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set)
condemned := make([]*v1.Pod, 0, len(pods))
unhealthy := 0
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for _, pod := range pods {
if podInOrdinalRange(pod, set) {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
} else if getOrdinal(pod) >= 0 {
// if the ordinal is valid, but not within the range add it to the condemned list
condemned = append(condemned, pod)
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
// for any empty slot in [getStartOrdinal(set), getEndOrdinal(set)] create a new Pod at the correct revision
for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
replicaIdx := ord - getStartOrdinal(set)
if replicas[replicaIdx] == nil {
// NOTE(review): the leading arguments of newVersionedStatefulSetPod appear to be missing here;
// presumably currentSet, updateSet and currentRevision.Name precede these — confirm and restore.
replicas[replicaIdx] = newVersionedStatefulSetPod(
updateRevision.Name, ord)
// sort the condemned Pods by their ordinals
// NOTE(review): no sort call follows this comment, yet the loop below and processCondemned rely on
// condemned being in descending ordinal order — confirm the sort (e.g. of descendingOrdinal) is restored.
// find the first unhealthy Pod
// NOTE(review): unhealthy is never incremented in either loop below, so the "unhealthy Pods" log
// can never fire as written; presumably unhealthy++ belongs inside each !isHealthy branch — confirm.
for i := range replicas {
if !isHealthy(replicas[i]) {
if firstUnhealthyPod == nil {
firstUnhealthyPod = replicas[i]
// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
for i := len(condemned) - 1; i >= 0; i-- {
if !isHealthy(condemned[i]) {
if firstUnhealthyPod == nil {
firstUnhealthyPod = condemned[i]
if unhealthy > 0 {
logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
// The log reports the unhealthy count and the first unhealthy Pod.
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return &status, nil
monotonic := !allowsBurst(set)
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
// monotonic is true when the set does not allow burst processing; processReplicaFn binds the
// per-pass arguments so runForAll only has to supply the replica index.
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
// Fix pod claims for condemned pods, if necessary.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
fixPodClaim := func(i int) (bool, error) {
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return true, err
} else if !matchPolicy {
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return true, err
return false, nil
if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
// At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
// and we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order.
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
processCondemnedFn := func(i int) (bool, error) {
return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
return &status, err
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
// NOTE(review): the remaining arguments of this call appear to be missing; per the callee's
// signature they are presumably ssc, set, replicas, updateRevision, status — confirm and restore.
return updateStatefulSetAfterInvariantEstablished(ctx,
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {
// delete the Pod if it is not already terminating and does not match the update revision.
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
logger.V(2).Info("Pod of StatefulSet is terminating for update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err
// NOTE(review): after a successful (or NotFound) deletion, status.CurrentReplicas is presumably
// meant to be decremented before returning — no such statement is visible here; confirm.
// The err below is the outer ApplyRevision err (nil at this point), not the deletion error.
return &status, err
// Only one out-of-date Pod is deleted per pass: the loop walks from the highest ordinal down to
// updateMin and returns after the first deletion, or on the first unhealthy Pod.
// wait for unhealthy Pods on update
if !isHealthy(replicas[target]) {
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
return &status, nil
// updateStatefulSetAfterInvariantEstablished performs the rolling-update step used by
// updateStatefulSet when the MaxUnavailableStatefulSet feature gate is enabled. As described by
// the comments below, it is meant to count unhealthy replicas and, while that count is below
// maxUnavailable, delete up to (maxUnavailable - unavailable) non-terminating Pods whose revision
// differs from updateRevision, starting from the highest ordinal and never going below the
// partition ordinal updateMin. It returns &status together with the error from
// getStatefulSetMaxUnavailable or the first non-NotFound deletion error, if any.
func updateStatefulSetAfterInvariantEstablished(
ctx context.Context,
ssc *defaultStatefulSetControl,
set *apps.StatefulSet,
replicas []*v1.Pod,
updateRevision *apps.ControllerRevision,
status apps.StatefulSetStatus,
) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
replicaCount := int(*set.Spec.Replicas)
// replicaCount is the desired replica count; it is used to resolve MaxUnavailable below.
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
maxUnavailable := 1
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
// if the feature was enabled and then later disabled, MaxUnavailable may have a value
// more than 1. Ignore the passed in value and Use maxUnavailable as 1 to enforce
// expected behavior when feature gate is not enabled.
// NOTE(review): this function is only reached when MaxUnavailableStatefulSet is enabled, and the
// code below does resolve MaxUnavailable rather than forcing 1, so the comment above looks stale — confirm.
var err error
maxUnavailable, err = getStatefulSetMaxUnavailable(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, replicaCount)
if err != nil {
return &status, err
// Collect all targets in the range between getStartOrdinal(set) and getEndOrdinal(set). Count any targets in that range
// that are unhealthy i.e. terminated or not running and ready as unavailable). Select the
// (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete
// those pods and count the successful deletions. Update the status with the correct number of deletions.
unavailablePods := 0
// NOTE(review): unavailablePods is never incremented in this view, so as written it stays 0 and
// the maxUnavailable check below only triggers if maxUnavailable <= 0. Presumably unavailablePods++
// belongs in the !isHealthy branch, with the threshold check after the counting loop — confirm.
for target := len(replicas) - 1; target >= 0; target-- {
if !isHealthy(replicas[target]) {
if unavailablePods >= maxUnavailable {
logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable",
"statefulSet", klog.KObj(set),
"unavailablePods", unavailablePods,
"maxUnavailable", maxUnavailable)
return &status, nil
// Now we need to delete MaxUnavailable- unavailablePods
// start deleting one by one starting from the highest ordinal first
podsToDelete := maxUnavailable - unavailablePods
deletedPods := 0
// NOTE(review): deletedPods is never incremented in this view, so the deletedPods < podsToDelete
// bound never limits the loop; presumably deletedPods++ (and possibly status.CurrentReplicas--)
// belong after each successful deletion — confirm.
for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- {
// delete the Pod if it is not already terminating and its revision does not match updateRevision
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
// NotFound errors from the deletion are ignored; any other error aborts and is returned.
logger.V(2).Info("StatefulSet terminating Pod for update",
"statefulSet", klog.KObj(set),
"pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err
return &status, nil
// updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
// mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
// returned error is nil, the update is successful.
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
ctx context.Context,
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {
// complete any in progress rolling update if necessary
completeRollingUpdate(set, status)
// if the status is not inconsistent do not perform an update
if !inconsistentStatus(set, status) {
return nil
// copy set and update its status
set = set.DeepCopy()
if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil {
return err
return nil
// Compile-time check that *defaultStatefulSetControl satisfies StatefulSetControlInterface.
var _ StatefulSetControlInterface = (*defaultStatefulSetControl)(nil)