/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package deployment contains all the logic for handling Kubernetes Deployments.
// It implements a set of strategies (rolling, recreate) for deploying an application,
// the means to rollback to previous versions, proportional scaling for mitigating
// risk, cleanup policy, and other useful features of Deployments.
package deployment
import (
	"context"
	"fmt"
	"reflect"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/deployment/util"
)
const (
	// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
	// a deployment is going to be requeued:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	//
	// See handleErr, which enforces this bound via queue.NumRequeues.
	maxRetries = 15
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
// It is used both to verify ControllerRefs (resolveControllerRef) and when
// adopting ReplicaSets (getReplicaSetsForDeployment).
var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	// client talks to the API server.
	client clientset.Interface
	// eventBroadcaster fans events out to sinks; eventRecorder emits events
	// attributed to the "deployment-controller" component.
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}
// NewDeploymentController creates a new DeploymentController. It wires event
// handlers into the deployment, replica set, and pod informers, sets up the
// event recorder and rate-limited work queue, and captures the listers and
// cache-sync checks used by the sync loop.
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
	logger := klog.FromContext(ctx)

	dc := &DeploymentController{
		client:           client,
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addReplicaSet(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateReplicaSet(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.deleteReplicaSet(logger, obj)
		},
	})
	// Only pod deletions are watched: deletePod enqueues a Recreate
	// deployment once all of its pods have stopped running.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			dc.deletePod(logger, obj)
		},
	})

	// syncHandler and enqueueDeployment are stored as fields so unit tests
	// can substitute fakes.
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
// Run begins watching and syncing. It blocks until ctx is cancelled, then
// shuts down the event pipeline and the work queue (via defers).
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start events processing pipeline.
	dc.eventBroadcaster.StartStructuredLogging(3)
	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
	defer dc.eventBroadcaster.Shutdown()

	defer dc.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting controller", "controller", "deployment")
	defer logger.Info("Shutting down controller", "controller", "deployment")

	// Bail out (and run the shutdown defers) if the informer caches never
	// sync, e.g. because ctx was cancelled during startup.
	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	// Each worker goroutine drains the queue until ctx is done.
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}
// addDeployment handles informer add events for Deployments by enqueueing
// the new object for sync.
func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
	deployment := obj.(*apps.Deployment)
	logger.V(4).Info("Adding deployment", "deployment", klog.KObj(deployment))
	dc.enqueueDeployment(deployment)
}
// updateDeployment handles informer update events for Deployments: it logs
// the transition and enqueues the current version for sync.
func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
	previous, current := old.(*apps.Deployment), cur.(*apps.Deployment)
	logger.V(4).Info("Updating deployment", "deployment", klog.KObj(previous))
	dc.enqueueDeployment(current)
}
// deleteDeployment handles informer delete events for Deployments. obj may be
// an *apps.Deployment or, if the delete event was missed, a
// cache.DeletedFinalStateUnknown tombstone wrapping one. The recovered
// deployment is enqueued so the sync loop can observe the deletion.
func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		// Unwrap the tombstone left behind when a delete was dropped to
		// recover the deployment's last known state.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}
// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) {
	rs := obj.(*apps.ReplicaSet)

	if rs.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dc.deleteReplicaSet(logger, rs)
		return
	}

	// If it has a ControllerRef, that's all that matters: wake up exactly the
	// owning deployment (if it still exists).
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs))
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
	// them to see if anyone wants to adopt it.
	ds := dc.getDeploymentsForReplicaSet(logger, rs)
	if len(ds) == 0 {
		return
	}
	logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}
// getDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet. A lookup error and an empty match are both reported as
// nil ("nothing to sync").
func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment {
	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
	if err != nil || len(deployments) == 0 {
		return nil
	}
	// Because all ReplicaSet's belonging to a deployment should have a unique label key,
	// there should never be more than one deployment returned by the above method.
	// If that happens we should probably dynamically repair the situation by ultimately
	// trying to clean up one of the controllers, for now we just return the older one
	if len(deployments) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		logger.V(4).Info("user error! more than one deployment is selecting replica set",
			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
	}
	return deployments
}
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
// is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
// awaken both the old and new deployments. old and cur must be *apps.ReplicaSet
// types.
func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any,
		// before handling the new owner below.
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(logger, curRS)
		if len(ds) == 0 {
			return
		}
		logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}
// deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
// the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
// a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
	if d == nil {
		return
	}
	logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
	dc.enqueueDeployment(d)
}
// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
// obj may be a *v1.Pod or a cache.DeletedFinalStateUnknown tombstone wrapping one.
func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the Pod
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}

	d := dc.getDeploymentForPod(logger, pod)
	if d == nil {
		return
	}
	logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
	if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
		// Sync if this Deployment now has no more Pods.
		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
		if err != nil {
			// NOTE(review): errors here are silently dropped; presumably the
			// deployment is picked up again on the next event or resync.
			return
		}
		podMap, err := dc.getPodMapForDeployment(d, rsList)
		if err != nil {
			return
		}
		numPods := 0
		for _, podList := range podMap {
			numPods += len(podList)
		}
		if numPods == 0 {
			dc.enqueueDeployment(d)
		}
	}
}
// enqueue adds the given deployment's namespace/name key to the work queue
// for immediate processing.
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, keyErr := controller.KeyFunc(deployment)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, keyErr))
		return
	}
	dc.queue.Add(key)
}
// enqueueRateLimited adds the given deployment's key to the work queue,
// subject to the queue's rate limiter (used for retries after failures).
func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
	key, keyErr := controller.KeyFunc(deployment)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, keyErr))
		return
	}
	dc.queue.AddRateLimited(key)
}
// enqueueAfter will enqueue a deployment after the provided amount of time.
func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
	key, keyErr := controller.KeyFunc(deployment)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, keyErr))
		return
	}
	dc.queue.AddAfter(key, after)
}
// getDeploymentForPod returns the deployment managing the given Pod, or nil
// if the Pod is not owned (transitively, via a ReplicaSet) by a Deployment.
func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment {
	// Find the owning replica set
	var rs *apps.ReplicaSet
	var err error
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller owns this Pod.
		return nil
	}
	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
		// Not a pod owned by a replica set.
		return nil
	}
	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
	if err != nil || rs.UID != controllerRef.UID {
		// The ReplicaSet is gone, or its name has been reused by an object
		// other than the one the ControllerRef points to.
		logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
		return nil
	}

	// Now find the Deployment that owns that ReplicaSet.
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil
	}
	return dc.resolveControllerRef(rs.Namespace, controllerRef)
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
	if err != nil || deployment.UID != controllerRef.UID {
		// Either the lookup failed, or the deployment found under this Name
		// is not the one the ControllerRef points to.
		return nil
	}
	return deployment
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
// It returns when processNextWorkItem reports the queue has been shut down.
func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}
// processNextWorkItem pulls one key off the queue, runs the sync handler on
// it, and hands the result to handleErr for retry/forget bookkeeping. It
// returns false only once the queue has been shut down.
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	item, shutdown := dc.queue.Get()
	if shutdown {
		return false
	}
	defer dc.queue.Done(item)

	dc.handleErr(ctx, dc.syncHandler(ctx, item.(string)), item)
	return true
}
// handleErr decides what to do with a sync result for the given queue key:
// forget the key on success (or when the namespace is terminating), requeue
// with rate limiting up to maxRetries, and drop the key after that.
func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
	logger := klog.FromContext(ctx)
	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
		dc.queue.Forget(key)
		return
	}
	// ns/name are only needed for log output; a split failure is logged but
	// does not prevent the retry logic below.
	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
	if keyErr != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
	}

	if dc.queue.NumRequeues(key) < maxRetries {
		logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
		dc.queue.AddRateLimited(key)
		return
	}

	// Retries exhausted: surface the error and stop tracking this key.
	utilruntime.HandleError(err)
	logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
	dc.queue.Forget(key)
}
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
// ControllerRef by adopting and orphaning.
// It returns the list of ReplicaSets that this Deployment should manage.
func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
	// List all ReplicaSets to find those we own but that no longer match our
	// selector. They will be orphaned by ClaimReplicaSets().
	rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != d.UID {
			// The name was reused: the deployment we are syncing no longer exists.
			return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
		}
		return fresh, nil
	})
	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
	return cm.ClaimReplicaSets(ctx, rsList)
}
// getPodMapForDeployment returns the Pods managed by a Deployment.
//
// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
// according to the Pod's ControllerRef.
// NOTE: The pod pointers returned by this method point the pod objects in the cache and thus
// shouldn't be modified in any way.
func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
	// Fetch every pod that could belong to this Deployment per its selector.
	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, err
	}
	pods, err := dc.podLister.Pods(d.Namespace).List(selector)
	if err != nil {
		return nil, err
	}

	// Seed one (possibly empty) bucket per ReplicaSet we care about.
	podsByRS := make(map[types.UID][]*v1.Pod, len(rsList))
	for _, rs := range rsList {
		podsByRS[rs.UID] = []*v1.Pod{}
	}

	// Bucket each pod under its controlling ReplicaSet, ignoring pods whose
	// controller is not in rsList. Inactive pods are deliberately included:
	// Recreate deployments need to verify that no pods from older versions
	// are running before spinning up new pods.
	for _, pod := range pods {
		owner := metav1.GetControllerOf(pod)
		if owner == nil {
			continue
		}
		if _, tracked := podsByRS[owner.UID]; tracked {
			podsByRS[owner.UID] = append(podsByRS[owner.UID], pod)
		}
	}
	return podsByRS, nil
}
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		// Already deleted; nothing left to sync.
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		// An empty selector would match every pod; refuse to act on it and
		// surface a warning event instead.
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			// NOTE(review): the UpdateStatus error is discarded here; a
			// failure presumably just delays the ObservedGeneration bump to a
			// later sync — confirm before changing.
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		// The deployment is being deleted: only report status, don't touch
		// replica sets.
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	// Otherwise drive a full rollout using the configured strategy.
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}