/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"context"
"fmt"
"sort"
apps "k8s.io/api/apps/v1"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}
// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
//该函数实现了滚动更新一个新的副本集的逻辑。具体步骤如下:
//1. 调用getAllReplicaSetsAndSyncRevision函数获取所有的副本集并同步修订版本。
//2. 将获取到的所有副本集添加到allRSs切片中。
//3. 调用reconcileNewReplicaSet函数尝试增加新副本集的副本数,如果成功则更新部署状态。
//4. 如果上一步成功,则调用syncRolloutStatus函数更新部署状态。
//5. 否则,调用reconcileOldReplicaSets函数尝试减少旧副本集的副本数,如果成功则更新部署状态。
//6. 如果上一步成功,则再次调用syncRolloutStatus函数更新部署状态。
//7. 检查部署是否完成,如果完成则调用cleanupDeployment函数清理旧的副本集。
//8. 最后,调用syncRolloutStatus函数更新部署状态。
func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
// Scaling not required.
return false, nil
}
if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
// Scale down.
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
return scaled, err
}
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
if err != nil {
return false, err
}
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
return scaled, err
}
//该函数是一个用于协调新 ReplicaSet 的函数,它会根据新 ReplicaSet 和 Deployment 的规格来决定是否需要进行缩放操作。
//具体流程如下:
//1. 首先,函数会比较新 ReplicaSet 和 Deployment 规格中的副本数。如果它们相等,说明不需要进行缩放操作,函数直接返回 false 和 nil。
//2. 如果新 ReplicaSet 的副本数大于 Deployment 的副本数,说明需要进行缩放操作。
//函数会调用 dc.scaleReplicaSetAndRecordEvent 方法来缩小新 ReplicaSet 的规模,并返回缩放结果和可能的错误。
//3. 如果新 ReplicaSet 的副本数小于或等于 Deployment 的副本数,并且存在其他 ReplicaSet,
//那么函数会调用 deploymentutil.NewRSNewReplicas 方法来计算新 ReplicaSet 应该拥有的副本数。
//4. 最后,函数会调用 dc.scaleReplicaSetAndRecordEvent 方法来调整新 ReplicaSet 的规模,并返回缩放结果和可能的错误。
func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
logger := klog.FromContext(ctx)
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
// Can't scale down further
return false, nil
}
allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
//该函数是一个Go语言函数,它属于DeploymentController类型。
//函数名为reconcileOldReplicaSets,它接受五个参数:ctx上下文对象、allRSs所有ReplicaSet对象的切片、
//oldRSs旧的ReplicaSet对象的切片、newRS新的ReplicaSet对象和deployment Deployment对象。
//函数返回两个值:一个布尔值和一个错误对象。
//函数主要功能是协调旧的ReplicaSet对象的副本数量,以确保与Deployment对象的期望副本数量一致。
//它首先通过deploymentutil.GetReplicaCountForReplicaSets函数获取旧的ReplicaSet对象的副本数量,并检查是否为0。
//如果是0,则意味着无法进一步缩容,函数直接返回false和nil。
//接下来,函数通过同样的方式获取所有ReplicaSet对象的副本数量,并使用logger.V(4).Info记录相关信息。
//然后,函数调用deploymentutil.MaxUnavailable函数获取最大不可用副本数。
//函数的后续代码逻辑为,根据最大不可用副本数和新的ReplicaSet对象的可用副本数,判断是否需要缩容旧的ReplicaSet对象的副本数量。
//如果需要缩容,则进行相应的操作。
//最后,函数返回一个布尔值和一个错误对象,表示是否成功协调旧的ReplicaSet对象的副本数量。
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
// increase unavailability.
// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
//
// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
// step(that will increase unavailability).
//
// Concrete example:
//
// * 10 replicas
// * 2 maxUnavailable (absolute number, not percent)
// * 3 maxSurge (absolute number, not percent)
//
// case 1:
// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
// * The new replica set pods crashloop and never become available.
// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
// * The user notices the crashloop and does kubectl rollout undo to rollback.
// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
//
// case 2:
// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new replica set to be scaled up by 5.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
if maxScaledDown <= 0 {
return false, nil
}
//该函数用于判断是否可以进行缩容操作。缩容可以在以下两种情况下进行:
//1. 一些旧的副本集存在不健康的副本,可以安全地缩容这些不健康的副本,因为这不会进一步增加不可用性。
//2. 新的副本集已经扩容并且其副本变为就绪状态,那么可以在进一步的步骤中缩容旧的副本集。
//函数首先根据给定的参数计算出最小可用副本数minAvailable和新副本集的不可用副本数newRSUnavailablePodCount,
//然后通过公式maxScaledDown = allPodsCount - minAvailable - newRSUnavailablePodCount 计算出最大可缩容数maxScaledDown。
//如果maxScaledDown小于等于0,则返回false,表示不能进行缩容操作。
// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
if err != nil {
return false, nil
}
logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)
// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
allRSs = append(oldRSs, newRS)
scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
if err != nil {
return false, nil
}
logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)
totalScaledDown := cleanupCount + scaledDownCount
return totalScaledDown > 0, nil
}
//该函数用于清理不健康的副本,并进行滚动更新时的缩容操作。首先清理不健康的副本,以避免阻塞部署并导致超时。然后将旧的副本集缩容,
//同时检查最大不可用副本数以确保可以进行缩容操作。函数返回一个布尔值,表示是否进行了缩容操作。
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
logger := klog.FromContext(ctx)
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// been deleted first and won't increase unavailability.
//该函数用于清理不健康的副本集中的副本,以便所有不健康的副本都将被删除。
//函数首先通过创建时间对旧的副本集进行排序,然后安全地缩小所有具有不健康副本的旧副本集的规模。
//副本集将按照以下顺序对Pod进行排序:未就绪<就绪,未调度<已调度,等待<运行。
//这确保了不健康的副本将首先被删除,并且不会增加不可用性。
totalScaledDown := int32(0)
for i, targetRS := range oldRSs {
if totalScaledDown >= maxCleanupCount {
break
}
if *(targetRS.Spec.Replicas) == 0 {
// cannot scale down this replica set.
continue
}
logger.V(4).Info("Found available pods in old RS", "replicaSet", klog.KObj(targetRS), "availableReplicas", targetRS.Status.AvailableReplicas)
if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
// no unhealthy replicas found, no scaling required.
continue
}
//该Go函数是一个for循环,用于遍历一组旧的ReplicaSet(oldRSs),并在满足条件的情况下对它们进行缩容操作。
//具体功能如下:
//1. 初始化一个整型变量totalScaledDown为0,用于记录已缩容的ReplicaSet数量。
//2. 遍历oldRSs中的每个ReplicaSet,使用索引i和目标ReplicaSet targetRS。
//3. 如果totalScaledDown大于等于最大清理数量maxCleanupCount,则跳出循环。
//4. 如果目标ReplicaSet的副本数量Spec.Replicas为0,则无法进行缩容操作,继续下一个循环。
//5. 使用logger记录日志信息,表示在目标ReplicaSet中找到了可用的Pod。
//6. 如果目标ReplicaSet的副本数量Spec.Replicas等于其可用副本数量Status.AvailableReplicas,则无需进行缩容操作,继续下一个循环。
//7. 如果以上条件均不满足,则进行缩容操作,并更新totalScaledDown的值。
//综上所述,该函数的功能是在满足条件的情况下对一组旧的ReplicaSet进行缩容操作,并记录已缩容的ReplicaSet数量。
scaledDownCount := min(maxCleanupCount-totalScaledDown, *(targetRS.Spec.Replicas)-targetRS.Status.AvailableReplicas)
newReplicasCount := *(targetRS.Spec.Replicas) - scaledDownCount
if newReplicasCount > *(targetRS.Spec.Replicas) {
return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
}
_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
if err != nil {
return nil, totalScaledDown, err
}
totalScaledDown += scaledDownCount
oldRSs[i] = updatedOldRS
}
return oldRSs, totalScaledDown, nil
}
//该函数用于清理不健康的副本,并将其数量减少到目标副本数量。
//它首先计算清理数量,然后尝试将目标副本数量减少到该数量。如果减少后的数量大于目标副本数量,则返回错误。
//然后,它会更新旧的副本集,并将其添加到一个切片中。
//最后,它返回更新后的旧副本集和清理的总数量。
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
logger := klog.FromContext(ctx)
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
// Find the number of available pods.
availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
if availablePodCount <= minAvailable {
// Cannot scale down.
return 0, nil
}
logger.V(4).Info("Found available pods in deployment, scaling down old RSes", "deployment", klog.KObj(deployment), "availableReplicas", availablePodCount)
//该函数用于在RollingUpdate策略下滚动更新时,将旧的ReplicaSet缩容。它需要检查最大不可用副本数以确保可用性。
//函数首先从上下文中获取日志记录器,并计算最大不可用副本数。
//然后,它检查可用Pod的数量是否大于等于最小可用副本数,如果小于最小可用副本数则不进行缩容操作,返回0。
//否则,函数记录日志并继续缩容旧的ReplicaSet。
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
totalScaledDown := int32(0)
totalScaleDownCount := availablePodCount - minAvailable
for _, targetRS := range oldRSs {
if totalScaledDown >= totalScaleDownCount {
// No further scaling required.
break
}
if *(targetRS.Spec.Replicas) == 0 {
// cannot scale down this ReplicaSet.
continue
}
// Scale down.
scaleDownCount := min(*(targetRS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetRS.Spec.Replicas) {
return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
}
_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
}
totalScaledDown += scaleDownCount
}
//这段代码是用于缩容旧的ReplicaSet的。它首先通过sort.Sort方法将旧的ReplicaSet按照创建时间排序。
//然后,它计算出需要缩容的总副本数,并遍历每个旧的ReplicaSet。
//如果已经缩容的副本数达到了需要缩容的总副本数,就停止缩容操作。
//对于每个需要缩容的ReplicaSet,它计算出可以缩容的副本数,并更新其副本数。
//如果更新后的副本数大于原来的副本数,就会返回错误。如果更新成功,就将缩容的副本数累加到totalScaledDown中。
//最后,函数返回缩容的总副本数和可能发生的错误。
return totalScaledDown, nil
}
//该函数用于在Deployment策略为RollingUpdate时,缩放旧的副本集。
//它会检查最大不可用值(maxUnavailable)以确保可用性。
//函数首先计算出最小可用值,然后查找可用Pod的数量。
//如果可用Pod数量小于或等于最小可用值,则不会进行缩放。
//然后,函数会对旧的副本集进行排序,并迭代每个副本集。对于每个副本集,函数会计算出可以缩放的数量,并将其与副本集当前的副本数进行比较。
//如果计算出的缩放数量大于当前副本数,则函数会返回错误。
//最后,函数会缩放每个副本集,并记录缩放事件。函数返回缩放的总数量和可能的错误。