/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"net/http"
"os"
"path/filepath"
sysruntime "runtime"
"sort"
"sync"
"sync/atomic"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/informers"
netutils "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/certificate"
"k8s.io/client-go/util/flowcontrol"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/component-helpers/apimachinery/lease"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
"k8s.io/kubernetes/pkg/kubelet/cm"
draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
"k8s.io/kubernetes/pkg/kubelet/pleg"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/userns"
"k8s.io/kubernetes/pkg/kubelet/userns/inuserns"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/subpath"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
"k8s.io/utils/clock"
)
const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 30 * time.Second
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// nodeReadyGracePeriod is the period to allow for before fast status update is
// terminated and container runtime not being ready is logged without verbosity guard.
nodeReadyGracePeriod = 120 * time.Second
// DefaultContainerLogsDir is the location of container logs.
DefaultContainerLogsDir = "/var/log/containers"
// MaxContainerBackOff is the max backoff period, exported for the e2e test
MaxContainerBackOff = 300 * time.Second
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2
// Duration at which housekeeping failed to satisfy the invariant that
// housekeeping should be fast to avoid blocking pod config (while
// housekeeping is running no new pods are started or deleted).
housekeepingWarningDuration = time.Second * 1
// Period after which the runtime cache expires - set to slightly longer than
// the expected length between housekeeping periods, which explicitly refreshes
// the cache.
runtimeCacheRefreshPeriod = housekeepingPeriod + housekeepingWarningDuration
// Period for performing eviction monitoring.
// ensure this is kept in sync with internal cadvisor housekeeping.
evictionMonitoringPeriod = time.Second * 10
// The path in containers' filesystems where the hosts file is mounted.
linuxEtcHostsPath = "/etc/hosts"
windowsEtcHostsPath = "C:\\Windows\\System32\\drivers\\etc\\hosts"
// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
plegChannelCapacity = 1000
// Generic PLEG relies on relisting for discovering container events.
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
genericPlegRelistPeriod = time.Second * 1
genericPlegRelistThreshold = time.Minute * 3
// Generic PLEG relist period and threshold when used with Evented PLEG.
eventedPlegRelistPeriod = time.Second * 300
eventedPlegRelistThreshold = time.Minute * 10
eventedPlegMaxStreamRetries = 5
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
// container restarts and image pulls.
backOffPeriod = time.Second * 10
// ContainerGCPeriod is the period for performing container garbage collection.
ContainerGCPeriod = time.Minute
// ImageGCPeriod is the period for performing image garbage collection.
ImageGCPeriod = 5 * time.Minute
// Minimum number of dead containers to keep in a pod
minDeadContainerInPod = 1
// nodeLeaseRenewIntervalFraction is the fraction of lease duration to renew the lease
nodeLeaseRenewIntervalFraction = 0.25
// instrumentationScope is the name of OpenTelemetry instrumentation scope
instrumentationScope = "k8s.io/kubernetes/pkg/kubelet"
)
//这段代码定义了一系列常量,用于配置和控制Kubernetes节点上的Kubelet组件的行为。
//Kubelet负责管理节点上的容器,包括启动、停止、监控和更新容器的状态。
//- maxWaitForContainerRuntime:等待容器运行时启动的最大时间。
//- nodeStatusUpdateRetry:当发布节点状态失败时,kubelet重试的次数。
//- nodeReadyGracePeriod:在快速状态更新终止之前允许的时间,如果容器运行时未准备好,则记录日志而不进行详细保护。
//- DefaultContainerLogsDir:容器日志的默认位置。 - MaxContainerBackOff:容器重启的最大退避期,用于E2E测试。
//- housekeepingPeriod:执行全局清理任务的周期。
//- housekeepingWarningDuration:如果清理工作未能满足清理工作应快速进行的不变性条件(在清理工作运行时,不会启动或删除新的Pod),则会发出警告的时间。
//- runtimeCacheRefreshPeriod:运行时缓存的刷新周期,设置为比清理周期稍长的时间,清理周期显式刷新缓存。
//- evictionMonitoringPeriod:执行驱逐监控的周期,确保与内部cadvisor清理保持同步。
//- linuxEtcHostsPath:在Linux容器的文件系统中,主机文件被挂载的路径。
//- windowsEtcHostsPath:在Windows容器的文件系统中,主机文件被挂载的路径。
//- plegChannelCapacity:接收Pod生命周期事件的通道容量。
//- genericPlegRelistPeriod:通用PLEG重新列出的周期,用于发现容器事件。
//较长的周期意味着kubelet需要更长的时间来检测容器更改并更新Pod状态。
//另一方面,较短的周期会导致更频繁的重新列出(例如,容器运行时操作),导致CPU使用率更高。 -
//genericPlegRelistThreshold:通用PLEG重新列出的阈值。
//- eventedPlegRelistPeriod:使用Evented PLEG时的重新列出周期。
//- eventedPlegRelistThreshold:使用Evented PLEG时的重新列出阈值。
//- eventedPlegMaxStreamRetries:使用Evented PLEG时的最大流重试次数。
//- backOffPeriod:当Pod同步结果出现错误时的退避周期。它还用作容器重启和图像拉取的指数退避的基周期。
//- ContainerGCPeriod:执行容器垃圾收集的周期。
//- ImageGCPeriod:执行图像垃圾收集的周期。
//- minDeadContainerInPod:在Pod中保留的最少死亡容器数。
//- nodeLeaseRenewIntervalFraction:租约更新间隔的分数,为租约持续时间的四分之一。
//- instrumentationScope:OpenTelemetry仪器作用域的名称。
var (
// ContainerLogsDir can be overwritten for testing usage
ContainerLogsDir = DefaultContainerLogsDir
etcHostsPath = getContainerEtcHostsPath()
)
//这段Go代码定义了两个变量:
//1. ContainerLogsDir:用于存储容器日志的目录路径。它被初始化为 DefaultContainerLogsDir 的值。在测试场景下,可以重写该变量。
//2. etcHostsPath:表示容器中 hosts 文件的路径。它通过调用 getContainerEtcHostsPath() 函数进行初始化。
//这段代码的主要作用是配置容器日志目录和 hosts 文件路径,方便后续代码使用。
func getContainerEtcHostsPath() string {
if sysruntime.GOOS == "windows" {
return windowsEtcHostsPath
}
return linuxEtcHostsPath
}
//该函数用于获取容器的etc/hosts文件路径。
//根据操作系统的不同,返回不同的路径。
//如果操作系统是Windows,则返回windowsEtcHostsPath;否则返回linuxEtcHostsPath。
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups(ctx context.Context) error
}
//这是一个定义了多个处理Pod操作的方法的接口,用于Kubelet的测试。
//其中的方法包括处理Pod的添加、更新、移除、协调和同步,以及Pod的清理操作。
//这些方法分别接受一个Pod列表或上下文作为参数,并可能返回一个错误。
// Option is a functional option type for Kubelet
type Option func(*Kubelet)
//该函数是一个类型为Option的函数,参数为指向Kubelet类型的指针,并且没有返回值。Option类型定义了为Kubelet提供功能选项的方法。
// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(address net.IP, port uint)
ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
//这个Go代码定义了一个名为Bootstrap的接口,用于Kubelet的初始化协议。该接口包含以下方法:
//1. GetConfiguration():返回KubeletConfiguration对象,表示Kubelet的配置。
//2. BirthCry():进行初始化时的“出生哭泣”操作,可能用于一些初始化的日志记录或资源申请等。
//3. StartGarbageCollection():启动垃圾回收机制,用于管理Kubelet运行时的资源清理。
//4. ListenAndServe():根据提供的Kubelet配置、TLS选项、认证接口和跟踪提供器,启动Kubelet的服务并进行监听。
//5. ListenAndServeReadOnly():在指定的IP地址和端口上启动只读服务,用于提供Kubelet的只读API。
//6. ListenAndServePodResources():启动一个服务来监听和处理Pod资源相关请求。
//7. Run():根据传入的Pod更新通道,持续运行Kubelet,处理Pod的生命周期事件。
//8. RunOnce():根据传入的Pod更新通道,一次性运行Kubelet,处理Pod的生命周期事件,并返回运行结果。
//这些方法涵盖了Kubelet初始化、配置、垃圾回收、服务监听和Pod管理等方面的功能。
// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type Dependencies struct {
Options []Option
// Injected Dependencies
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
EventClient v1core.EventsGetter
HeartbeatClient clientset.Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
Mounter mount.Interface
HostUtil hostutil.HostUtils
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
ProbeManager prober.Manager
Recorder record.EventRecorder
Subpather subpath.Interface
TracerProvider trace.TracerProvider
VolumePlugins []volume.VolumePlugin
DynamicPluginProber volume.DynamicPluginProber
TLSOptions *server.TLSOptions
RemoteRuntimeService internalapi.RuntimeService
RemoteImageService internalapi.ImageManagerService
PodStartupLatencyTracker util.PodStartupLatencyTracker
NodeStartupLatencyTracker util.NodeStartupLatencyTracker
// remove it after cadvisor.UsingLegacyCadvisorStats dropped.
useLegacyCadvisorStats bool
}
//该Go代码定义了一个名为Dependencies的结构体,用于存储Kubelet运行时所需的各种依赖项。
//这些依赖项包括各种接口、对象和配置,
//例如:
//- Auth:认证接口
//- CADvisorInterface:CADvisor接口
//- Cloud:云提供商接口
//- ContainerManager:容器管理器
//- EventClient:事件客户端
//- HeartbeatClient:心跳客户端
//- OnHeartbeatFailure:心跳失败时的回调函数
//- KubeClient:Kubernetes客户端
//- Mounter:挂载器
//- HostUtil:主机工具
//- OOMAdjuster:OOM调整器
//- OSInterface:操作系统接口
//- PodConfig:Pod配置
//- ProbeManager:探针管理器
//- Recorder:事件记录器
//- Subpather:子路径接口
//- TracerProvider:跟踪器提供者
//- VolumePlugins:卷插件列表
//- DynamicPluginProber:动态插件探针
//- TLSOptions:TLS选项
//- RemoteRuntimeService:远程运行时服务
//- RemoteImageService:远程镜像服务
//- PodStartupLatencyTracker:Pod启动延迟跟踪器
//- NodeStartupLatencyTracker:节点启动延迟跟踪器
//- useLegacyCadvisorStats:是否使用传统CADvisor统计信息的标志
//这些依赖项通过结构体成员变量的方式进行定义和存储,其中一些成员变量还包含了函数类型的变量。
//这个结构体的作用是为了临时解决依赖注入的问题,以便于管理和组织Kubelet的依赖项。
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
//该函数根据给定的KubeletConfiguration,Dependencies,NodeName和nodeHasSynced函数创建一个config.PodConfig对象或返回错误。
//函数首先创建一个空的http.Header对象用于存储静态Pod URL的头部信息,
//然后根据kubeCfg.StaticPodURLHeader的值向manifestURLHeader中添加头部信息。
//最后,函数返回一个新创建的config.PodConfig对象和可能的错误。
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)
//这个函数是配置管理的一部分,它创建了一个包含了所有配置信息的cfg对象。
//这个对象会被用来管理pod的配置信息,包括增量配置通知、事件记录器和Pod启动延迟追踪器。
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
//这段Go代码中,首先创建了一个context.TODO()上下文,之后根据kubeCfg.StaticPodPath的值,如果其不为空,
//则通过config.NewSourceFile()函数将静态Pod路径添加到配置源中,并设置了节点名和文件检查频率。
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
//这段Go代码是在配置Kubernetes集群时,用于添加静态Pod的URL地址作为配置源的代码片段。
//具体功能如下:
//1. 首先,代码会判断kubeCfg.StaticPodURL是否为空,如果不为空,则执行下面的操作。
//2. 通过klog.InfoS打印日志信息,记录正在添加的Pod URL地址以及相关的HTTP头信息。
//3. 调用config.NewSourceURL函数,将静态Pod的URL地址、HTTP头信息、节点名称、HTTP检查频率和配置通道等参数传入,创建一个新的URL配置源。
//4. 通过cfg.Channel获取配置通道,并将其作为参数传递给config.NewSourceURL函数。
//这段代码的主要作用是在Kubernetes集群中添加静态Pod的URL地址作为配置源,以便集群能够从该URL地址获取Pod的配置信息。
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
}
//这段Go代码是在一个函数中的if语句块,主要功能是根据条件判断是否需要添加一个名为“apiserver pod source”的源。
//- 首先,它会检查kubeDeps.KubeClient是否为nil,如果不为nil,则执行下面的代码。
//- 接着,通过klog.InfoS打印一条日志信息,表示正在添加“apiserver pod source”。
//- 然后,调用config.NewSourceApiserver函数,传入kubeDeps.KubeClient、nodeName、nodeHasSynced以及cfg.Channel(ctx, kubetypes.ApiserverSource)作为参数,
//来创建并添加这个源。
//- 最后,函数返回cfg和nil。 总结:这段代码的功能是在满足一定条件时,向某个配置中添加一个名为“apiserver pod source”的源。
// PreInitRuntimeService will init runtime service before RunKubelet.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error {
remoteImageEndpoint := kubeCfg.ImageServiceEndpoint
if remoteImageEndpoint == "" && kubeCfg.ContainerRuntimeEndpoint != "" {
remoteImageEndpoint = kubeCfg.ContainerRuntimeEndpoint
}
var err error
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err
}
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err
}
kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(kubeCfg.ContainerRuntimeEndpoint)
return nil
}
//该函数在运行Kubelet之前初始化运行时服务。
//- 首先,它将根据kubeCfg中的配置确定远程镜像服务的端点remoteImageEndpoint。
//- 如果remoteImageEndpoint为空且kubeCfg.ContainerRuntimeEndpoint不为空,
//则将kubeCfg.ContainerRuntimeEndpoint赋值给remoteImageEndpoint。
//- 然后,尝试创建远程运行时服务kubeDeps.RemoteRuntimeService,如果创建失败则返回错误。
//- 接着,尝试创建远程镜像服务kubeDeps.RemoteImageService,如果创建失败则返回错误。
//- 最后,根据kubeCfg.ContainerRuntimeEndpoint确定是否使用传统的cadvisor统计信息,并将结果赋值给kubeDeps.useLegacyCadvisorStats。
//如果在创建服务时发生错误,函数将返回错误。否则,返回nil表示成功。
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
podLogsDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
ctx := context.Background()
logger := klog.TODO()
//该函数用于创建一个包含所有必需内部模块的新Kubelet对象。
//函数参数包括Kubelet配置、Kubelet依赖项、容器运行时选项、主机名、是否覆盖主机名、节点名、节点IP地址、提供商ID、云提供商、证书目录、根目录、
//Pod日志目录、镜像凭证提供程序配置文件、镜像凭证提供程序二进制目录、是否注册节点、注册时的污点、允许的不安全Sysctls、实验性挂载器路径、
//内核Memcg通知、实验性节点可分配忽略驱逐阈值、最小GC年龄、每个Pod的最大容器数、最大容器数、是否注册可调度、是否保留已终止的Pod卷、节点标签、
//节点状态最大图像数、seccomp默认设置等。
//函数内部会根据传入的参数创建相应的对象和模块,并进行一些初始化操作。
//最后会返回创建的Kubelet对象和可能的错误。
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
if podLogsDirectory == "" {
return nil, errors.New("pod logs root directory is empty")
}
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}
if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) {
cloudprovider.DisableWarningForProvider(cloudProvider)
return nil, fmt.Errorf("cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider)
}
//该函数主要进行一系列的参数校验,如果参数不符合要求则返回相应的错误信息。
//1. 校验rootDirectory是否为空,若为空则返回invalid root directory错误信息。
//2. 校验podLogsDirectory是否为空,若为空则返回pod logs root directory is empty错误信息。
//3. 校验kubeCfg.SyncFrequency.Duration是否大于0,若不大于0则返回invalid sync frequency错误信息。
//4. 若utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders)为true且cloudprovider.IsDeprecatedInternal(cloudProvider)为true,
//则返回cloud provider指定但内置云提供商被禁用的错误信息。
//并调用cloudprovider.DisableWarningForProvider(cloudProvider)方法。
var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister
// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
}
//这段Go代码是用于配置和启动一个节点同步器的。
//首先,定义了nodeHasSynced和nodeLister两个全局变量,分别用于检查节点是否已经与API服务器同步,并获取节点列表。
//接着,通过条件语句判断是否是在集群中运行。
//如果是,在集群模式下,会使用kubeDeps.KubeClient创建一个kubeInformers共享informers工厂,并为节点资源创建一个自定义的listers和synced函数。
//其中,通过WithTweakListOptions函数设置了节点的字段选择器,仅选择名称为nodeName的节点。
//然后,启动kubeInformers并记录日志,尝试将节点与API服务器同步。
//如果不在集群中运行,则不会进行节点同步操作。
//相反,它将创建一个独立的节点索引器,并使用该索引器创建一个nodeLister,并设置nodeHasSynced函数始终返回true,表示节点已经同步。
//这段代码是节点同步器的一部分,用于根据运行模式配置和启动节点同步器,以保持节点信息的最新状态。
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.GCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
//这段Go代码主要功能是根据条件判断来初始化kubeDeps.PodConfig变量,并设置containerGCPolicy。
//首先,通过条件判断检查kubeDeps.PodConfig是否为nil,
//如果是,则调用makePodSourceConfig函数来创建并初始化kubeDeps.PodConfig,同时捕获可能的错误并返回。
//接下来,代码创建一个containerGCPolicy结构体实例,并设置其中的字段值。
//其中,MinAge字段表示容器的最小存活时间,MaxPerPodContainer字段表示每个Pod中最多保留的容器个数,
//MaxContainers字段表示节点上最多保留的容器个数。
//这段代码的主要目的是在初始化kubeDeps.PodConfig后,设置容器的垃圾回收策略。
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
//这段代码中定义了两个结构体实例。
//首先,通过v1.NodeDaemonEndpoints结构体创建了一个名为daemonEndpoints的实例,
//其中KubeletEndpoint的Port字段被初始化为kubeCfg.Port的值。
//然后,通过images.ImageGCPolicy结构体创建了一个名为imageGCPolicy的实例,
//其中MinAge字段被初始化为kubeCfg.ImageMinimumGCAge.Duration的值,
//HighThresholdPercent字段被初始化为kubeCfg.ImageGCHighThresholdPercent的值转换为int类型,
//LowThresholdPercent字段被初始化为kubeCfg.ImageGCLowThresholdPercent的值转换为int类型。
if utilfeature.DefaultFeatureGate.Enabled(features.ImageMaximumGCAge) {
imageGCPolicy.MaxAge = kubeCfg.ImageMaximumGCAge.Duration
} else if kubeCfg.ImageMaximumGCAge.Duration != 0 {
klog.InfoS("ImageMaximumGCAge flag enabled, but corresponding feature gate is not enabled. Ignoring flag.")
}
//这段Go代码主要根据功能门控和配置来设置镜像最大年龄策略。
//如果默认功能门控中启用了ImageMaximumGCAge,则将imageGCPolicy.MaxAge设置为kubeCfg.ImageMaximumGCAge.Duration;
//否则,如果kubeCfg.ImageMaximumGCAge.Duration不为0,则记录一条信息表示忽略了该标志。
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
enforceNodeAllocatable = []string{}
}
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: kernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
//这个Go函数主要功能是配置和返回一个关于Kubernetes节点上资源驱逐的配置信息。具体步骤如下:
//1. 将kubeCfg.EnforceNodeAllocatable的值赋给enforceNodeAllocatable变量。
//2. 如果experimentalNodeAllocatableIgnoreEvictionThreshold为true,则将enforceNodeAllocatable设置为空字符串数组。
//3. 调用eviction.ParseThresholdConfig函数解析驱逐阈值配置,
//参数包括enforceNodeAllocatable、kubeCfg.EvictionHard、kubeCfg.EvictionSoft、kubeCfg.EvictionSoftGracePeriod和kubeCfg.EvictionMinimumReclaim。
//如果解析出错,函数会返回nil和错误信息。
//4. 创建并返回一个eviction.Config结构体实例,其中包括压力转换期、最大Pod优雅终止时间、驱逐阈值、内核Memcg通知以及Pod容器组根路径等配置信息。
var serviceLister corelisters.ServiceLister
var serviceHasSynced cache.InformerSynced
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
serviceLister = kubeInformers.Core().V1().Services().Lister()
serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
kubeInformers.Start(wait.NeverStop)
} else {
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister = corelisters.NewServiceLister(serviceIndexer)
serviceHasSynced = func() bool { return true }
}
//这段Go代码中定义了一个serviceLister变量和一个serviceHasSynced变量,
//它们分别用于列出Kubernetes服务和检查服务是否已同步。
//根据是否设置了kubeDeps.KubeClient,代码通过不同的方式初始化这两个变量。
//如果kubeDeps.KubeClient不为空,代码使用kubeInformers来创建serviceLister和serviceHasSynced;
//否则,代码使用cache.NewIndexer创建serviceLister,并将serviceHasSynced设置为始终返回true的函数。
//最后,如果kubeDeps.KubeClient不为空,还调用kubeInformers.Start启动信息处理器。
// construct a node reference used for events
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: string(nodeName),
UID: types.UID(nodeName),
Namespace: "",
}
//这段Go代码创建了一个v1.ObjectReference类型的指针变量nodeRef,
//用于表示一个节点引用,常用于事件处理中。其中,节点的名称、UID和命名空间分别被设置为nodeName参数对应的字符串值、
//types.UID转换后的值和空字符串。
oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
if err != nil {
if inuserns.RunningInUserNS() {
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
// oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error,
// when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`.
klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
oomWatcher = nil
} else {
klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
return nil, err
}
} else {
return nil, err
}
}
//这段Go代码的主要功能是尝试创建一个OOM watcher实例,通过oomwatcher.NewWatcher(kubeDeps.Recorder)来实现。
//如果创建失败,会根据当前是否在用户命名空间中运行以及KubeletInUserNamespace特性标志的启用状态来决定是记录错误信息、忽略错误还是返回错误。
//具体逻辑如下:
//1. 首先,尝试创建OOM watcher,将返回的实例和可能发生的错误保存在oomWatcher和err变量中。
//2. 如果err不为nil,即创建OOM watcher失败,则会进一步判断是否在用户命名空间中运行。
//3. 如果在用户命名空间中运行,并且KubeletInUserNamespace特性标志已被启用,则记录一条信息,指示由于kernel.dmesg_restrict=1的sysctl值,
//运行在用户命名空间中时,oomwatcher.NewWatcher会返回"open /dev/kmsg: operation not permitted"错误,并将oomWatcher设置为nil。
//4. 如果在用户命名空间中运行,但KubeletInUserNamespace特性标志未启用,则记录一条错误信息,指示创建OOM watcher失败,
//并建议启用KubeletInUserNamespace特性标志来忽略该错误,然后返回nil和错误err。
//5. 如果不在用户命名空间中运行,则直接返回nil和错误err。
//6. 如果成功创建OOM watcher(即err为nil),则可以继续后续操作。
clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := netutils.ParseIPSloppy(ipEntry)
if ip == nil {
klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
}
//该函数用于将kubeCfg.ClusterDNS中的IP地址解析并存储到clusterDNS切片中。
//具体来说,函数首先根据kubeCfg.ClusterDNS的长度初始化一个空的clusterDNS切片,
//然后遍历kubeCfg.ClusterDNS中的每个IP地址。
//对于每个IP地址,函数使用netutils.ParseIPSloppy函数进行解析,
//如果解析失败,则记录一条日志信息;
//如果解析成功,则将解析后的IP地址添加到clusterDNS切片中。
//最终,函数返回存储了有效IP地址的clusterDNS切片。
// A TLS transport is needed to make HTTPS-based container lifecycle requests,
// but we do not have the information necessary to do TLS verification.
//
// This client must not be modified to include credentials, because it is
// critical that credentials not leak from the client to arbitrary hosts.
insecureContainerLifecycleHTTPClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
CheckRedirect: httpprobe.RedirectChecker(false),
}
//这段Go代码创建了一个不进行TLS验证的HTTP客户端。该客户端用于进行基于HTTPS的容器生命周期请求。
//由于没有必要的信息来进行TLS验证,因此必须确保该客户端不被修改为包含凭证,以防止凭证泄露到任意主机。
//该客户端的重定向检查器被设置为禁止重定向。
tracer := kubeDeps.TracerProvider.Tracer(instrumentationScope)
klet := &Kubelet{
hostname: hostname,
hostnameOverridden: hostnameOverridden,
nodeName: nodeName,
kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: filepath.Clean(rootDirectory),
podLogsDirectory: podLogsDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: registerNode,
registerWithTaints: registerWithTaints,
registerSchedulable: registerSchedulable,
dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
serviceLister: serviceLister,
serviceHasSynced: serviceHasSynced,
nodeLister: nodeLister,
nodeHasSynced: nodeHasSynced,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
cloud: kubeDeps.Cloud,
externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
providerID: providerID,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
hostutil: kubeDeps.HostUtil,
subpather: kubeDeps.Subpather,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
nodeIPs: nodeIPs,
nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
nodeStatusMaxImages: nodeStatusMaxImages,
tracer: tracer,
nodeStartupLatencyTracker: kubeDeps.NodeStartupLatencyTracker,
}
//这段Go代码初始化了一个Kubelet对象。
//Kubelet是Kubernetes集群中的一个核心组件,负责管理节点上的Pods。
//该对象的属性包括节点的主机名、是否被覆盖、节点名、Kubernetes客户端、心跳客户端、重复心跳失败的处理函数、根目录、Pod日志目录、同步频率、
//是否准备好源、注册节点函数、是否带污点注册节点、是否注册为可调度节点、DNS配置器、服务列表器、服务是否已同步、节点列表器、节点是否已同步、
//流连接空闲超时时间、记录器、cadvisor接口、云提供者、外部云提供者标志、提供者ID、节点引用、节点标签、节点状态更新频率、节点状态报告频率、
//操作系统接口、OOM观察者、是否支持QoS级别的cgroups、cgroup根目录、安装器、主机工具、子路径处理工具、最大Pod数、每核Pod数、同步循环监视器、
//守护进程端点、容器管理器、节点IPs、节点IP验证器、时钟、是否启用控制器附加/分离、是否创建iptables工具链、是否保留已终止的Pod卷、
//节点状态最大图片数、跟踪器和节点启动延迟追踪器。
//综上所述,这段代码主要用于初始化一个Kubelet对象,以便在Kubernetes集群中管理节点上的Pods。
if klet.cloud != nil {
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
}
var secretManager secret.Manager
var configMapManager configmap.Manager
if klet.kubeClient != nil {
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(klet.kubeClient, klet.resyncInterval)
configMapManager = configmap.NewWatchingConfigMapManager(klet.kubeClient, klet.resyncInterval)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
klet.kubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(klet.kubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(klet.kubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}
//这段Go代码根据不同的条件初始化了不同的secretManager和configMapManager。
//首先,如果klet.cloud不为空,则创建一个新的cloudResourceSyncManager。
//接下来,根据klet.kubeClient和kubeCfg.ConfigMapAndSecretChangeDetectionStrategy的值,
//分别初始化secretManager和configMapManager。
//- 如果kubeCfg.ConfigMapAndSecretChangeDetectionStrategy的值是kubeletconfiginternal.WatchChangeDetectionStrategy,
//则分别创建一个watchingSecretManager和watchingConfigMapManager。
//- 如果kubeCfg.ConfigMapAndSecretChangeDetectionStrategy的值是kubeletconfiginternal.TTLCacheChangeDetectionStrategy,
//则分别创建一个cachingSecretManager和cachingConfigMapManager。
//- 如果kubeCfg.ConfigMapAndSecretChangeDetectionStrategy的值是kubeletconfiginternal.GetChangeDetectionStrategy,
//则分别创建一个simpleSecretManager和simpleConfigMapManager。
//- 如果kubeCfg.ConfigMapAndSecretChangeDetectionStrategy的值不是以上三种情况,则返回一个错误。
//最后,返回secretManager和configMapManager。
klet.secretManager = secretManager
klet.configMapManager = configMapManager
//这个代码片段是Go语言中的简单赋值语句。它将secretManager赋值给klet.secretManager,
//将configMapManager赋值给klet.configMapManager。这两个操作将使得klet对象能够管理秘密和配置映射。
}
machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
// Avoid collector collects it as a timestamped metric
// See PR #95210 and #97006 for more details.
machineInfo.Timestamp = time.Time{}
klet.setCachedMachineInfo(machineInfo)
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.livenessManager = proberesults.NewManager()
klet.readinessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager()
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
klet.runtimeService = kubeDeps.RemoteRuntimeService
//该Go函数主要是进行初始化操作。
//1. 首先通过cadvisor获取机器信息,并设置Timestamp为time.Time{}避免被收集为时间戳指标。
//2. 初始化imageBackOff(容器启动失败时的退避策略)。
//3. 创建并初始化livenessManager、readinessManager、startupManager(用于管理容器的存活、就绪和启动状态)。
//4. 创建并初始化podCache(用于缓存Pod信息)。
//5. 创建并初始化mirrorPodClient和podManager(用于管理镜像Pod和Pod)。
//6. 创建并初始化statusManager(用于管理Pod的状态)。
//7. 创建并初始化resourceAnalyzer(用于分析资源使用情况)。
//8. 将runtimeService设置为kubeDeps.RemoteRuntimeService。
//这个函数主要是为了初始化kubelet的各种组件和管理器,并设置相应的参数和配置。
if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
// setup containerLogManager for CRI container runtime
containerLogManager, err := logs.NewContainerLogManager(
klet.runtimeService,
kubeDeps.OSInterface,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
int(kubeCfg.ContainerLogMaxWorkers),
kubeCfg.ContainerLogMonitorInterval,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
}
klet.containerLogManager = containerLogManager
//这段Go代码主要执行了两个步骤:
//1. 若kubeDeps.KubeClient不为空,则创建一个runtimeclass.Manager实例,并将其赋值给klet.runtimeClassManager。
//2. 创建一个logs.ContainerLogManager实例,并将其赋值给klet.containerLogManager。
//此实例用于管理容器日志,包括日志文件的最大大小、最大文件数、最大工作线程数等配置。若创建过程中出现错误,则返回错误信息。
//总结:这段代码主要负责初始化两个管理器:runtimeclass.Manager和logs.ContainerLogManager,用于管理运行时类和容器日志。
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(
klet,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)
//这段代码是Go语言中的函数调用和赋值语句,主要进行了三个对象的初始化和赋值操作。
//- 首先,通过NewReasonCache()函数创建了一个reasonCache对象,并将其赋值给klet.reasonCache。
//- 然后,通过queue.NewBasicWorkQueue(klet.clock)函数创建了一个workQueue对象,并将其赋值给klet.workQueue。
//- 最后,通过newPodWorkers()函数创建了一个podWorkers对象,并将其赋值给klet.podWorkers。newPodWorkers()函数的参数包括klet对象、
//Recorder对象、workQueue对象、resyncInterval时间间隔、backOffPeriod时间间隔、podCache对象等
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
rootDirectory,
podLogsDirectory,
machineInfo,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
insecureContainerLifecycleHTTPClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
kubeCfg.MaxParallelImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
kubeDeps.ContainerManager,
klet.containerLogManager,
klet.runtimeClassManager,
seccompDefault,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
kubeDeps.PodStartupLatencyTracker,
kubeDeps.TracerProvider,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
//这个函数主要用于创建并初始化一个KubeGenericRuntimeManager对象,
//它是kubernetes中的一个容器运行时管理器,主要负责管理容器的生命周期。
//函数中使用了许多参数,它们主要用于配置运行时管理器的各种属性,例如日志目录、镜像拉取策略、CPU和内存限制等。
//函数返回一个runtime对象,它包含了运行时管理器的各种接口,如启动容器、停止容器等。
//最后,将runtime对象赋值给klet结构体的相应字段,以便在后续代码中使用。
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime, runtimeCacheRefreshPeriod)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
// common provider to get host file system usage associated with a pod managed by kubelet
hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) string {
return getEtcHostsPath(klet.getPodDir(podUID))
}, podLogsDirectory)
if kubeDeps.useLegacyCadvisorStats {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime,
klet.statusManager,
hostStatsProvider)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
hostStatsProvider,
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}
//该Go函数主要功能是初始化Kubelet的统计提供者。
//首先,函数通过调用kubecontainer.NewRuntimeCache方法创建一个运行时缓存,并将其赋值给runtimeCache变量。
//如果创建过程中出现错误,则会返回nil和错误信息。
//接下来,函数通过调用stats.NewHostStatsProvider方法创建一个主机状态提供者,
//用于获取与Kubelet管理的Pod相关的主机文件系统使用情况。
//该方法接收三个参数:一个实现了OS接口的对象(这里使用kubecontainer.RealOS{}),
//一个函数(用于根据Pod的UID获取/etc/hosts的路径),以及Pod日志目录的路径。
//然后,函数根据kubeDeps.useLegacyCadvisorStats的值选择合适的统计提供者。
//如果为true,则调用stats.NewCadvisorStatsProvider方法创建一个基于Cadvisor的统计提供者;
//否则,调用stats.NewCRIStatsProvider方法创建一个基于CRI的统计提供者。
//这两个方法都接收多个参数,包括cadvisor接口、resourceAnalyzer接口、podManager接口、runtimeCache对象、containerRuntime接口、
//statusManager接口以及之前创建的hostStatsProvider对象。
//其中,NewCRIStatsProvider方法还额外接收RemoteRuntimeService和RemoteImageService接口,以及一个表示是否启用特定功能的布尔值。
//最后,函数将创建的统计提供者赋值给klet.StatsProvider字段。
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// adjust Generic PLEG relisting period and threshold to higher value when Evented PLEG is turned on
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
//这段代码中定义了一个名为eventChannel的通道,类型为*pleg.PodLifecycleEvent,并设置了其容量为plegChannelCapacity。
//接着,通过utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG)判断是否启用了EventedPLEG特性。
//如果启用了,就会调整Generic PLEG的重新列表周期和阈值,将其设置为eventedPlegRelistPeriod和eventedPlegRelistThreshold所指定的值。
//这段代码的功能是根据是否启用了EventedPLEG特性,来调整Generic PLEG的重新列表周期和阈值。
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
//这段Go代码是Kubernetes中的一个片段,它创建了一个PLEG(Pod Lifecycle Event Generator)实例,用于监听容器生命周期事件。
//klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache,
//clock.RealClock{})这行代码创建了一个PLEG实例,其中NewGenericPLEG是一个工厂方法,用于根据提供的参数创建PLEG实例。
//这个方法接收5个参数:
//1. klet.containerRuntime:容器运行时接口,用于与底层容器引擎交互。
//2. eventChannel:一个通道,用于接收容器事件。
//3. genericRelistDuration:重新列出容器的间隔时间。
//4. klet.podCache:一个缓存,用于存储Pod的信息。
//5. clock.RealClock{}:一个时钟对象,用于提供当前时间。
//第二段代码eventedRelistDuration := &pleg.RelistDuration{RelistPeriod: genericPlegRelistPeriod,
//RelistThreshold: genericPlegRelistThreshold}创建了一个RelistDuration结构体实例,用于设置PLEG重新列出容器的周期和阈值。
//其中:
//1. RelistPeriod:重新列出容器的时间间隔。
//2. RelistThreshold:在达到此阈值后,PLEG将重新列出容器。
//这段代码的主要目的是创建并配置一个PLEG实例,用于监听容器生命周期事件,并设置重新列出容器的间隔时间和阈值。
klet.eventedPleg, err = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
if err != nil {
return nil, err
}
} else {
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}
//这段Go代码中,根据条件创建了两种不同类型的PLEG(Pod Lifecycle Event Generator)对象,并将其赋值给klet.pleg。
//如果某个条件满足,则创建一个EventedPLEG对象,并将klet.eventedPleg指向它。
//NewEventedPLEG函数的参数包括容器运行时、运行时服务、事件通道、Pod缓存、当前的klet.pleg对象、最大流重试次数、重新列出持续时间和实际时钟。
//否则,创建一个GenericPLEG对象,并将klet.pleg指向它。
//NewGenericPLEG函数的参数包括容器运行时、事件通道、重新列出持续时间、Pod缓存和实际时钟。
//这段代码的主要目的是初始化并配置PLEG对象,用于监控和生成Pod生命周期事件。
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
}
if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
//这段Go代码中的函数功能如下:
//- 首先,通过newRuntimeState(maxWaitForContainerRuntime)创建一个新的runtimeState实例,并将其赋值给klet.runtimeState。
//- 接着,调用addHealthCheck方法向runtimeState实例添加了一个名为"PLEG"的健康检查,其健康状态为klet.pleg.Healthy。
//- 如果features.EventedPLEG特性门已启用,则调用addHealthCheck方法向runtimeState实例添加了一个名为"EventedPLEG"的健康检查,
//其健康状态为klet.eventedPleg.Healthy。
//- 最后,调用updatePodCIDR方法尝试更新Pod CIDR,如果更新失败则记录错误日志。
//总结:这段代码主要通过runtimeState实例管理容器运行时的状态和健康检查,并尝试更新Pod CIDR,如果更新失败则记录错误日志。
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, max(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
//这段代码主要功能是创建并初始化一个ContainerGC对象,
//并将其赋值给klet的containerGC和containerDeletor字段。
//其中,ContainerGC是用来定期清理和回收容器的组件;newPodContainerDeletor是用来删除过期或无效的容器的函数。
//max和minDeadContainerInPod是两个辅助函数,用来计算最大存活容器数和最小死亡容器数。
//如果创建ContainerGC对象失败,则会返回错误。
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, kubeDeps.TracerProvider)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
cert := klet.serverCertificateManager.Current()
if cert == nil {
return nil, fmt.Errorf("no serving certificate available for the kubelet")
}
return cert, nil
}
}
//这段Go代码主要进行了一系列的初始化操作。
//首先,它初始化了一个名为imageManager的对象,并将其赋值给klet.imageManager。
//这个对象是由images.NewImageGCManager函数创建的,它依赖于containerRuntime、StatsProvider、Recorder、nodeRef、
//imageGCPolicy和TracerProvider等参数。
//接着,代码检查了是否启用了服务器TLS引导和相关的功能门控。
//如果满足条件,它会初始化一个名为serverCertificateManager的对象,并将其赋值给klet.serverCertificateManager。
//这个对象是由kubeletcertificate.NewKubeletServerCertificateManager函数创建的,
//它依赖于kubeClient、kubeCfg、nodeName、getLastObservedNodeAddresses和certDirectory等参数。
//最后,代码将serverCertificateManager.Current()方法设置为TLSOptions.Config.GetCertificate的实现。
//这个方法会返回当前有效的TLS证书,如果证书不存在,则会返回一个错误。
//整体上,这段代码主要是为了初始化imageManager和serverCertificateManager两个对象,并设置相关的配置和依赖
if kubeDeps.ProbeManager != nil {
klet.probeManager = kubeDeps.ProbeManager
} else {
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
klet.runner,
kubeDeps.Recorder)
}
//这段Go代码主要实现了根据条件来初始化klet.probeManager变量。
//- 首先,它会检查kubeDeps.ProbeManager是否为nil,如果是非nil,则直接将其赋值给klet.probeManager;
//- 如果kubeDeps.ProbeManager为nil,则通过调用prober.NewManager方法来创建一个新的probeManager实例,
//并将其赋值给klet.probeManager。
//prober.NewManager方法接受多个参数,包括klet.statusManager、klet.livenessManager、klet.readinessManager、
//klet.startupManager、klet.runner和kubeDeps.Recorder。这些参数用于配置和初始化新的probeManager实例。
//总结一下,这段代码的作用是在kubeDeps.ProbeManager非nil时直接使用它,否则创建一个新的probeManager实例并配置初始化。
tokenManager := token.NewManager(kubeDeps.KubeClient)
var clusterTrustBundleManager clustertrustbundle.Manager
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
clusterTrustBundleManager, err = clustertrustbundle.NewInformerManager(kubeInformers.Certificates().V1alpha1().ClusterTrustBundles(), 2*int(kubeCfg.MaxPods), 5*time.Minute)
if err != nil {
return nil, fmt.Errorf("while starting informer-based ClusterTrustBundle manager: %w", err)
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Started ClusterTrustBundle informer")
} else {
// In static kubelet mode, use a no-op manager.
clusterTrustBundleManager = &clustertrustbundle.NoopManager{}
klog.InfoS("Not starting ClusterTrustBundle informer because we are in static kubelet mode")
}
//该函数主要创建并初始化一个token.Manager实例和一个clustertrustbundle.Manager实例。
//- 首先,根据kubeDeps.KubeClient创建一个token.Manager实例。
//- 然后,判断kubeDeps.KubeClient是否为空且features.ClusterTrustBundleProjection特性门是否开启,
//如果满足条件,则使用kubeDeps.KubeClient创建一个informers.SharedInformerFactory实例,
//并根据该实例创建一个clustertrustbundle.InformerManager实例,同时设置clusterTrustBundleManager。
//如果不满足条件,则创建一个clustertrustbundle.NoopManager实例,并设置clusterTrustBundleManager。
//- 最后,如果创建clustertrustbundle.InformerManager实例成功,则启动kubeInformers并记录日志;否则返回错误信息。
//这段代码主要是为了管理集群信任捆绑(token和cluster trust bundle)而设计的。
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
// ReadyState is accurate with the storage state.
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, clusterTrustBundleManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
return nil, err
}
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
kubeDeps.Recorder,
)
//这段Go代码中的函数是用于初始化Kubelet的卷插件管理器(volumePluginMgr)的。
//- NewInitializedVolumePluginMgr函数会初始化Kubelet运行时状态中的某些存储错误,这将影响节点的就绪状态。
//这个函数必须在Kubelet初始化之前调用,以确保节点的就绪状态与存储状态准确无误。
//- NewInitializedVolumePluginMgr函数的参数包括klet(Kubelet的实例)、secretManager、configMapManager、tokenManager、
//clusterTrustBundleManager、kubeDeps.VolumePlugins和kubeDeps.DynamicPluginProber。
//这些参数用于配置和初始化卷插件管理器。
//- NewInitializedVolumePluginMgr函数返回一个初始化后的卷插件管理器实例和一个错误(如果有)。
//- pluginmanager.NewPluginManager函数用于创建一个新的插件管理器实例。
//- pluginmanager.NewPluginManager函数的参数包括插件注册目录(sockDir)和记录器(kubeDeps.Recorder)。
//- 最后,将创建的卷插件管理器实例赋值给klet.pluginManager。
//这个函数的主要目的是在Kubelet初始化之前,设置和初始化与存储相关的插件管理器,以便节点的就绪状态能准确反映存储状态。
// If the experimentalMounterPathFlag is set, we do not want to
// check node capabilities since the mount path is not the default
if len(experimentalMounterPath) != 0 {
// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
// so that service name could be resolved
klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
}
//这段Go代码是一个条件语句,其功能是根据$experimentalMounterPathFlag$的设置来决定是否需要检查节点能力。
//如果$experimentalMounterPathFlag$被设置(即$experimentalMounterPath$不为空),
//则会调用$klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)$方法,
//用kubelet.ClusterDNS替换containerized-mounter根文件系统/etc/resolv.conf中的nameserver,以便能够解析服务名称。
// setup volumeManager
klet.volumeManager = volumemanager.NewVolumeManager(
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.podWorkers,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
kubeDeps.Mounter,
kubeDeps.HostUtil,
klet.getPodsDir(),
kubeDeps.Recorder,
keepTerminatedPodVolumes,
volumepathhandler.NewBlockVolumePathHandler())
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
//该代码段是Go语言编写的,其中定义了两个不同的内容:一个是volumeManager的设置,另一个是backOff的设置。
//首先,关于volumeManager的设置:
//- 通过调用volumemanager.NewVolumeManager()函数创建一个新的volumeManager。
//- 该函数接受多个参数,包括是否启用控制器挂载/卸载、节点名称、pod管理器、pod workers、kubeClient、volume插件管理器、容器运行时、
//挂载器、主机工具、pod目录、事件记录器、是否保留已终止的pod卷以及块卷路径处理器。
//- 这些参数用于配置和初始化新的volumeManager,以便管理节点上的卷。
//接下来,关于backOff的设置:
//- 通过调用flowcontrol.NewBackOff()函数创建一个新的backOff对象。
//- 该函数接受两个参数,即回退周期和最大容器回退时间。
//- 这些参数用于配置回退对象的行为,以便在容器启动失败时进行重试,避免频繁地立即重试。
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig,
killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
//这段代码是设置驱逐管理器的。它通过调用eviction.NewManager函数创建一个驱逐管理器和一个驱逐准入处理器。
//其中,驱逐管理器用于管理节点的资源驱逐策略,驱逐准入处理器用于在Pod准入阶段进行驱逐检查和处理。
//接着,将驱逐管理器赋值给klet.evictionManager,将驱逐准入处理器添加到klet.admitHandlers的Pod准入处理器列表中。
// Safe, allowed sysctls can always be used as unsafe sysctls in the spec.
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlAllowlist(), allowedUnsafeSysctls...)
sysctlsAllowlist, err := sysctl.NewAllowlist(safeAndUnsafeSysctls)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(sysctlsAllowlist)
//这段Go代码主要实现了以下功能:
//1. 将安全的sysctls和不安全的sysctls合并成一个列表 safeAndUnsafeSysctls。
//2. 使用合并后的列表创建一个sysctl允许列表 sysctlsAllowlist。
//3. 如果创建允许列表时发生错误,则返回 nil 和错误信息。
//4. 将sysctls允许列表添加到 klet.admitHandlers 中作为Pod的审核处理器。
//这段代码的作用是通过合并安全和不安全的sysctls列表,并创建一个允许列表,然后将其添加为Pod的审核处理器,
//以确保只有允许的sysctls才能在系统中使用。
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
//这段Go代码的功能是启用一个激活期限处理器。
//它首先通过调用newActiveDeadlineHandler函数创建一个激活期限处理器,并将其与状态管理器、事件记录器和时钟进行关联。
//接着,通过调用AddPodSyncLoopHandler和AddPodSyncHandler方法,
//将激活期限处理器添加到Pod同步循环处理器和Pod同步处理器中,以便在处理Pod时能够检查是否超过了激活期限。
//如果在创建激活期限处理器时出现错误,函数将返回nil和错误信息。
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's
for _, opt := range kubeDeps.Options {
opt(klet)
}
//这段Go代码中定义了一个函数,函数中包含多个步骤。
//首先,通过klet.admitHandlers.AddPodAdmitHandler()方法添加了一个Pod Admit Handler。
//然后,创建了一个Critical Pod Admit Handler,并通过klet.admitHandlers.AddPodAdmitHandler()方法将其添加到admitHandlers中。
//接着,创建了一个Predicate Admit Handler,并通过klet.admitHandlers.AddPodAdmitHandler()方法将其添加到admitHandlers中。
//最后,对klet应用了一系列的函数式选项。
if sysruntime.GOOS == "linux" {
// AppArmor is a Linux kernel security module and it does not support other operating systems.
klet.appArmorValidator = apparmor.NewValidator()
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
}
//这段Go代码是在检查当前运行的操作系统是否为Linux,如果是Linux,则创建一个AppArmorValidator对象,
//并将其添加到softAdmitHandlers的PodAdmitHandler中。
//AppArmor是Linux内核的一个安全模块,这段代码的目的是在Kubernetes中启用AppArmor的安全功能。
leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
klet.nodeLeaseController = lease.NewController(
klet.clock,
klet.heartbeatClient,
string(klet.nodeName),
kubeCfg.NodeLeaseDurationSeconds,
klet.onRepeatedHeartbeatFailure,
renewInterval,
string(klet.nodeName),
v1.NamespaceNodeLease,
util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
//这个Go函数主要目的是创建并初始化一个node lease控制器。
//该控制器用于管理节点的租约,以确保节点在Kubernetes集群中保持活动状态。
//函数首先根据配置确定租约的持续时间和更新间隔,然后使用这些值以及提供的客户端、节点名和其他参数来创建一个新的lease控制器实例。
// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
Logger: logger,
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,
KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder),
SyncNodeStatusFunc: klet.syncNodeStatus,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
ShutdownGracePeriodByPodPriority: kubeCfg.ShutdownGracePeriodByPodPriority,
StateDirectory: rootDirectory,
})
klet.shutdownManager = shutdownManager
klet.usernsManager, err = userns.MakeUserNsManager(klet)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
//这段代码是Go语言编写的,用于设置节点关闭管理器。
//首先,通过调用nodeshutdown.NewManager函数创建一个新的节点关闭管理器和一个节点关闭准入处理器。
//在创建过程中,传入了日志记录器、探针管理器、事件记录器、节点引用、获取活跃Pods的函数、杀死Pod的函数、同步节点状态的函数、
//节点关闭的优雅等待时间、节点关闭的优雅等待时间(针对关键Pods)、按Pod优先级设置的节点关闭的优雅等待时间以及状态目录等配置参数。
//然后,将节点关闭管理器赋值给klet.shutdownManager,并将用户命名空间管理器赋值给klet.usernsManager。
//如果创建用户命名空间管理器失败,则返回错误。
//最后,将节点关闭准入处理器添加到klet.admitHandlers中的Pod准入处理器列表中。
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
// Generating the status funcs should be the last thing we do,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
}
//这段Go代码是Kubernetes中的Kubelet初始化过程的一部分。在这个函数中,主要完成了以下两步操作:
//1. 将最新的配置版本应用到Kubelet上,这样用户就可以查看Kubelet是如何配置的。
//2. 生成节点状态报告函数,这应该是我们做的最后一件事,因为它依赖于Kubelet的其他部分已经构建完成。
//最后,该函数返回初始化完成的Kubelet实例和一个nil的错误值。
type serviceLister interface {
List(labels.Selector) ([]*v1.Service, error)
}
//这段Go代码定义了一个名为serviceLister的接口,其中包含一个List方法。
//该方法接收一个labels.Selector类型的参数,并返回一个[]*v1.Service类型(即服务列表)和一个错误值。
//这个接口的作用是用于列举出符合给定标签选择器的服务列表。
// Kubelet is the main kubelet implementation.
type Kubelet struct {
kubeletConfiguration kubeletconfiginternal.KubeletConfiguration
// hostname is the hostname the kubelet detected or was given via flag/config
hostname string
// hostnameOverridden indicates the hostname was overridden via flag/config
hostnameOverridden bool
//该Go代码定义了一个名为Kubelet的结构体,它表示一个Kubelet实例。
//- kubeletConfiguration是kubeletconfiginternal.KubeletConfiguration类型的一个字段,用于存储Kubelet的配置信息。
//- hostname是一个字符串字段,表示Kubelet检测到或通过标志/配置给定的主机名。
//- hostnameOverridden是一个布尔字段,指示主机名是否通过标志/配置被覆盖。
nodeName types.NodeName
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
heartbeatClient clientset.Interface
// mirrorPodClient is used to create and delete mirror pods in the API for static
// pods.
mirrorPodClient kubepod.MirrorClient
//该Go函数定义了一个结构体,包含以下成员变量:
//- nodeName:类型为types.NodeName,表示节点名称。
//- runtimeCache:类型为kubecontainer.RuntimeCache,表示运行时缓存。
//- kubeClient:类型为clientset.Interface,用于与Kubernetes集群进行交互。
//- heartbeatClient:类型为clientset.Interface,用于发送心跳等维护操作。
//- mirrorPodClient:类型为kubepod.MirrorClient,用于在API中创建和删除静态Pod的镜像Pod。
rootDirectory string
podLogsDirectory string
lastObservedNodeAddressesMux sync.RWMutex
lastObservedNodeAddresses []v1.NodeAddress
// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
onRepeatedHeartbeatFailure func()
//这段Go代码定义了以下内容:
//- rootDirectory和podLogsDirectory两个字符串变量,分别表示根目录和Pod日志目录。
//- lastObservedNodeAddressesMux是一个读写锁,用于保护lastObservedNodeAddresses变量的并发访问。
//- lastObservedNodeAddresses是一个包含v1.NodeAddress类型的切片,存储了最近观察到的节点地址。
//- onRepeatedHeartbeatFailure是一个可选的回调函数,当心跳操作失败多次时会被调用。
// podManager stores the desired set of admitted pods and mirror pods that the kubelet should be
// running. The actual set of running pods is stored on the podWorkers. The manager is populated
// by the kubelet config loops which abstracts receiving configuration from many different sources
// (api for regular pods, local filesystem or http for static pods). The manager may be consulted
// by other components that need to see the set of desired pods. Note that not all desired pods are
// running, and not all running pods are in the podManager - for instance, force deleting a pod
// from the apiserver will remove it from the podManager, but the pod may still be terminating and
// tracked by the podWorkers. Components that need to know the actual consumed resources of the
// node or are driven by podWorkers and the sync*Pod methods (status, volume, stats) should also
// consult the podWorkers when reconciling.
//
// TODO: review all kubelet components that need the actual set of pods (vs the desired set)
// and update them to use podWorkers instead of podManager. This may introduce latency in some
// methods, but avoids race conditions and correctly accounts for terminating pods that have
// been force deleted or static pods that have been updated.
// https://github.com/kubernetes/kubernetes/issues/116970
podManager kubepod.Manager
//podManager是一个存储了kubelet应该运行的期望的Pod集合(包括已承认的Pod和镜像Pod)。
//它的实际运行Pod集合存储在podWorkers中。podManager通过kubelet配置循环来抽象从不同来源接收配置
//(来自API的普通Pod,本地文件系统或HTTP的静态Pod)。
//其他组件可以查询podManager来获取期望的Pod集合。
//注意,不是所有期望的Pod都在运行,也不是所有运行的Pod都在podManager中。
//例如,从apiserver强制删除一个Pod会将其从podManager中移除,但该Pod可能仍在终止中,并由podWorkers跟踪。
//需要知道节点实际消耗资源的组件或由podWorkers和sync*Pod方法(状态,卷,统计信息)驱动的组件也应该在协调时咨询podWorkers。
// podWorkers is responsible for driving the lifecycle state machine of each pod. The worker is
// notified of config changes, updates, periodic reconciliation, container runtime updates, and
// evictions of all desired pods and will invoke reconciliation methods per pod in separate
// goroutines. The podWorkers are authoritative in the kubelet for what pods are actually being
// run and their current state:
//
// * syncing: pod should be running (syncPod)
// * terminating: pod should be stopped (syncTerminatingPod)
// * terminated: pod should have all resources cleaned up (syncTerminatedPod)
//
// and invoke the handler methods that correspond to each state. Components within the
// kubelet that need to know the phase of the pod in order to correctly set up or tear down
// resources must consult the podWorkers.
//
// Once a pod has been accepted by the pod workers, no other pod with that same UID (and
// name+namespace, for static pods) will be started until the first pod has fully terminated
// and been cleaned up by SyncKnownPods. This means a pod may be desired (in API), admitted
// (in pod manager), and requested (by invoking UpdatePod) but not start for an arbitrarily
// long interval because a prior pod is still terminating.
//
// As an event-driven (by UpdatePod) controller, the podWorkers must periodically be resynced
// by the kubelet invoking SyncKnownPods with the desired state (admitted pods in podManager).
// Since the podManager may be unaware of some running pods due to force deletion, the
// podWorkers are responsible for triggering a sync of pods that are no longer desired but
// must still run to completion.
podWorkers PodWorkers
//podWorkers是一个负责驱动每个Pod生命周期状态机的函数。
//它会在配置更改、更新、定期同步、容器运行时更新和驱逐所有期望的Pod时被通知,
//并在单独的goroutine中对每个Pod调用和解方法。
//podWorkers是kubelet中Pod实际运行状态和当前状态的权威。
//它通过调用对应于每个状态的处理方法,来实现Pod的同步、终止和清理等操作。
//此外,podWorkers还负责触发不再期望运行但必须完成同步的Pod。
// evictionManager observes the state of the node for situations that could impact node stability
// and evicts pods (sets to phase Failed with reason Evicted) to reduce resource pressure. The
// eviction manager acts on the actual state of the node and considers the podWorker to be
// authoritative.
evictionManager eviction.Manager
//该函数定义了一个名为evictionManager的变量,其类型为eviction.Manager。
//这个变量的目的是监控节点的状态,以便在可能影响节点稳定性的场景下进行操作。
//具体的操作是通过驱逐(设置为失败状态且附加 Evicted 原因)Pod来减少资源压力。
//驱逐管理器依据节点的实际状态进行操作,并将podWorker视为权威。
// probeManager tracks the set of running pods and ensures any user-defined periodic checks are
// run to introspect the state of each pod. The probe manager acts on the actual state of the node
// and is notified of pods by the podWorker. The probe manager is the authoritative source of the
// most recent probe status and is responsible for notifying the status manager, which
// synthesizes them into the overall pod status.
probeManager prober.Manager
//该函数定义了一个名为probeManager的变量,其类型为prober.Manager。
//probeManager的作用是跟踪正在运行的Pod集合,并确保运行用户定义的周期性检查,以检查每个Pod的状态。
//它是根据节点的实际状态进行操作的,并由podWorker通知Pod的状态。
//probeManager是最新探针状态的权威来源,并负责通知状态管理器,将其合成到整体Pod状态中。
// secretManager caches the set of secrets used by running pods on this node. The podWorkers
// notify the secretManager when pods are started and terminated, and the secretManager must
// then keep the needed secrets up-to-date as they change.
secretManager secret.Manager
//这段代码定义了一个名为secretManager的变量,
//其类型为secret.Manager。secretManager是一个缓存,
//用于存储当前节点上运行的Pod所使用的秘密集。
//podWorkers在Pod启动和终止时通知secretManager,然后secretManager必须根据秘密的变化保持所需秘密的更新。
// configMapManager caches the set of config maps used by running pods on this node. The
// podWorkers notify the configMapManager when pods are started and terminated, and the
// configMapManager must then keep the needed config maps up-to-date as they change.
configMapManager configmap.Manager
//这段代码定义了一个名为configMapManager的变量,它的类型是configmap.Manager。
//该变量用于缓存当前节点上运行的Pod所使用的ConfigMap集合。
//PodWorkers会在Pod启动和终止时通知configMapManager,然后configMapManager必须保持所需ConfigMap的更新。
// volumeManager observes the set of running pods and is responsible for attaching, mounting,
// unmounting, and detaching as those pods move through their lifecycle. It periodically
// synchronizes the set of known volumes to the set of actually desired volumes and cleans up
// any orphaned volumes. The volume manager considers the podWorker to be authoritative for
// which pods are running.
volumeManager volumemanager.VolumeManager
//这段代码定义了一个名为volumeManager的变量,它的类型是volumemanager.VolumeManager。
//该变量用于观察运行中的Pod集合,并负责在Pod生命周期中进行挂载、卸载和分离等操作。
//它会周期性地将已知的卷同步到期望的卷集合中,并清理任何孤儿卷。volumeManager将podWorker视为运行中Pod的权威。
// statusManager receives updated pod status updates from the podWorker and updates the API
// status of those pods to match. The statusManager is authoritative for the synthesized
// status of the pod from the kubelet's perspective (other components own the individual
// elements of status) and should be consulted by components in preference to assembling
// that status themselves. Note that the status manager is downstream of the pod worker
// and components that need to check whether a pod is still running should instead directly
// consult the pod worker.
statusManager status.Manager
//这段代码定义了一个名为statusManager的变量,它的类型是status.Manager。
//statusManager的作用是接收来自podWorker的更新后的Pod状态,并将这些Pod的状态更新为与API状态匹配。
//statusManager是关于kubelet视角下Pod合成状态的权威组件(其他组件各自拥有状态的各个元素),
//组件应该优先咨询statusManager而不是自己组装状态。需要注意的是,statusManager是在podWorker的下游,
//如果组件需要检查Pod是否仍在运行,应该直接咨询podWorker。
// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
//该函数定义了一个名为resyncInterval的变量,其类型为time.Duration,用于表示节点上周期性完全同步Pod的间隔时间。
// sourcesReady records the sources seen by the kubelet, it is thread-safe.
sourcesReady config.SourcesReady
//sourcesReady是一个config.SourcesReady类型的变量,用于记录kubelet看到的源。它是线程安全的。
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner kubecontainer.CommandRunner
//这段代码定义了两个变量:
//1. logServer:一个类型为http.Handler的变量,用于处理日志服务的请求,默认值为/logs/路径下的日志文件。
//2. runner:一个类型为kubecontainer.CommandRunner的变量,用于执行容器内部的命令,默认实现为简单的Docker命令执行器。
//这两个变量都是可选的,如果没有显式地设置,它们将使用默认值。
// cAdvisor used for container information.
cadvisor cadvisor.Interface
//该行代码定义了一个名为cadvisor.Interface类型的变量cAdvisor,用于获取容器的信息
// Set to true to have the node register itself with the apiserver.
registerNode bool
// List of taints to add to a node object when the kubelet registers itself.
registerWithTaints []v1.Taint
// Set to true to have the node register itself as schedulable.
registerSchedulable bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
//这段代码定义了四个变量,用于配置节点在注册到 apiserver 时的一些行为:
//- registerNode:设置为 true 时,节点会自动注册到 apiserver。
//- registerWithTaints:节点注册时要添加的 taint 列表。
//- registerSchedulable:设置为 true 时,节点注册时会将自己标记为可调度的。
//- registrationCompleted:内部使用,用于记录节点是否已完成注册。
// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
dnsConfigurer *dns.Configurer
// serviceLister knows how to list services
serviceLister serviceLister
// serviceHasSynced indicates whether services have been sync'd at least once.
// Check this before trusting a response from the lister.
serviceHasSynced cache.InformerSynced
// nodeLister knows how to list nodes
nodeLister corelisters.NodeLister
// nodeHasSynced indicates whether nodes have been sync'd at least once.
// Check this before trusting a response from the node lister.
nodeHasSynced cache.InformerSynced
// a list of node labels to register
nodeLabels map[string]string
//这段代码定义了几个字段,用于配置和管理Kubernetes中的DNS解析器、服务列表、节点列表等。
//- dnsConfigurer *dns.Configurer:用于在启动Pod时设置DNS解析器的配置。
//- serviceLister serviceLister:用于列举服务。
//- serviceHasSynced cache.InformerSynced:指示服务是否至少同步过一次。在信任列表器的响应之前,应该检查此字段。
//- nodeLister corelisters.NodeLister:用于列举节点。
//- nodeHasSynced cache.InformerSynced:指示节点是否至少同步过一次。
//在信任节点列表器的响应之前,应该检查此字段。
//- nodeLabels map[string]string:要注册的节点标签列表。
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
runtimeState *runtimeState
// Volume plugins.
volumePluginMgr *volume.VolumePluginMgr
// Manages container health check results.
livenessManager proberesults.Manager
readinessManager proberesults.Manager
startupManager proberesults.Manager
//这段代码定义了四个变量,分别用于记录运行时响应ping的最后一个时间戳、管理卷插件、管理容器健康检查结果和管理容器就绪检查结果。
//其中,runtimeState使用互斥锁来保护,volumePluginMgr是卷插件管理器,
//livenessManager、readinessManager和startupManager分别用于管理容器的存活检查结果、就绪检查结果和启动检查结果。
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
//该函数定义了一个时间间隔,用于控制在终止闲置的流式命令执行/端口转发连接之前保持连接打开的时间长度。
// The EventRecorder to use
recorder record.EventRecorder
//这段代码定义了一个名为recorder的变量,其类型为record.EventRecorder。这是一个用于记录Kubernetes事件的接口。
// Policy for handling garbage collection of dead containers.
containerGC kubecontainer.GC
//该函数定义了一个处理垃圾回收的策略,用于回收死亡的容器。它使用了kubecontainer.GC类型,表示具体的垃圾回收策略。
// Manager for image garbage collection.
imageManager images.ImageGCManager
//该代码定义了一个名为imageManager的变量,其类型为images.ImageGCManager。这是一个用于管理图像垃圾收集的管理器。
// Manager for container logs.
containerLogManager logs.ContainerLogManager
//该函数定义了一个名为containerLogManager的接口,该接口用于管理容器日志。
// Cached MachineInfo returned by cadvisor.
machineInfoLock sync.RWMutex
machineInfo *cadvisorapi.MachineInfo
//这段Go代码定义了一个全局变量machineInfo,它是一个指向cadvisorapi.MachineInfo类型的指针,用于存储由cadvisor返回的机器信息。
//同时,为了保证并发安全,使用了sync.RWMutex类型的machineInfoLock变量作为锁。
// Handles certificate rotations.
serverCertificateManager certificate.Manager
//该代码定义了一个名为serverCertificateManager的变量,其类型为certificate.Manager,用于处理证书轮换。
// Cloud provider interface.
cloud cloudprovider.Interface
// Handles requests to cloud provider with timeout
cloudResourceSyncManager cloudresource.SyncManager
//这段代码定义了两个变量:
//1. cloud:代表云提供商的接口,具体实现取决于云提供商,它定义了与云提供商交互的方法。
//2. cloudResourceSyncManager:代表一个同步管理器,用于处理与云提供商资源的同步操作,
//具体实现可能包括同步云提供商的虚拟机、网络、存储等资源信息到本地缓存,并定期更新这些信息以保持数据的同步。
// Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool
// Reference to this node.
nodeRef *v1.ObjectReference
//这段代码定义了两个Go语言的变量。
//1. externalCloudProvider是一个布尔类型的变量,用于指示节点初始化是在外部云控制器中进行的。
//2. nodeRef是一个指向v1.ObjectReference类型的指针,用于引用该节点。
// Container runtime.
containerRuntime kubecontainer.Runtime
//该代码定义了一个变量 containerRuntime,其类型为 kubecontainer.Runtime,用于表示容器运行时环境。
// Streaming runtime handles container streaming.
streamingRuntime kubecontainer.StreamingRuntime
//该函数是一个Go语言函数声明,函数名称为streamingRuntime,函数参数为kubecontainer.StreamingRuntime类型。
//该函数用于处理容器的流式传输。
// Container runtime service (needed by container runtime Start()).
runtimeService internalapi.RuntimeService
//这个Go函数定义了一个名为runtimeService的变量,它是一个internalapi.RuntimeService类型的接口。
//这个接口用于与容器运行时服务进行交互,是容器运行时启动所必需的。
// reasonCache caches the failure reason of the last creation of all containers, which is
// used for generating ContainerStatus.
reasonCache *ReasonCache
//该行代码定义了一个名为reasonCache的变量,其类型为*ReasonCache。
//ReasonCache是一个用于缓存所有容器最后创建失败原因的结构体,这些失败原因用于生成ContainerStatus。
// containerRuntimeReadyExpected indicates whether container runtime being ready is expected
// so errors are logged without verbosity guard, to avoid excessive error logs at node startup.
// It's false during the node initialization period of nodeReadyGracePeriod, and after that
// it's set to true by fastStatusUpdateOnce when it exits.
containerRuntimeReadyExpected bool
//该函数用于指示容器运行时是否预期准备好,以便在节点启动时避免过多错误日志。
//在节点初始化期间为false,之后通过fastStatusUpdateOnce退出时设置为true。
// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
// feature is not enabled, it is also the frequency that kubelet posts node status to master.
// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
//这个go函数定义了一个时间间隔,表示kubelet计算节点状态的频率。
//如果未启用节点租约功能,则该频率也是kubelet向主节点发布节点状态的频率。
//在更改该常量时需要谨慎,它必须与节点控制器中的nodeMonitorGracePeriod配合使用,并且需要满足以下约束条件:
//1. nodeMonitorGracePeriod必须是nodeStatusUpdateFrequency的N倍,其中N表示kubelet发布节点状态允许的重试次数。
//将nodeMonitorGracePeriod设置为小于nodeStatusUpdateFrequency没有意义,
//因为kubelet只会以nodeStatusUpdateFrequency的时间间隔提供新鲜值。
//该常量必须小于podEvictionTimeout。
//2. nodeStatusUpdateFrequency需要足够大,以便kubelet生成节点状态。
//如果该值太小,kubelet可能无法可靠地更新节点状态,
//因为它需要时间来收集所有必要的节点信息。
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master. It is only used when node lease feature is enabled.
nodeStatusReportFrequency time.Duration
//该代码定义了一个变量nodeStatusReportFrequency,它是kubelet向master报告节点状态的频率。
//这个变量只在启用了节点租约功能时使用。
// lastStatusReportTime is the time when node status was last reported.
lastStatusReportTime time.Time
//该函数定义了一个变量lastStatusReportTime,它表示节点状态最后一次报告的时间,其类型为time.Time。
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus and Kubelet.fastNodeStatusUpdate functions and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
//这个Go函数定义了一个名为syncNodeStatusMux的互斥锁,用于保护节点状态的更新操作,因为这条路径不是线程安全的。
//这个互斥锁仅被Kubelet.syncNodeStatus和Kubelet.fastNodeStatusUpdate函数使用,不建议在其他地方使用。
// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
updatePodCIDRMux sync.Mutex
//这段代码定义了一个名为updatePodCIDRMux的互斥锁,用于保证更新Pod CIDR的路径的线程安全性。
//这个互斥锁仅被Kubelet的updatePodCIDR函数使用,不建议在其他地方使用。
// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
// This lock is used by Kubelet.updateRuntimeUp, Kubelet.fastNodeStatusUpdate and
// Kubelet.HandlerSupportsUserNamespaces functions and shouldn't be used anywhere else.
updateRuntimeMux sync.Mutex
//这个函数定义了一个名为updateRuntimeMux的互斥锁,用于保证更新运行时的路径线程安全。
//这个互斥锁被Kubelet.updateRuntimeUp、Kubelet.fastNodeStatusUpdate和Kubelet.HandlerSupportsUserNamespaces函数使用,
//不应该在其他地方使用。
// nodeLeaseController claims and renews the node lease for this Kubelet
nodeLeaseController lease.Controller
//nodeLeaseController 是一个 lease.Controller 类型的变量。它的功能是声明和更新当前 Kubelet 的节点租约。
// pleg observes the state of the container runtime and notifies the kubelet of changes to containers, which
// notifies the podWorkers to reconcile the state of the pod (for instance, if a container dies and needs to
// be restarted).
pleg pleg.PodLifecycleEventGenerator
//该函数定义了一个名为pleg的变量,其类型为pleg.PodLifecycleEventGenerator。
//这个函数的作用是观察容器运行时的状态,并将容器的变化通知给kubelet,进而通知podWorkers去协调pod的状态。
//例如,如果一个容器死亡需要重启,pleg会通知kubelet和podWorkers去处理。
// eventedPleg supplements the pleg to deliver edge-driven container changes with low-latency.
eventedPleg pleg.PodLifecycleEventGenerator
//该Go函数定义了一个名为eventedPleg的变量,其类型为pleg.PodLifecycleEventGenerator。
//这个函数的作用是补充(pleg)以提供边缘驱动的容器变化的低延迟。
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
//该代码定义了一个名为podCache的变量,其类型为kubecontainer.Cache。
//这个变量的作用是存储所有Pod的kubecontainer.PodStatus信息。
// os is a facade for various syscalls that need to be mocked during testing.
os kubecontainer.OSInterface
//该函数定义了一个类型os,它是一个kubecontainer.OSInterface的别名。
//这个类型的目的是作为一个Facade,用于在测试期间模拟各种系统调用。
// Watcher of out of memory events.
oomWatcher oomwatcher.Watcher
//该Go函数定义了一个名为oomWatcher的变量,其类型为oomwatcher.Watcher。这个变量用于监控内存溢出事件。
// Monitor resource usage
resourceAnalyzer serverstats.ResourceAnalyzer
//该函数定义了一个资源分析器,类型为serverstats.ResourceAnalyzer。通过该资源分析器可以监控服务器的资源使用情况。
// Whether or not we should have the QOS cgroup hierarchy for resource management
cgroupsPerQOS bool
//该函数定义了一个名为cgroupsPerQOS的变量,其类型为bool。该变量用于指示是否应该为资源管理而创建QOS级别的cgroup层次结构。
// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string
//该函数定义了一个名为cgroupRoot的字符串变量,它表示容器运行时的根cgroup。如果cgroupRoot不为空,则将其传递给容器运行时作为根cgroup。
// Mounter to use for volumes.
mounter mount.Interface
//这段代码定义了一个名为Mounter的接口,用于挂载卷。
// hostutil to interact with filesystems
hostutil hostutil.HostUtils
//该函数定义了一个名为hostutil的变量,它是一个hostutil.HostUtils类型的变量。
//这个变量是用来与文件系统进行交互的工具类。它可以提供一系列的方法,如读取文件、写入文件、创建文件夹等操作。
//通过这个变量,可以方便地对文件系统进行操作。
// subpather to execute subpath actions
subpather subpath.Interface
//该函数定义了一个名为subpather的变量,它是一个subpath.Interface类型的变量。这个变量是用来执行子路径操作的接口。
// Manager of non-Runtime containers.
containerManager cm.ContainerManager
//该代码定义了一个名为containerManager的变量,它是一个cm.ContainerManager类型的容器管理器。
//具体功能和用途需要查看cm.ContainerManager该代码的实现和相关文档才能确定。
// Maximum Number of Pods which can be run by this Kubelet
maxPods int
//该函数定义了一个名为maxPods的整型变量,它表示这个Kubelet能够运行的最大Pods数量。
// Monitor Kubelet's sync loop
syncLoopMonitor atomic.Value
//该函数是一个Go语言的函数,名为syncLoopMonitor,它用于监控Kubelet的同步循环。
//该函数使用了atomic.Value类型,atomic.Value是Go语言标准库中的一种原子类型,可以用于安全地存储和加载值。
//在这个函数中,syncLoopMonitor被定义为atomic.Value类型,可以用来存储和加载Kubelet的同步循环的状态信息。
//具体来说,syncLoopMonitor可以用来监控Kubelet的同步循环是否正常运行,以及同步循环的运行状态如何。
//通过使用atomic.Value类型,可以在多线程环境下安全地读写syncLoopMonitor的值,确保了数据的一致性和安全性。
//总之,syncLoopMonitor函数用于监控Kubelet的同步循环,并使用atomic.Value类型来保证数据的一致性和安全性。
// Container restart Backoff
backOff *flowcontrol.Backoff
//这个Go函数定义了一个名为"backOff"的变量,它是flowcontrol包中的Backoff类型。这个变量用于实现容器重启的退避策略。
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *v1.NodeDaemonEndpoints
//该函数用于获取由运行此Kubelet服务器的节点上的守护进程打开的端口的信息。返回一个类型为*v1.NodeDaemonEndpoints的指针。
// A queue used to trigger pod workers.
workQueue queue.WorkQueue
//该函数定义了一个名为workQueue的队列,用于触发Pod workers。
//这是一个工作队列,可以用来存储待处理的任务,并且按照一定的顺序进行处理。
//在Go语言中,队列通常被用来实现并发编程中的生产者-消费者模式,其中生产者负责向队列中添加任务,消费者负责从队列中取出任务并进行处理。
//这个workQueue可以被用来协调多个Pod workers的工作,确保任务能够被有序、高效地处理。
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
//这个函数是一个用于初始化依赖运行时的模块的同步函数。它使用了sync.Once来确保初始化只执行一次。
// If set, use this IP address or addresses for the node
nodeIPs []net.IP
//该函数用于设置节点的IP地址或地址列表。
//- nodeIPs []net.IP:表示要设置的节点IP地址或地址列表,类型为[]net.IP,即可以是一个IP地址或多个IP地址的切片。
// use this function to validate the kubelet nodeIP
nodeIPValidator func(net.IP) error
//该函数用于验证Kubelet节点的IP地址是否有效。
//其输入参数为一个net.IP类型的IP地址,返回值为一个error类型的错误信息。
//具体验证过程和逻辑未在代码片段中展示。
// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
providerID string
//该函数用于设置节点在外部数据库中的唯一标识符,例如云提供商。
// clock is an interface that provides time related functionality in a way that makes it
// easy to test the code.
clock clock.WithTicker
//该函数实现了一个时钟接口clock,其中WithTicker方法返回一个定时器,该定时器会在指定的时间间隔后发出时间信号。
//这个接口的目的是为了方便测试代码,因为它可以模拟时间的流逝和定时器的行为。
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(context.Context, *v1.Node) error
//这段Go代码定义了一个名为setNodeStatusFuncs的切片,其元素是一个函数类型,函数接受context.Context和*v1.Node作为参数,
//并返回一个error类型。 这个切片主要用于存储在tryUpdateNodeStatus周期中被调用的处理函数。
lastNodeUnschedulableLock sync.Mutex
// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
lastNodeUnschedulable bool
//这段代码定义了一个互斥锁lastNodeUnschedulableLock和一个布尔值lastNodeUnschedulable,
//用于维护节点的Spec.Unschedulable值从上一次运行tryUpdateNodeStatus()函数开始就没有改变过。
// the list of handlers to call during pod admission.
admitHandlers lifecycle.PodAdmitHandlers
//该函数定义了一个名为admitHandlers的变量,它是一个lifecycle.PodAdmitHandlers类型的切片。
//这个变量用于存储在Pod准入阶段需要调用的一系列处理器(handlers)。
//lifecycle.PodAdmitHandlers是一个结构体类型,它包含了多个处理Pod准入的处理器,例如PreAdmit、Admit和PostAdmit等。
//每个处理器都是一个函数,它们会在Pod准入的不同阶段被调用,以执行相应的准入控制逻辑。
//通过这个admitHandlers变量,可以在处理Pod准入时动态添加或移除处理器,以灵活地控制Pod的准入流程。
// softAdmithandlers are applied to the pod after it is admitted by the Kubelet, but before it is
// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
// admission rule should be applied by a softAdmitHandler.
softAdmitHandlers lifecycle.PodAdmitHandlers
//这段Go代码定义了一个名为softAdmitHandlers的变量,它是一个lifecycle.PodAdmitHandlers类型的变量。
//这个变量是一个软准入处理器,它会在Kubelet接纳Pod之后、运行Pod之前对Pod进行处理。
//如果一个Pod被软准入处理器拒绝,它将无限期地保持Pending状态。
//如果一个被拒绝的Pod不应该被重新创建,或者调度器不知道拒绝规则,那么应该由软准入处理器来应用准入规则。
// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers
//这段代码定义了一个名为PodSyncLoopHandlers的变量,它是一个lifecycle.PodSyncLoopHandlers类型的手册。
//这个变量用于在Pod同步循环中调用的一系列处理程序。
// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
//lifecycle.PodSyncHandlers是一个包含了在同步pod时需要调用的一系列处理程序的列表。
// the number of allowed pods per core
podsPerCore int
//该代码片段定义了一个名为podsPerCore的整型变量,用于表示每核允许的Pod数量。
// enableControllerAttachDetach indicates the Attach/Detach controller
// should manage attachment/detachment of volumes scheduled to this node,
// and disable kubelet from executing any attach/detach operations
enableControllerAttachDetach bool
//该函数用于设置是否启用控制器来管理卷的挂载和卸载操作,并禁用kubelet执行任何挂载/卸载操作。
//其中,参数enableControllerAttachDetach为一个布尔值,若设置为true则启用控制器管理卷的挂载和卸载,
//若设置为false则禁用控制器管理卷的挂载和卸载。
// trigger deleting containers in a pod
containerDeletor *podContainerDeletor
//这个Go代码定义了一个名为containerDeletor的变量,它是podContainerDeletor类型的指针。
//podContainerDeletor是一个结构体,用于触发删除Pod中的容器。
// config iptables util rules
makeIPTablesUtilChains bool
//该代码片段定义了一个名为makeIPTablesUtilChains的布尔类型变量,用于配置iptables实用程序的规则。
// The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator
//这段代码定义了一个名为appArmorValidator的变量,其类型为apparmor.Validator。
//该变量用于验证是否支持AppArmor。
//AppArmor(ApplicationArmor)是一种安全模块,用于增强Linux操作系统的安全性。
//它通过为每个应用程序定义安全策略来限制其访问系统资源的权限。
//apparmor.Validator是一个接口,用于检查系统中是否启用了AppArmor,并验证其配置是否正确。
//综上所述,appArmorValidator是一个用于验证系统是否支持AppArmor的变量。
// StatsProvider provides the node and the container stats.
StatsProvider *stats.Provider
//该Go函数定义了一个名为StatsProvider的变量,该变量指向一个stats.Provider类型的对象。
//stats.Provider是一个接口,用于提供节点和容器的统计信息。
// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
// This can be useful for debugging volume related issues.
keepTerminatedPodVolumes bool // DEPRECATED
//该函数定义了一个名为keepTerminatedPodVolumes的布尔型变量,通过该变量控制kubelet是否将已终止pod的卷保持挂载到节点上。
//该特性用于调试与卷相关的故障。注意,该变量已废弃。
// pluginmanager runs a set of asynchronous loops that figure out which
// plugins need to be registered/unregistered based on this node and makes it so.
pluginManager pluginmanager.PluginManager
//该函数定义了一个名为pluginManager的变量,其类型为pluginmanager.PluginManager。
//该变量运行一组异步循环,用于确定基于当前节点需要注册/注销的插件,并进行相应操作。
// This flag sets a maximum number of images to report in the node status.
nodeStatusMaxImages int32
//该函数用于设置节点状态报告中最大图像数量的上限。
// Handles RuntimeClass objects for the Kubelet.
runtimeClassManager *runtimeclass.Manager
//这个Go函数定义了一个名为runtimeClassManager的变量,它是runtimeclass.Manager类型的一个实例。
//这个变量用于处理Kubelet的RuntimeClass对象。
// Handles node shutdown events for the Node.
shutdownManager nodeshutdown.Manager
//该函数用于处理节点关闭事件。
//- 参数:
//- nodeshutdown.Manager:节点关闭管理器。
//- 功能:
//- 监听节点关闭事件。
//- 当节点关闭时,执行相应的处理逻辑。
// Manage user namespaces
usernsManager *userns.UsernsManager
//该代码定义了一个变量usernsManager,其类型为*userns.UsernsManager,用于管理用户命名空间。
//用户命名空间是Linux系统中的一个特性,可以为不同用户创建独立的文件系统、网络等环境,实现资源的隔离。
//userns.UsernsManager是一个用于管理用户命名空间的封装对象,提供了创建、删除、查询等操作接口。
// Mutex to serialize new pod admission and existing pod resizing
podResizeMutex sync.Mutex
//这段代码定义了一个名为podResizeMutex的sync.Mutex类型变量。
//sync.Mutex是Go标准库中的一个互斥锁类型,用于控制并发访问共享资源。
//在本例中,podResizeMutex用于确保新建Pod的准入和现有Pod的调整大小操作的序列化执行,以避免并发冲突。
// OpenTelemetry Tracer
tracer trace.Tracer
//该函数定义了一个OpenTelemetry Tracer。
// Track node startup latencies
nodeStartupLatencyTracker util.NodeStartupLatencyTracker
//该代码定义了一个名为nodeStartupLatencyTracker的变量,它的类型是util.NodeStartupLatencyTracker。根据变量的命名,
//可以推测这是一个用于追踪节点启动延迟的工具。具体实现和功能细节需要查看util.NodeStartupLatencyTracker的定义和实现。
}
// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodStats(ctx context.Context) ([]statsapi.PodStats, error) {
return kl.StatsProvider.ListPodStats(ctx)
}
//这个函数是Kubelet的一个方法,用于列出所有Pod的统计信息。
//它将请求委托给实现了stats.Provider接口的StatsProvider对象的ListPodStats方法,
//返回一个包含所有Pod统计信息的切片。
// ListPodCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodCPUAndMemoryStats(ctx context.Context) ([]statsapi.PodStats, error) {
return kl.StatsProvider.ListPodCPUAndMemoryStats(ctx)
}
//该函数是一个代理函数,将调用委托给实现了stats.Provider接口的StatsProvider对象的ListPodCPUAndMemoryStats方法。
//该方法用于获取Pod的CPU和内存统计信息。
// ListPodStatsAndUpdateCPUNanoCoreUsage is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodStatsAndUpdateCPUNanoCoreUsage(ctx context.Context) ([]statsapi.PodStats, error) {
return kl.StatsProvider.ListPodStatsAndUpdateCPUNanoCoreUsage(ctx)
}
//这个函数是Kubelet的一个方法,用于列出Pod的统计信息并更新CPU使用量。
//它委托给实现了stats.Provider接口的StatsProvider来完成具体的操作。
// ImageFsStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ImageFsStats(ctx context.Context) (*statsapi.FsStats, *statsapi.FsStats, error) {
return kl.StatsProvider.ImageFsStats(ctx)
}
//该函数是Kubelet的一个方法,用于获取镜像文件系统的统计信息。
//它将请求委托给实现了stats.Provider接口的StatsProvider对象的ImageFsStats方法,返回镜像文件系统的使用情况统计信息。
// GetCgroupStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
return kl.StatsProvider.GetCgroupStats(cgroupName, updateStats)
}
//该函数是一个委托函数,将获取cgroup统计信息的任务委托给实现了stats.Provider接口的StatsProvider对象。
//函数接收一个cgroup名称和一个布尔值updateStats作为参数,
//并返回一个包含容器统计信息的statsapi.ContainerStats指针、一个包含网络统计信息的statsapi.NetworkStats指针以及可能出现的错误。
//具体实现由StatsProvider对象完成。
// GetCgroupCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
return kl.StatsProvider.GetCgroupCPUAndMemoryStats(cgroupName, updateStats)
}
//该函数是一个委托函数,将获取cgroup的CPU和内存统计信息的任务委托给实现了stats.Provider接口的StatsProvider对象。
//函数接收一个cgroup名称和一个更新统计信息的布尔值作为参数,并返回一个指向statsapi.ContainerStats的指针和一个错误。
// RootFsStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) RootFsStats() (*statsapi.FsStats, error) {
return kl.StatsProvider.RootFsStats()
}
//这个函数的功能是获取Kubelet根文件系统的统计信息。
//它将请求委托给实现了stats.Provider接口的StatsProvider对象的RootFsStats方法,
//返回一个包含文件系统统计信息的statsapi.FsStats指针和一个错误值。
// RlimitStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
return kl.StatsProvider.RlimitStats()
}
//该函数是Kubelet的RlimitStats方法,它被委托给StatsProvider来实现stats.Provider接口。
//该方法返回一个*statsapi.RlimitStats类型的对象和一个错误(如果有的话)。
// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
// 5. the checkpoint directory
// 6. the pod logs root directory
func (kl *Kubelet) setupDataDirs() error {
if cleanedRoot := filepath.Clean(kl.rootDirectory); cleanedRoot != kl.rootDirectory {
return fmt.Errorf("rootDirectory not in canonical form: expected %s, was %s", cleanedRoot, kl.rootDirectory)
}
//该函数用于设置Kubelet的数据目录。
//首先,它通过filepath.Clean函数清理kl.rootDirectory根目录的路径,
//然后检查清理后的路径是否与原始路径不同。
//如果不同,则返回一个错误,指出根目录路径不是规范形式。
pluginRegistrationDir := kl.getPluginsRegistrationDir()
pluginsDir := kl.getPluginsDir()
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
return fmt.Errorf("error creating root directory: %v", err)
}
if err := os.MkdirAll(kl.getPodLogsDir(), 0750); err != nil {
return fmt.Errorf("error creating pod logs root directory %q: %w", kl.getPodLogsDir(), err)
}
//这段Go代码主要实现了创建Kubernetes集群中两个目录的功能。
//1. 首先,通过调用kl.getPluginsRegistrationDir()方法获取插件注册目录的路径,然后调用os.MkdirAll()方法创建该目录,
//如果创建失败,则返回错误信息。
//2. 接着,通过调用kl.getPluginsDir()方法获取插件目录的路径,同样使用os.MkdirAll()方法创建该目录,如果创建失败,则返回错误信息。
//3. 最后,通过调用kl.getRootDir()方法获取根目录的路径,并使用os.MkdirAll()方法创建该目录,如果创建失败,则返回错误信息。
//需要注意的是,创建目录时的权限设置为0750,即所有者具有读、写和执行权限,而组和其他用户只具有读和执行权限。
//总结起来,这段代码的主要作用是在Kubernetes集群中创建插件注册目录、插件目录和Pod日志目录。
//该函数的作用是创建Kubelet的数据目录,包括根目录、Pods目录、插件目录、Pod资源目录、检查点目录和Pod日志根目录。
//函数首先检查根目录的规范性,然后创建各个目录,并返回可能发生的错误。
if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
return fmt.Errorf("error configuring root directory: %v", err)
}
//该函数尝试将根目录配置为共享目录。如果执行失败,函数将返回一个错误消息。
if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
return fmt.Errorf("error creating pods directory: %v", err)
}
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins directory: %v", err)
}
if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins registry directory: %v", err)
}
//这段Go代码中包含了一个匿名的if条件语句,
//其主要功能是创建三个不同的目录:pod目录、插件目录和插件注册目录。
//具体来说,它通过调用os.MkdirAll函数来创建目录,并使用kl.getPodsDir()、
//kl.getPluginsDir()和kl.getPluginsRegistrationDir()方法分别获取目录的路径。
//如果创建目录时出现错误,函数会返回一个自定义的错误信息。
if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
return fmt.Errorf("error creating podresources directory: %v", err)
}
//该函数的主要功能是在给定路径上创建一个目录。
//具体而言,它通过调用kl.getPodResourcesDir()获取目录路径,然后使用os.MkdirAll系统调用创建该目录及其所有父目录(如果不存在)。
//创建目录时指定的权限为0750。如果在创建目录的过程中遇到任何错误,函数将返回一个格式化的错误信息字符串,指示出无法创建目录的原因。
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
if err := os.MkdirAll(kl.getCheckpointsDir(), 0700); err != nil {
return fmt.Errorf("error creating checkpoint directory: %v", err)
}
}
//这段Go代码是条件语句,首先检查utilfeature.DefaultFeatureGate是否启用了features.ContainerCheckpoint特性,
//如果启用了,则尝试创建一个目录,目录路径通过kl.getCheckpointsDir()方法获取,权限设置为0700。
//如果创建目录过程中出现错误,则返回一个错误信息。
if selinux.GetEnabled() {
err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
}
err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
}
}
return nil
}
//该函数主要功能是检查Selinux是否启用,如果启用,则尝试为插件注册目录和插件目录设置Selinux上下文。
//如果设置失败,函数会记录一条信息但不会影响程序执行,最后返回nil。
// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure := false
go wait.Until(func() {
ctx := context.Background()
if err := kl.containerGC.GarbageCollect(ctx); err != nil {
klog.ErrorS(err, "Container garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
loggedContainerGCFailure = true
//该函数是Kubelet的StartGarbageCollection方法,用于启动垃圾回收线程。
//它首先定义了一个布尔变量loggedContainerGCFailure,并初始化为false,用于记录是否已经记录了容器垃圾回收失败的日志。
//然后使用goroutine启动一个无限循环,定期执行垃圾回收操作。
//在每次循环中,通过调用kl.containerGC.GarbageCollect方法执行垃圾回收,并检查返回的错误。
//如果发生错误,会使用klog记录错误日志,并通过kl.recorder.Eventf方法向事件记录器发送一条警告事件,通知容器垃圾回收失败。
//同时,将loggedContainerGCFailure设置为true,表示已经记录了容器垃圾回收失败的日志。
//注意,以上只是函数的一部分,代码片段在记录了日志后就中断了,没有展示完整的函数实现。
} else {
var vLevel klog.Level = 4
if loggedContainerGCFailure {
vLevel = 1
loggedContainerGCFailure = false
}
//这段Go代码是条件语句的else部分,它声明了一个名为vLevel的变量并将其初始化为4。
//接着,它检查loggedContainerGCFailure变量的值,如果是true,则将vLevel设置为1,并将loggedContainerGCFailure重置为false。
//这段代码的作用是根据loggedContainerGCFailure的值来设置vLevel的值,并重置loggedContainerGCFailure。
klog.V(vLevel).InfoS("Container garbage collection succeeded")
}
}, ContainerGCPeriod, wait.NeverStop)
//这段这段GoGo代码是使用Kubernetes的logging代码是使用Kubernetes的logging库k库klog,来记录容器垃圾收集log,
//来记录容器垃圾收集成功的日成功的日志信息。函数体内部志信息。
//函数体内部使用了klog使用了klog的InfoS方法,的InfoS方法,该方法用于输出该方法用于输出日志信息。
//日志信息。其中,vLevel是其中,vLevel是日志级别,日志级别,根据具体需求设定。
//根据具体需求设定。
//该函数作为一个该函数作为一个匿名函数被传递给一个匿名函数被传递给一个名为After的名为After的函数,
//并设置了一个定时器函数,并设置了一个定时器,使得,使得该函数会在ContainerGCPeriod时间该函数会在ContainerGCPeriod时间后执行后执行。
//wait.NeverStop则表示该函数。wait.NeverStop则表示该函数会会一直等待执行,不会停止。
// when the high threshold is set to 100, and the max age is 0 (or the max age feature is disabled)
// stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 &&
(!utilfeature.DefaultFeatureGate.Enabled(features.ImageMaximumGCAge) || kl.kubeletConfiguration.ImageMaximumGCAge.Duration == 0) {
klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100 and ImageMaximumGCAge is 0, Disable image GC")
return
}
//这段代码的功能是检查Kubelet配置中的图像垃圾回收高阈值是否设置为100,并且最大年龄是否为0或最大年龄功能是否禁用。
//如果是,则禁用图像垃圾回收,并记录相关信息。
prevImageGCFailed := false
beganGC := time.Now()
go wait.Until(func() {
ctx := context.Background()
if err := kl.imageManager.GarbageCollect(ctx, beganGC); err != nil {
if prevImageGCFailed {
klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
// Only create an event for repeated failures
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
} else {
klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
}
prevImageGCFailed = true
//这个Go函数用于定期执行镜像垃圾回收操作。
//具体来说,它通过wait.Until函数以一定间隔调用一个匿名函数,该匿名函数执行以下操作:
//1. 创建一个后台上下文ctx。
//2. 调用kl.imageManager.GarbageCollect方法执行镜像垃圾回收操作,传入ctx和开始垃圾回收的时间beganGC作为参数。
//3. 如果垃圾回收操作出现错误:
//- 如果之前垃圾回收操作也失败过,则记录错误日志,并且创建一个事件通知,表示垃圾回收失败多次。
//- 如果之前垃圾回收操作成功过,则记录错误日志,提示垃圾回收操作失败一次,可能是统计信息初始化未完成。
//4. 将prevImageGCFailed标志设置为true,表示垃圾回收操作失败。
} else {
var vLevel klog.Level = 4
if prevImageGCFailed {
vLevel = 1
prevImageGCFailed = false
}
//这段Go代码是条件语句的else部分,它首先声明了一个名为vLevel的klog.Level类型变量,并将其初始化为4。
//然后,通过判断prevImageGCFailed的值,如果其为true,则将vLevel设置为1,并将prevImageGCFailed重置为false。
//这段代码的主要作用是根据prevImageGCFailed的值来设置vLevel的值。
klog.V(vLevel).InfoS("Image garbage collection succeeded")
}
}, ImageGCPeriod, wait.NeverStop)
//这是一个Go语言函数,它通过klog库输出一条日志信息。函数体内部使用了klog.V(vLevel).InfoS方法,
//参数为"Image garbage collection succeeded",表示成功进行了镜像垃圾回收。
//该函数每间隔ImageGCPeriod时间执行一次,直到等待过程被终止。
}
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
// Prometheus metrics.
metrics.Register(
collectors.NewVolumeStatsCollector(kl),
collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
)
metrics.SetNodeName(kl.nodeName)
servermetrics.Register()
//该函数是Kubelet的初始化模块函数,用于初始化内部模块,这些模块不需要容器运行时支持。
//函数主要进行三个方面的操作:
//1. 注册Prometheus指标,包括卷统计信息和日志指标;
//2. 设置节点名称;
//3. 注册服务器指标。
// Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
return err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
}
}
//这段Go代码主要实现了两个功能:
//1. 调用kl.setupDataDirs()函数设置文件系统目录,如果设置失败则返回错误。
//2. 检查容器日志目录ContainerLogsDir是否存在,如果不存在则尝试创建该目录,如果创建失败则返回错误。
//其中,kl是一个对象,kl.os是kl对象中的一个操作系统相关的属性,kl.os.MkdirAll()函数用于创建目录。
// Start the image manager.
kl.imageManager.Start()
// Start the certificate manager if it was enabled.
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
//这段Go代码中包含两个启动操作。
//首先,它启动了一个名为imageManager的图像管理器。
//然后,如果serverCertificateManager不为空,则启动证书管理器。
// Start out of memory watcher.
if kl.oomWatcher != nil {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher: %w", err)
}
}
// Start resource analyzer
kl.resourceAnalyzer.Start()
return nil
//这段Go代码主要包含两个部分的功能:启动内存溢出监视器(OOM watcher)和启动资源分析器(resource analyzer)。
//如果kl.oomWatcher不为空,则尝试启动OOM watcher,并通过kl.nodeRef传递节点引用。
//如果启动失败,则返回相应的错误信息。 然后,无论如何都会启动kl.resourceAnalyzer。
//最后,如果没有发生错误,则返回nil。
}
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start cAdvisor")
os.Exit(1)
}
//该函数是Kubelet的一个方法,用于初始化需要容器运行时才能启动的内部模块。
//主要工作是尝试启动cAdvisor,如果启动失败,则记录错误日志并退出Kubelet进程。
// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
kl.StatsProvider.GetCgroupStats("/", true)
// Start container manager.
node, err := kl.getNodeAnyWay()
if err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Kubelet failed to get node info")
os.Exit(1)
}
//这个Go函数主要执行了两个操作:
//1. 首先,它调用kl.StatsProvider.GetCgroupStats("/", true)来触发一次按需统计信息的收集,以便我们有关临时存储容量的信息。
//即使统计信息收集不成功,容器管理器也不会启动,因此这里忽略了任何错误。
//2. 然后,它尝试调用kl.getNodeAnyWay()来启动容器管理器。如果获取节点信息失败,则记录错误并退出Kubelet进程,
//依赖babysitter来重试启动Kubelet。
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod)
//这段Go代码是Kubernetes中的kubelet组件的一部分,用于启动containerManager和evictionManager两个组件。
//首先,如果containerManager启动失败,则会记录错误日志并退出kubelet进程。
//containerManager的启动需要依赖cAdvisor提供的文件系统容量信息。
//然后,evictionManager必须在cadvisor之后启动,因为它需要知道容器运行时是否有专门的imagefs。
//evictionManager的启动需要传入StatsProvider、GetActivePods和PodIsFinished等参数,以及evictionMonitoringPeriod参数。
//总的来说,这段代码用于在kubelet启动过程中初始化和启动containerManager和evictionManager两个组件。
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
kl.containerLogManager.Start()
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for DRA Plugin
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay)))
}
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
//这段代码主要在启动时执行了三个操作:
//1. 启动了container log manager,用于从容器运行时检索信息并通知容器在日志轮转后重新打开日志文件。
//2. 为CSI驱动添加了一个注册回调函数。
//3. 根据功能门控的开启情况,为DRA插件和设备管理器分别添加了一个注册回调函数。
// Start the plugin manager
klog.V(4).InfoS("Starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
err = kl.shutdownManager.Start()
if err != nil {
// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
klog.ErrorS(err, "Failed to start node shutdown manager")
}
}
//这段Go代码主要实现了启动插件管理器和节点关闭管理器的功能。
//1. 首先,通过klog.V(4).InfoS("Starting plugin manager")日志记录插件管理器的启动信息。
//2. 然后,使用go关键字开启一个新的goroutine来运行插件管理器,通过kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)来实现插件管理器的异步执行。
//其中,kl.sourcesReady表示插件管理器的源准备就绪,wait.NeverStop表示永远不停止运行。
//3. 接着,通过kl.shutdownManager.Start()启动节点关闭管理器。如果启动失败,会通过klog.ErrorS(err, "Failed to start node shutdown manager")记录错误日志,
//但不会阻塞Kubelet的启动。
//总结:这段代码的目的是为了启动插件管理器和节点关闭管理器,以实现插件管理和节点关闭的功能。
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
ctx := context.Background()
if kl.logServer == nil {
file := http.FileServer(http.Dir(nodeLogDir))
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
return
//该函数是Kubelet的Run方法,用于启动kubelet并响应配置更新。
//它通过监听updates通道来接收Pod更新信息。在功能上,它主要完成了以下几点:
//1. 创建了一个后台上下文ctx。
//2. 检查logServer是否为nil,如果是,则通过file服务器提供节点日志查询服务。
//如果启用了系统日志查询功能(通过DefaultFeatureGate判断),
//则使用http.HandlerFunc处理日志查询请求,并通过newNodeLogQuery解析请求URL的查询参数。如果解析出错,则返回400错误。
//注意:以上是根据代码片段进行的功能描述,可能不完整,具体实现细节还需要结合上下文代码进行理解。
} else if nlq != nil {
if req.URL.Path != "/" && req.URL.Path != "" {
http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
return
}
if errs := nlq.validate(); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
return
}
//这段代码是处理节点日志查询请求的,如果newNodeLogQuery解析成功且请求的URL路径不是"/"或为空,则返回400错误。
//然后通过调用nlq.validate()方法验证查询参数的合法性,如果有错误则返回406错误。
// Validation ensures that the request does not query services and files at the same time
if len(nlq.Services) > 0 {
journal.ServeHTTP(w, req)
return
}
// Validation ensures that the request does not explicitly query multiple files at the same time
if len(nlq.Files) == 1 {
// Account for the \ being used on Windows clients
req.URL.Path = filepath.ToSlash(nlq.Files[0])
}
}
//这段Go代码包含了一个条件判断逻辑。
//首先,它验证请求是否同时查询了服务和文件,如果是,就通过journal.ServeHTTP(w, req)方法处理请求,并返回。
//接着,如果请求没有同时查询服务和文件,它会进一步验证是否显式地查询了多个文件。
//如果请求查询了一个文件,代码会将请求的URL路径中的反斜杠(Windows客户端中使用的路径分隔符)转换为斜杠,然后继续处理请求。
//这段代码的主要目的是确保请求不会同时查询服务和文件,也不会显式地查询多个文件。
// Fall back in case the caller is directly trying to query a file
// Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
file.ServeHTTP(w, req)
}))
} else {
kl.logServer = http.StripPrefix("/logs/", file)
//这段Go代码是关于处理HTTP请求的。 首先,如果条件成立,则使用file.ServeHTTP(w, req)函数来处理请求。
//file是一个http.FileServer对象,它表示一个文件服务器,w是响应写入器,用于向客户端发送响应,req是客户端的请求。
//这个函数会根据请求路径从文件服务器中找到对应的文件并将其内容发送给客户端。
//如果条件不成立,则将kl.logServer设置为http.StripPrefix("/logs/", file)的结果。http.StripPrefix()函数是一个处理HTTP请求的中间件,
//它会将请求路径中的指定前缀去除后再将请求传递给下一个处理程序。
//在这里,它会将路径中的/logs/前缀去除,然后将处理请求的任务交给file对象。
//这样做是为了确保文件服务器能够正确地处理请求中的路径。
}
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
//这段Go代码是Kubernetes中的一个函数片段,主要功能是初始化Kubelet。
//1. 首先检查kl.kubeClient是否为nil,如果是,则记录一条日志,表示没有定义API服务器,因此不会发送节点状态更新。
//2. 然后,如果kl.cloudResourceSyncManager不为nil,则通过go关键字启动云提供商资源同步管理器的运行。
//3. 最后,调用kl.initializeModules()初始化内部模块,如果初始化失败,则记录事件和错误日志,并通过os.Exit(1)退出程序。
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
}
//这段Go代码主要启动了几个goroutine来管理Kubernetes节点的状态和租约。
//1. 首先使用kl.volumeManager.Run()启动了一个volume manager来管理卷。
//2. 如果kl.kubeClient不为空,则启动了两个goroutine来更新节点状态:
//- 第一个goroutine使用wait.JitterUntil()函数,每隔kl.nodeStatusUpdateFrequency时间向API服务器报告一次节点状态,提供规律的状态更新。
//- 第二个goroutine使用kl.fastStatusUpdateOnce()函数,仅在节点准备就绪时向API服务器进行一次状态更新,然后退出。
//3. 最后,使用kl.nodeLeaseController.Run()启动了一个goroutine来同步租约。
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start component sync loops.
kl.statusManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
//这段代码是Go语言编写的,主要执行了以下几个操作:
//1. 使用wait.Until函数,每隔5秒调用一次kl.updateRuntimeUp函数,并且永远不停止。
//2. 如果kl.makeIPTablesUtilChains为true,则调用kl.initNetworkUtil函数初始化iptables工具规则。
//3. 调用kl.statusManager.Start()函数,启动组件同步循环。
//4. 如果kl.runtimeClassManager不为nil,则调用kl.runtimeClassManager.Start(wait.NeverStop)函数,启动RuntimeClasses的同步。
//这段代码的主要目的是在Kubernetes中启动组件和功能的同步,并设置iptables规则。
// Start the pod lifecycle event generator.
kl.pleg.Start()
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
kl.syncLoop(ctx, updates, kl)
}
//这段Go代码主要涉及到了Kubernetes中Pod生命周期事件生成器的启动和同步循环的逻辑。
//首先,kl.pleg.Start()启动了Pod生命周期事件生成器,它会监控Pod的状态变化,并生成相应的事件。
//接着,通过判断features.EventedPLEG是否启用,来决定是否启动kl.eventedPleg.Start()。
//EventedPLEG是一种更高效的事件处理机制,当有多个事件发生时,它能够批量处理这些事件,减少处理时间。
//最后,kl.syncLoop(ctx, updates, kl)启动了一个同步循环,它会不断地从updates通道中获取Pod状态的更新,并调用kl对象的方法来处理这些更新。
//这个循环会一直运行,直到ctx上下文被取消或终止。
//总的来说,这段代码主要是启动了Kubernetes中Pod生命周期事件的监控和处理逻辑,确保能够及时地处理Pod状态的变化。
// SyncPod is the transaction script for the sync of a single pod (setting up)
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If SyncPod exits with a transient error, the next
// invocation of SyncPod is expected to make progress towards reaching the
// desired state. SyncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will be
// SyncTerminatingPod. If the pod terminates for any other reason, SyncPod
// will receive a context cancellation and should exit as soon as possible.
//
// Arguments:
//
// updateType - whether this is a create (first time) or an update, should
// only be used for metrics since this method must be reentrant
//
// pod - the pod that is being set up
//
// mirrorPod - the mirror pod known to the kubelet for this pod, if any
//
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of SyncPod
//
// The workflow is:
// - If the pod is being created, record pod worker start latency
// - Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// - If the pod is being seen as running for the first time, record pod
// start latency
// - Update the status of the pod in the status manager
// - Stop the pod's containers if it should not be running due to soft
// admission
// - Ensure any background tracking for a runnable pod is started
// - Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// - Create the data directories for the pod if they do not exist
// - Wait for volumes to attach/mount
// - Fetch the pull secrets for the pod
// - Call the container runtime's SyncPod callback
// - Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next SyncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
ctx, otelSpan := kl.tracer.Start(ctx, "syncPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
attribute.String("k8s.pod", klog.KObj(pod).String()),
semconv.K8SPodNameKey.String(pod.Name),
attribute.String("k8s.pod.update_type", updateType.String()),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
//该函数是Kubelet的syncPod方法,用于同步Pod的状态。它根据传入的Pod、mirrorPod和podStatus参数,更新Pod的状态,
//并返回一个布尔值和一个错误。
//函数使用了OpenTelemetry进行跟踪,并设置了多个属性,
//包括Pod的UID、名称、命名空间和更新类型等。
))
klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
otelSpan.End()
}()
//该函数是Kubelet的syncPod方法,用于同步Pod的状态。
//它根据传入的Pod、mirrorPod和podStatus参数,更新Pod的状态,并返回一个布尔值isTerminal和一个错误err。
//在函数开始时,它通过kl.tracer.Start创建了一个名为syncPod的trace span,并在函数结束时通过otelSpan.End结束该span。
//函数内部的具体逻辑没有在代码片段中展示出来。
//这段Go代码主要使用了klog和OpenTelemetry两个库,功能是在进入和退出SyncPod函数时记录日志,并通过OpenTelemetry跟踪操作。
//- klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID):记录SyncPod函数进入的日志,日志级别为4,
//包含pod的信息和podUID。
//- defer关键字定义了一个延迟执行的函数,会在SyncPod函数退出时执行。
//- klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal):
//记录SyncPod函数退出的日志,包含pod的信息、podUID和isTerminal。
//- otelSpan.End():结束OpenTelemetry的跟踪操作。
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by kubelet.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
//这段Go代码主要进行了一个延迟测量的操作。
//具体来说,它首先检查了一个名为pod.Annotations的结构体中是否存在键为kubetypes.ConfigFirstSeenAnnotationKey的元素,
//如果存在,则将该元素的值转换为时间戳类型并赋值给变量firstSeenTime。
//这个操作主要是为了记录主工作流的延迟时间,而延迟的起始时间是相对于kubelet第一次看到pod的时间来计算的。
// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).InfoS("First seen time not recorded for pod",
"podUID", pod.UID,
"pod", klog.KObj(pod))
}
}
//该函数用于记录Pod创建时的工作线程启动延迟。
//如果updateType等于kubetypes.SyncPodCreate,则会判断firstSeenTime是否为零,如果不为零,
//则记录自kubelet首次看到Pod以来的延迟;
//如果为零,则输出日志信息。
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
//这段Go代码是用于生成最终的API Pod状态的。
//它首先调用kl.generateAPIPodStatus(pod, podStatus, false)来生成一个API Pod状态对象,
//然后根据这个对象来设置podStatus对象的IPs字段。
//如果pod使用了宿主机网络,那么在generateAPIPodStatus函数中可能会更改pod的IP地址。
//代码中还提到,以后可能会将pod的规格写入容器标签,并在runtime.GetPodStatus中直接将pod IP设置为宿主机IP。
//最后,如果podStatus.IPs为空且apiPodStatus.PodIP不为空,则将apiPodStatus.PodIP作为podStatus.IPs的值。
// If the pod is terminal, we don't need to continue to setup the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}
//该函数用于判断Pod是否处于终止状态(PodSucceeded或PodFailed),如果是,则设置Pod状态为终止状态,并返回true。否则返回false。
// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
// it later). Set the status and phase appropriately
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; and update the Pod and Container statuses to why.
if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
apiPodStatus.Phase = v1.PodPending
}
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
const waitingReason = "Blocked"
//该函数用于判断Pod是否应该运行,并根据判断结果更新Pod的状态和阶段。
//如果Pod不可运行,则将其状态和阶段更新为Pending,并设置相应的Reason和Message。
//其中,如果Pod的状态不是Failed或Succeeded,则将其阶段更新为Pending。
for _, cs := range apiPodStatus.InitContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
for _, cs := range apiPodStatus.ContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
}
//这段Go代码中有两个for循环,分别遍历了apiPodStatus.InitContainerStatuses和apiPodStatus.ContainerStatuses两个切片。
//其中,如果遍历到的ContainerStatus的State.Waiting不为nil,则将其Reason字段设置为waitingReason。
//这段代码的作用是给Pod中所有处于等待状态的初始化容器和容器设置等待原因。
// Record the time it takes for the pod to become running
// since kubelet first saw the pod if firstSeenTime is set.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}
//这段Go代码的功能是记录Pod从kubelet首次看到它到变为运行状态所花费的时间。
//具体来说,它首先从kl.statusManager获取Pod的现有状态,
//如果获取成功并且Pod的现有状态为Pending,而apiPodStatus的Phase为Running,
//并且firstSeenTime不为零,则使用metrics.SinceInSeconds函数记录从firstSeenTime到当前时间的时间差,
//并将其作为观察值传递给metrics.PodStartDuration。
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Pods that are not runnable must be stopped - return a typed error to the pod worker
if !runnable.Admit {
klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
var syncErr error
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(ctx, pod, p, nil); err != nil {
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %w", err)
utilruntime.HandleError(syncErr)
//该函数用于设置Pod的状态,并根据Pod是否可运行来决定是否停止Pod。如果Pod不可运行,则通过kl.killPod函数停止Pod,并记录事件和错误信息。
}
} else {
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %v", runnable.Message)
}
return false, syncErr
//这段Go代码是一个函数的一部分,根据给定的条件执行不同的操作,并返回两个值:一个布尔值和一个错误。
//首先,它检查是否有错误杀死Pod(容器),如果有,则将错误记录并返回。
//如果没有错误,但Pod无法运行,则将自定义错误返回,以指示同步循环应退避。
//最后,该函数返回false和同步错误。
}
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
//该函数主要检查网络插件是否准备就绪,如果未就绪且Pod不使用宿主机网络,则记录事件并返回错误。
// ensure the kubelet knows about referenced secrets or configmaps used by the pod
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
if kl.secretManager != nil {
kl.secretManager.RegisterPod(pod)
}
if kl.configMapManager != nil {
kl.configMapManager.RegisterPod(pod)
}
}
//该函数用于确保kubelet知道pod中使用到的secret或configmap。它首先检查pod是否被请求终止,
//如果不是,则分别调用secretManager和configMapManager的RegisterPod方法来注册pod。
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// When the kubelet is restarted with the cgroups-per-qos
// flag enabled, all the pod's running containers
// should be killed intermittently and brought back up
// under the qos cgroup hierarchy.
// Check if this is the pod's first sync
firstSync := true
//这段Go代码是Kubernetes中的一个片段,用于创建和管理Pod的Cgroups。
//函数首先创建一个PodContainerManager实例,然后检查Pod是否已经被终止。
//如果Pod没有被终止,则会检查是否是Pod的第一个同步。如果是第一次同步,将会为Pod创建或更新Cgroups,并应用资源参数。
//这段代码的主要目的是在Kubernetes中管理Pod的资源限制和隔离。
for _, containerStatus := range apiPodStatus.ContainerStatuses {
if containerStatus.State.Running != nil {
firstSync = false
break
}
}
// Don't kill containers in pod if pod's cgroups already
// exists or the pod is running for the first time
podKilled := false
//这段Go代码是用于遍历Pod中的容器状态,检查是否有正在运行的容器。
//如果存在正在运行的容器,则将firstSync标记为false并退出循环。
//接下来,根据podKilled的值决定是否终止Pod中的容器。
if !pcm.Exists(pod) && !firstSync {
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(ctx, pod, p, nil); err == nil {
if wait.Interrupted(err) {
return false, err
}
podKilled = true
} else {
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
}
}
//这段Go代码是一个条件判断语句,其主要功能是在特定条件下杀死一个Pod。
//具体来说,函数首先检查Pod是否存在于pcm中,以及firstSync是否为false。
//如果两个条件都满足,则会将podStatus转换为一个运行中的Pod,并尝试通过killPod函数杀死该Pod。
//如果杀死操作成功,且返回的错误是由于等待中断导致的,则返回false和该错误。
//如果杀死操作失败,则会记录一个错误日志。
// Create and Update pod's Cgroups
// Don't create cgroups for run once pod if it was killed above
// The current policy is not to restart the run once pods when
// the kubelet is restarted with the new flag as run once pods are
// expected to run only once and if the kubelet is restarted then
// they are not expected to run again.
// We don't create and apply updates to cgroup if its a run once pod and was killed above
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
}
//这段Go代码是Kubernetes的一部分,用于创建和更新Pod的Cgroups。Cgroups是Linux操作系统中的一个功能,可以限制和监控进程的资源使用。
//这段代码首先检查Pod是否被杀死且重启策略为Never,如果是,则不创建Cgroups。
//然后,它检查Pod的Cgroups是否存在,如果不存在,则尝试更新QoS cgroups并确保Pod的Cgroups存在并正确应用。
//如果更新QoS cgroups或确保Cgroups存在失败,则记录事件并返回错误。
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
//这段Go代码主要功能是检查给定的Pod是否为静态Pod,如果是,则检查是否存在与之对应的镜像Pod。
//如果存在但与静态Pod语义不同或已被标记删除,则删除该镜像Pod。
//具体流程如下:
//1. 首先判断给定的Pod是否为静态Pod,如果是则继续执行后续逻辑。
//2. 检查是否存在镜像Pod,如果存在则进一步判断镜像Pod是否与静态Pod语义相同或未被标记删除。
//3. 如果镜像Pod与静态Pod语义不同或已被标记删除,则尝试删除该镜像Pod。
//4. 如果删除成功,则记录日志;如果删除失败,则记录错误日志。
//该函数通过调用mirrorPodClient的DeleteMirrorPod方法来删除镜像Pod。
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
//这段代码主要功能是根据条件判断是否需要创建一个镜像Pod。
//如果mirrorPod为nil或deleted标志为true,则执行以下操作:
//1. 通过kl.GetNode()获取节点信息。
//2. 如果获取节点信息时发生错误,则记录错误日志。
//3. 如果节点信息获取成功,检查节点是否已被删除。
//4. 如果节点已被删除,则记录一条信息日志。
//5. 如果节点未被删除,则记录一条信息日志,表示正在创建镜像Pod。
//6. 调用kl.mirrorPodClient.CreateMirrorPod(pod)尝试创建镜像Pod,如果创建失败则记录错误日志。
//这段代码通过日志记录了操作的结果,并根据条件判断是否需要创建镜像Pod。
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
}
//该函数是用于为Pod创建数据目录的。它首先调用kl.makePodDataDirs(pod)函数来创建目录,如果创建失败,则记录事件并返回错误。
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}
//这段代码的功能是等待卷附加和挂载操作完成。
//具体而言,它通过调用kl.volumeManager.WaitForAttachAndMount(ctx, pod)来等待卷的附加和挂载操作完成。
//如果操作失败且不是由于中断引起,则会通过kl.recorder.Eventf()记录事件,并通过klog.ErrorS()记录错误日志,并最终返回false和错误信息err。
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Ensure the pod is being probed
kl.probeManager.AddPod(pod)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
if kl.podWorkers.CouldHaveRunningContainers(pod.UID) && !kubetypes.IsStaticPod(pod) {
pod = kl.handlePodResourcesResize(pod)
}
}
//这段Go代码是Kubernetes中kubelet的一个片段,主要功能是处理Pod的镜像拉取凭证、添加Pod到探测管理器中、以及处理Pod的资源缩放。
//1. 首先,函数getPullSecretsForPod从kl(kubelet实例)中获取Pod的镜像拉取凭证pullSecrets。
//2. 然后,通过调用probeManager.AddPod方法,将该Pod添加到探测管理器中,以确保Pod被探测。
//3. 接着,通过判断是否启用了InPlacePodVerticalScaling特性,决定是否处理Pod的资源缩放。
//如果启用了该特性,并且当前Pod有运行中的容器且不是静态Pod,则调用handlePodResourcesResize方法处理Pod的资源缩放。
//4. handlePodResourcesResize方法会尝试处理Pod的资源缩放请求,并返回缩放后的Pod对象。
//这段代码的主要目的是在Pod更新时处理镜像拉取凭证、探测和资源缩放等操作。
//其中,资源缩放功能通过检查Pod的状态和特性门控来决定是否执行,以实现Pod的垂直缩放。
// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
// the context for SyncPod is a known and deliberate error, not a generic error.
// Use WithoutCancel instead of a new context.TODO() to propagate trace context
// Call the container runtime's SyncPod callback
sctx := context.WithoutCancel(ctx)
result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
// Do not return error if the only failures were pods in backoff
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime, so we get better errors.
return false, err
}
}
return false, nil
}
//这个Go函数用于同步Pod,并执行以下操作:
//- 创建一个不取消的上下文sctx。
//- 调用容器运行时的SyncPod回调函数,传入sctx、Pod、Pod状态、pullSecrets和backOff。
//- 使用结果更新Pod的状态。
//- 如果有错误,遍历结果中的同步结果,如果错误不是ErrCrashLoopBackOff或ErrImagePullBackOff,则返回错误。
//- 否则,返回false和nil。
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && isPodResizeInProgress(pod, &apiPodStatus) {
// While resize is in progress, periodically call PLEG to update pod cache
runningPod := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err, _ := kl.pleg.UpdateCache(&runningPod, pod.UID); err != nil {
klog.ErrorS(err, "Failed to update pod cache", "pod", klog.KObj(pod))
return false, err
}
}
return false, nil
}
//这段Go代码的功能是在Kubernetes中处理Pod的垂直扩展。如果启用了InPlacePodVerticalScaling功能并且Pod正在进行大小调整,
//则会定期调用PLEG来更新Pod的缓存。
//如果更新失败,则会记录错误并返回错误信息。如果条件不满足,则直接返回false和nil。
// SyncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod is considered to be terminated and it will be safe to clean up any
// pod state that is tied to the lifetime of running containers. The next method invoked will be
// SyncTerminatedPod. This method is expected to return with the grace period provided and the
// provided context may be cancelled if the duration is exceeded. The method may also be interrupted
// with a context cancellation if the grace period is shortened by the user or the kubelet (such as
// during eviction). This method is not guaranteed to be called if a pod is force deleted from the
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
// pods.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatingPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
attribute.String("k8s.pod", klog.KObj(pod).String()),
semconv.K8SPodNameKey.String(pod.Name),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
//该函数是Kubelet的一个方法,用于同步终止Pod中的所有运行中的容器。
//当此方法在没有错误的情况下返回时,Pod被认为已经终止,并且可以安全地清理与运行容器的生命周期相关的任何Pod状态。
//下一个调用的方法将是SyncTerminatedPod。
//此方法期望在提供的优雅期间内返回,并且提供的上下文可能会在持续时间超过时被取消。
//该方法也可能因用户或kubelet(如驱逐期间)缩短优雅期间而被上下文取消。
//如果Pod被强制从配置中删除,并且kubelet被重新启动,则不会调用此方法
//- SyncTerminatingRuntimePod处理这些孤儿Pod。
//函数内部使用了OpenTelemetry进行日志记录和跟踪,并通过klog记录进入和退出函数的日志。
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
//这段Go代码中的函数实现了以下功能:
//- 首先,它调用kl.generateAPIPodStatus(pod, podStatus, false)生成一个apiPodStatus对象。
//- 然后,如果podStatusFn不为空,它将调用podStatusFn(&apiPodStatus)来对apiPodStatus进行进一步的处理。
//- 最后,它调用kl.statusManager.SetPodStatus(pod, apiPodStatus)来设置Pod的状态。
//总的来说,这个函数通过调用kl.generateAPIPodStatus生成一个Pod状态对象apiPodStatus,并将其设置为Pod的实际状态。
//如果提供了podStatusFn函数,它还会对apiPodStatus进行额外的处理。
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
//这段Go代码主要进行日志记录,根据gracePeriod是否为nil,记录不同级别的日志信息。
//- 如果gracePeriod不为nil,则调用klog.V(4).InfoS函数记录日志,
//日志内容包括:"Pod terminating with grace period"、"pod"、"podUID"、"gracePeriod"。
//- 如果gracePeriod为nil,则调用klog.V(4).InfoS函数记录日志,
//日志内容包括:"Pod terminating with grace period"、"pod"、"podUID"、"gracePeriod",其中"gracePeriod"的值为nil。
//这里的klog是一个日志库,V(4)表示日志级别为4,InfoS表示记录信息级别的日志,并可以传入多个键值对参数来丰富日志内容。
//pod和pod.UID是函数的参数,用于提供上下文信息。
kl.probeManager.StopLivenessAndStartup(pod)
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(ctx, pod, p, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
//该函数的功能是停止Pod的liveness和startup探针管理器,将Pod状态转换为运行中的Pod,然后尝试杀死Pod。
//如果杀死Pod时出现错误,则记录事件并返回该错误。
// Once the containers are stopped, we can stop probing for liveness and readiness.
// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
kl.probeManager.RemovePod(pod)
//该函数用于从探针管理器中移除指定的Pod。
//在容器停止后,可以停止对Pod的存活和就绪状态的探测。
//此函数是Kubernetes中的一个组件,用于管理Pod的探针。
//通过调用kl.probeManager.RemovePod(pod),可以将指定的Pod从探针管理器中移除,从而停止对该Pod的存活和就绪状态的探测。
// Guard against consistency issues in KillPod implementations by checking that there are no
// running containers. This method is invoked infrequently so this is effectively free and can
// catch race conditions introduced by callers updating pod status out of order.
// TODO: have KillPod return the terminal status of stopped containers and write that into the
// cache immediately
podStatus, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
return err
}
//这段Go代码中的函数主要功能是在终止Pod之前,通过检查是否有正在运行的容器来防止一致性问题。
//该方法被调用的频率较低,因此基本上是免费的,可以捕获调用者按错误顺序更新Pod状态时引入的竞态条件。
//通过获取Pod的状态来实现检查,如果无法读取Pod状态,则记录错误日志并返回错误。
var runningContainers []string
type container struct {
Name string
State string
ExitCode int
FinishedAt string
}
var containers []container
klogV := klog.V(4)
klogVEnabled := klogV.Enabled()
//这段Go代码定义了一个container结构体类型,包含名称、状态、退出码和完成时间四个字段;
//同时定义了一个runningContainers切片和一个containers切片。
//klogV和klogVEnabled是klog库的函数调用,用于判断日志级别是否启用。
for _, s := range podStatus.ContainerStatuses {
if s.State == kubecontainer.ContainerStateRunning {
runningContainers = append(runningContainers, s.ID.String())
}
//该Go代码片段是一个for循环,遍历了podStatus.ContainerStatuses切片中的每个元素,
//并将处于运行状态(ContainerStateRunning)的容器的ID以字符串形式保存到runningContainers切片中。
if klogVEnabled {
containers = append(containers, container{Name: s.Name, State: string(s.State), ExitCode: s.ExitCode, FinishedAt: s.FinishedAt.UTC().Format(time.RFC3339Nano)})
}
//这段Go代码是一个条件语句,判断klogVEnabled是否为true,如果是,则将一个容器信息追加到containers切片中。
//追加的容器信息包括容器的名称、状态、退出码和完成时间(使用UTC格式)。
}
if klogVEnabled {
sort.Slice(containers, func(i, j int) bool { return containers[i].Name < containers[j].Name })
klog.V(4).InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
}
//这段Go代码是在klogVEnabled为true时对containers进行排序,并通过klog.V(4).InfoS输出日志信息。
//其中,sort.Slice根据containers中Name字段的值对containers进行升序排序;
//klog.V(4).InfoS输出的日志信息包括"Post-termination container state"、"pod"、"podUID"和"containers"等字段的值。
if len(runningContainers) > 0 {
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
}
//该函数检测到运行中的容器后,在成功调用KillPod后返回一个错误,指示CRI违规。
// NOTE: resources must be unprepared AFTER all containers have stopped
// and BEFORE the pod status is changed on the API server
// to avoid race conditions with the resource deallocation code in kubernetes core.
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if err := kl.UnprepareDynamicResources(pod); err != nil {
return err
}
}
//这段Go代码是Kubernetes中的一个函数片段,它的主要功能是在动态资源分配的特性开启时,
//调用kl.UnprepareDynamicResources(pod)方法来释放动态资源。
//- 首先,代码通过utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation)判断当前是否启用了动态资源分配的特性。
//- 如果特性开启,则调用kl.UnprepareDynamicResources(pod)方法来释放与给定Pod相关的动态资源。
//- 如果在释放资源的过程中出现错误,则函数会返回该错误。
//需要注意的是,该函数的注释指出,在调用该函数释放资源之前,必须确保所有容器已经停止,
//并且在修改Pod的状态之前调用,以避免与Kubernetes核心代码中的资源释放逻辑发生竞态条件。
// Compute and update the status in cache once the pods are no longer running.
// The computation is done here to ensure the pod status used for it contains
// information about the container end states (including exit codes) - when
// SyncTerminatedPod is called the containers may already be removed.
apiPodStatus = kl.generateAPIPodStatus(pod, podStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
//这段Go代码是一个函数片段,它用于计算和更新Pod的状态缓存,当Pod不再运行时。
//函数首先调用kl.generateAPIPodStatus生成一个API Pod状态,然后使用kl.statusManager.SetPodStatus设置Pod的状态。
//最后,函数返回nil。
//这段代码属于Kubernetes的一部分,用于管理Pod的生命周期。
//在Pod终止时,该函数被调用,确保容器的终止状态(包括退出码)被包含在Pod状态中。
//函数通过设置Pod状态来记录Pod的终止状态。
}
// SyncTerminatingRuntimePod is expected to terminate running containers in a pod that we have no
// configuration for. Once this method returns without error, any remaining local state can be safely
// cleaned up by background processes in each subsystem. Unlike syncTerminatingPod, we lack
// knowledge of the full pod spec and so cannot perform lifecycle related operations, only ensure
// that the remnant of the running pod is terminated and allow garbage collection to proceed. We do
// not update the status of the pod because with the source of configuration removed, we have no
// place to send that status.
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.Background()
pod := runningPod.ToAPIPod()
klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
//该函数是Kubelet的一个方法,用于同步终止无配置的Pod中的运行时容器。
//该方法不更新Pod的状态,仅确保运行Pod的剩余部分被终止,以便进行垃圾回收。
//函数首先创建一个背景上下文,然后将运行Pod转换为API Pod。通过记录日志来标记函数的进入和退出。
// we kill the pod directly since we have lost all other information about the pod.
klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
// TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime)
gracePeriod := int64(1)
if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
//这段Go代码是一个函数,用于直接终止一个孤立的正在运行的Pod(容器)。
//函数首先记录一条日志信息,表示将要终止Pod,并指出该Pod已经没有其他相关信息。
//然后,函数调用kl.killPod方法来终止Pod,给定的优雅终止时间是1秒。
//如果终止Pod时出现错误,函数会记录一条事件日志,并直接返回该错误。
//最后,函数记录一条日志信息,表示已经成功终止了Pod的所有正在运行的容器,并返回nil表示没有错误。
// SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
// The invocations in this call are expected to tear down all pod resources.
// When this method exits the pod is expected to be ready for cleanup. This method
// reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
//
// Because the kubelet has no local store of information, all actions in this method that modify
// on-disk state must be reentrant and be garbage collected by HandlePodCleanups or a separate loop.
// This typically occurs when a pod is force deleted from configuration (local disk or API) and the
// kubelet restarts in the middle of the action.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
ctx, otelSpan := kl.tracer.Start(ctx, "syncTerminatedPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
attribute.String("k8s.pod", klog.KObj(pod).String()),
semconv.K8SPodNameKey.String(pod.Name),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
//该函数是Kubelet的一个方法,用于清理已经终止(没有运行中的容器)的Pod。
//调用该方法的目的是销毁Pod的所有资源,当方法退出时,Pod应该准备好进行清理。
//该方法可以减少Pod清理的延迟,但在所有场景中都可能不会被调用。
//由于Kubelet没有本地存储信息,该方法中所有修改本地磁盘状态的操作必须是可重入的,并且可以通过HandlePodCleanups或单独的循环进行垃圾回收。
//这通常发生在Pod从配置(本地磁盘或API)中强制删除,并且kubelet在操作中间重新启动时。
//该函数首先通过kl.tracer.Start创建一个名为syncTerminatedPod的跟踪Span,并设置相关属性。
//然后使用defer语句在函数退出时结束该Span。接下来,使用klog.V(4).InfoS记录函数的进入和退出日志。
//函数的主要逻辑部分在这些日志记录和跟踪Span的开启和结束之间。
// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
// before syncTerminatedPod is invoked)
if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
return err
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
//这段Go代码是Kubernetes中Kubelet的一部分,用于处理已终止的Pod的最终状态。
//首先,它调用kl.generateAPIPodStatus(pod, podStatus, true)生成Pod的最终状态。
//这个函数会根据Pod的实际状态和配置生成一个API兼容的Pod状态对象。
//接下来,它调用kl.statusManager.SetPodStatus(pod, apiPodStatus)将生成的Pod状态设置为Kubernetes API中记录的状态。
//最后,它调用kl.volumeManager.WaitForUnmount(ctx, pod)等待所有卷被卸载。
//这个函数会等待所有与Pod相关的卷被成功卸载,以确保Pod的所有资源都被正确清理。
//如果卷卸载失败,则会返回错误,终止Pod的处理将停止。
//如果成功,则会记录一条日志消息,表示Pod的卸载已经完成。
if !kl.keepTerminatedPodVolumes {
// This waiting loop relies on the background cleanup which starts after pod workers respond
// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
volumesExist := kl.podVolumesExist(pod.UID)
if volumesExist {
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
}
return !volumesExist, nil
}); err != nil {
return err
}
klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
}
//这段Go代码是一个等待循环,用于检查已终止的Pod的卷是否已被清理。
//如果kl.keepTerminatedPodVolumes为false,则会进入循环,通过wait.PollUntilContextCancel函数每隔100毫秒检查一次。
//循环会一直进行,直到Pod的卷不存在,或者上下文被取消。如果循环结束且没有错误发生,则会记录一条日志表示Pod终止时清理了卷路径。
// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
if kl.secretManager != nil {
kl.secretManager.UnregisterPod(pod)
}
if kl.configMapManager != nil {
kl.configMapManager.UnregisterPod(pod)
}
//这段Go代码中的函数是在完成卷卸载后,通知密钥和配置管理器此Pod不再使用。
//具体来说,它首先检查kl.secretManager是否不为空,如果非空则从secretManager中注销此Pod,接着检查kl.configMapManager是否不为空,
//如果非空则从configMapManager中注销此Pod。
// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
// container for retrieving logs and we want to make sure logs are available until the pod is
// physically deleted.
// remove any cgroups in the hierarchy for pods that are no longer running.
if kl.cgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
name, _ := pcm.GetPodContainerName(pod)
if err := pcm.Destroy(name); err != nil {
return err
}
klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
}
//这段Go代码是一个函数,用于删除不再运行的Pod的所有cgroups(控制组)。
//- 首先,代码检查kl.cgroupsPerQOS是否为true。如果为true,则会创建一个新的PodContainerManager实例。
//- 然后,通过pcm.GetPodContainerName(pod)获取Pod的容器名称。
//- 最后,调用pcm.Destroy(name)来销毁该Pod的所有cgroups,并在成功删除后记录日志。
//这个函数的主要作用是在Pod终止时清理相关的cgroups资源。
kl.usernsManager.Release(pod.UID)
// mark the final pod status
kl.statusManager.TerminatePod(pod)
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
//这段Go代码中的函数调用是针对Kubernetes中某个Pod的释放和状态标记操作。
//首先,kl.usernsManager.Release(pod.UID)调用释放了与Pod关联的用户命名空间资源。
//接下来,kl.statusManager.TerminatePod(pod)调用标记了Pod的最终状态为终止。
//最后,通过klog.V(4).InfoS(...)记录日志信息,表示Pod已被终止,不再需要更新状态。
//函数执行完毕后,返回nil表示操作成功完成。
}
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// - pod whose work is ready.
// - internal modules that request sync of a pod.
//
// This method does not return orphaned pods (those known only to the pod worker that may have
// been deleted from configuration). Those pods are synced by HandlePodCleanups as a consequence
// of driving the state machine to completion.
//
// TODO: Consider synchronizing all pods which have not recently been acted on to be resilient
// to bugs that might prevent updates from being delivered (such as the previous bug with
// orphaned pods). Instead of asking the work queue for pending work, consider asking the
// PodWorker which pods should be synced.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
podUIDSet := sets.NewString()
for _, podUID := range podUIDs {
podUIDSet.Insert(string(podUID))
}
//该函数用于获取需要重新同步的Pod。
//具体来说,它会返回所有工作准备就绪的Pod和内部模块请求同步的Pod。
//函数首先获取所有Pod的信息,然后从工作队列中获取需要同步的Pod的UID,并将其放入一个字符串集合中。
//最后,函数会返回这个集合中所有Pod的信息。需要注意的是,该函数不会返回孤立的Pod(那些只被Pod工作者知道,可能已经被从配置中删除的Pod),
//这些Pod会由HandlePodCleanups函数进行同步。
//函数的注释中还提到,可能会考虑同步所有最近没有被操作的Pod,以增加对错误的鲁棒性。
var podsToSync []*v1.Pod
for _, pod := range allPods {
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
continue
}
for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
if podSyncLoopHandler.ShouldSync(pod) {
podsToSync = append(podsToSync, pod)
break
}
}
}
return podsToSync
}
//该函数是一个简单的for循环,用于遍历allPods切片,并根据条件筛选出需要同步的Pod,将其添加到podsToSync切片中。
//具体来说,它首先检查podUIDSet中是否包含当前Pod的UID,如果是,则将该Pod添加到podsToSync中。
//然后,它遍历kl.PodSyncLoopHandlers中的每个podSyncLoopHandler,调用其ShouldSync方法检查是否需要同步该Pod,
//如果是,则将该Pod添加到podsToSync中并跳出循环。最后,函数返回podsToSync切片。
// deletePod deletes the pod from the internal state of the kubelet by:
// 1. stopping the associated pod worker asynchronously
// 2. signaling to kill the pod by sending on the podKillingCh channel
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
//该函数是Kubelet的一个方法,用于从kubelet的内部状态中删除一个pod,
//具体操作包括:
//1. 停止关联的pod worker(异步进行)。
//2. 通过发送信号到podKillingCh通道来终止pod。
//如果输入的pod为nil,函数会返回一个错误。
//如果相关的数据源未准备就绪,函数也会返回一个错误,以防止意外删除未报告的pod。
//在执行删除操作后,函数会更新pod worker,并将其标记为需要被终止。
//然后函数返回nil,表示删除成功。
//**注意:**该函数不会立即清理相关的卷和目录,而是将其留给了周期性的清理程序。
// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: "Pod was rejected: " + message})
}
//该函数用于记录一个关于给定pod的事件,并在状态管理器中将pod的状态更新为失败。
//具体实现中,函数使用kl.recorder.Eventf方法记录事件,然后使用kl.statusManager.SetPodStatus方法将pod的状态设置为失败,并提供原因和消息。
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
//该函数是Kubelet的一个方法,用于判断是否可以接纳一个Pod,并给出不能接纳的原因。
//函数传入参数为已接纳的Pod列表和待判断的Pod,返回一个布尔值表示是否可以接纳,一个简短的单个单词原因和一个解释不能接纳原因的消息。
//函数内部会按顺序调用每个Pod的准入处理器,如果有任何一个处理器拒绝,则Pod会被拒绝接纳。
//函数还有两个TODO注释,表示后续会将磁盘检查和资源不足的驱逐处理移动到Pod的准入处理器中。
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Use allocated resources values from checkpoint store (source of truth) to determine fit
otherPods := make([]*v1.Pod, 0, len(pods))
for _, p := range pods {
op := p.DeepCopy()
kl.updateContainerResourceAllocation(op)
otherPods = append(otherPods, op)
}
attrs.OtherPods = otherPods
}
//这段Go代码是用于在Kubernetes中进行Pod垂直扩展的功能。
//具体来说,如果启用了InPlacePodVerticalScaling特性,它会使用checkpoint存储中记录的已分配资源值来确定Pod是否适合进行垂直扩展。
//代码首先创建一个空的Pod数组,然后遍历传入的Pods,并对每个Pod进行深拷贝。
//然后,通过调用updateContainerResourceAllocation方法来更新每个Pod的容器资源分配情况,并将更新后的Pod添加到数组中。
//最后,将这个更新后的Pod数组赋值给attrs.OtherPods属性。
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
return true, "", ""
}
//该函数的作用是逐一检查kl.admitHandlers中的每个podAdmitHandler是否能够通过attrs属性的检验。
//具体流程如下:
//1. 使用for range循环遍历kl.admitHandlers,每次迭代将一个podAdmitHandler赋值给podAdmitHandler变量。
//2. 在循环内部,调用当前podAdmitHandler的Admit方法,传入attrs参数,并将返回的结果赋值给result变量。
//3. 如果result的Admit字段为false,则函数立即返回false,同时返回result的Reason和Message字段作为错误信息。
//4. 如果循环结束后没有返回,则函数最终返回true,同时返回空字符串作为错误信息。
//总之,该函数的作用是检验kl.admitHandlers中的每个podAdmitHandler是否能够通过attrs属性的检验,并返回检验结果以及相应的错误信息。
func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
attrs.OtherPods = kl.GetActivePods()
for _, handler := range kl.softAdmitHandlers {
if result := handler.Admit(attrs); !result.Admit {
return result
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
//该函数是Kubelet的一个方法,用于判断是否可以运行一个Pod。
//它首先创建一个PodAdmitAttributes对象,其中包含了要判断的Pod和其他活跃的Pod。
//然后遍历Kubelet的softAdmitHandlers列表,依次调用每个handler的Admit方法,并传入PodAdmitAttributes对象。
//如果有任何一个handler的Admit方法返回不承认(Admit为false),则函数立即返回该结果。
//如果所有handler都承认(Admit为true),则函数返回一个承认的结果。
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
//该函数是kubelet的主要同步循环,用于处理来自三个渠道(文件、apiserver和http)的更改。
//它将这些更改合并,并对期望状态和运行状态进行同步。如果配置没有更改,它将每sync-frequency秒同步最后一次已知的期望状态。
//该函数不会返回。 详细解释:
//- syncLoop 函数是 Kubelet 类的一个方法,它接收一个上下文对象 ctx、一个 PodUpdate 类型的通道 updates 和一个 SyncHandler 类型的处理程序 handler。
//- 函数首先使用 klog.InfoS 函数记录信息日志,表示开始启动 kubelet 的主同步循环。
//- 然后,函数创建一个 syncTicker,用于唤醒 kubelet 检查是否有需要同步的 pod 工作线程。
//由于同步间隔默认为 10s,因此使用 1s 的周期就足够了。
//- 函数还创建一个 housekeepingTicker,用于执行清理操作。
//- 接下来,函数通过调用 kl.pleg.Watch() 方法来监听 pod 状态变化事件,并将返回的通道赋值给 plegCh 变量。
//- 函数定义了一些常量,用于控制重试的时间间隔。
//- 在无限循环中,函数首先通过 select 语句监听多个通道。如果 ctx.Done() 通道被关闭,函数将退出循环。
//- 如果 updates 通道有新的更改,函数将调用 handler 处理程序进行同步。
//- 如果 plegCh 通道有新的 pod 状态变化事件,函数将调用 kl.pleg.Grab() 方法进行处理。
//- 如果 housekeepingTicker.C 通道触发清理操作,函数将调用 kl.doHousekeeping() 方法进行清理。
//- 如果没有监听到任何事件,则使用 time.Sleep() 函数等待一段时间后继续循环。
//- 如果在尝试同步时发生错误,函数将根据重试策略进行重试。重试时间间隔会根据失败次数进行指数增加,最大间隔为 5s。
//- 如果同步成功,函数将重置重试时间间隔为基本值 100ms。
//- 如果同步失败超过一定次数,函数将记录错误日志并退出循环。
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
//这段Go代码中的函数主要负责检查resolv.conf文件中的限制。
//该函数会判断kl.dnsConfigurer是否为空,且其ResolverConfig是否已设置,如果满足条件,
//则调用kl.dnsConfigurer的CheckLimitsForResolvConf方法进行限制检查。
//由于该函数是在syncLoop中调用的,因此不需要在其他地方再次调用。
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
//这段Go代码是一个无限循环,用于进行Pod同步。循环中包含两个主要部分:错误处理和同步逻辑。
//1. 错误处理: - 使用runtimeErrors()检查运行时错误。
//- 如果存在错误,则记录错误日志,并进行指数退避。
//- 通过time.Sleep()暂停执行,暂停时间根据退避策略计算得出。
//- 退避策略是通过将duration乘以factor并限制最大值来计算的。
//- 出现错误时,使用continue语句继续下一次循环。
//2. 同步逻辑: - 在没有错误的情况下,重置退避时间至初始值。
//- 更新syncLoopMonitor的值为当前时间。
//- 调用syncLoopIteration()函数进行Pod同步。
//- 如果syncLoopIteration()返回false,则退出循环。
//- 更新syncLoopMonitor的值为当前时间。
//这段代码的主要目的是在遇到错误时进行指数退避,并在成功时重置退避时间,持续进行Pod同步操作。
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// - configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// - plegCh: update the runtime cache; sync pod
// - syncCh: sync all pods waiting for sync
// - housekeepingCh: trigger cleanup of pods
// - health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
//该函数是一个同步循环,用于从多个通道读取事件并将其分发给给定的处理程序。
//它接收5个参数:configCh(配置事件的通道)、handler(分发pod的SyncHandler处理程序)、syncCh(定期同步事件的通道)、
//housekeepingCh(清理事件的通道)和plegCh(PLEG更新的通道)。
//事件还会从kubelet存活管理器的更新通道中读取。该函数的工作流程是读取一个通道中的事件,处理该事件,并在同步循环监视器中更新时间戳。
//在处理这些通道中的事件时,需要注意case语句的评估顺序是随机的。
//具体来说,不同通道的处理方式如下:
//- configCh:将配置更改分发给适当的处理程序回调。
//- plegCh:更新运行时缓存并同步pod。
//- syncCh:同步所有等待同步的pod。
//- housekeepingCh:触发清理pod。
//- 健康管理器:同步失败或其中一个或多个容器失败的健康检查的pod。
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
//这段Go代码是一个条件判断语句,判断变量open是否为false,如果为false,则通过klog.ErrorS函数记录错误日志,并返回false。
//这段代码的作用是在某个更新通道关闭时,退出同步循环,并记录错误日志。
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
//该函数根据传入的操作类型(u.Op)执行相应的处理逻辑。
//- 当操作类型为kubetypes.ADD时,会记录日志信息并调用handler.HandlePodAdditions(u.Pods)处理Pod的添加操作。
//- 当操作类型为kubetypes.UPDATE时,会记录日志信息并调用handler.HandlePodUpdates(u.Pods)处理Pod的更新操作。
//- 当操作类型为kubetypes.REMOVE时,会记录日志信息并调用handler.HandlePodRemoves(u.Pods)处理Pod的移除操作。
//- 当操作类型为kubetypes.RECONCILE时,会记录日志信息并调用handler.HandlePodReconcile(u.Pods)处理Pod的协调操作。
//- 当操作类型为kubetypes.DELETE时,会记录日志信息并调用handler.HandlePodUpdates(u.Pods)处理Pod的删除操作(被当做更新操作处理)。 - 当操作类型为kubetypes.SET时,会记录错误信息,表示当前不支持该操作。 - 当操作类型为其他值时,会记录错误信息,表示收到了无效的操作类型
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
//这段Go代码是在一个select语句的分支中,首先将u.Source添加到kl.sourcesReady中。
//随后,从plegCh通道中接收一个事件e,并通过isSyncPodWorthy(e)函数判断是否需要同步该事件对应的Pod。
//如果需要同步,通过kl.podManager.GetPodByUID(e.ID)获取Pod对象,并调用handler.HandlePodSyncs([]*v1.Pod{pod})进行同步操作。
//如果Pod不存在,则在日志中记录相关信息并忽略该事件。
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
//该代码片段是一个if语句,判断e.Type是否等于pleg.ContainerDied,
//如果是,则进一步判断e.Data是否为字符串类型,并将字符串类型的e.Data赋值给containerID。
//如果上述判断都为真,则调用kl.cleanUpContainersInPod(e.ID, containerID)方法。
//这段代码的主要作用是在容器死亡时,清理Pod中的容器。
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
//这是一个Go语言的代码片段,它通过监听两个通道(syncCh和kl.livenessManager.Updates())来实现Pod的同步和健康检查。
//具体功能如下:
//1. 如果监听到syncCh通道有消息,会执行以下操作:
//- 调用kl.getPodsToSync()获取需要同步的Pod列表;
//- 如果列表为空,则跳出当前循环;
//- 如果列表不为空,则通过klog记录日志信息,并调用handler.HandlePodSyncs(podsToSync)来处理Pod的同步操作。
//2. 如果监听到kl.livenessManager.Updates()通道有消息,会执行以下操作:
//- 检查消息的结果是否为失败(proberesults.Failure),如果是,
//则调用handleProbeSync(kl, update, handler, "liveness", "unhealthy")来处理不健康的Pod。
//这段代码通过使用Go语言的并发特性,实现了在同步Pod的同时进行健康检查的功能。
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
//这段Go代码包含两个case语句,它们分别处理来自kl.readinessManager.Updates()和kl.startupManager.Updates()的更新。
//每个case语句内部都会根据更新的结果来更新容器的就绪状态或启动状态,并调用handleProbeSync函数。
//1. 第一个case语句处理就绪状态的更新:
//- 通过update.Result == proberesults.Success判断就绪检查是否成功,将结果赋值给ready变量。
//- 调用kl.statusManager.SetContainerReadiness来设置容器的就绪状态。
//- 根据ready的值决定status的值,如果就绪则为"ready",否则为空字符串。
//- 调用handleProbeSync函数处理就绪探针的同步,传入参数包括kl、update、handler、探针类型"readiness"和status。
//2. 第二个case语句处理启动状态的更新:
//- 通过update.Result == proberesults.Success判断启动检查是否成功,将结果赋值给started变量。
//- 调用kl.statusManager.SetContainerStartup来设置容器的启动状态。
//- 根据started的值决定status的值,如果启动成功则为"started",否则为"unhealthy"。
//- 调用handleProbeSync函数处理启动探针的同步,传入参数包括kl、update、handler、探针类型"startup"和status。
//综上所述,这段代码主要负责处理容器的就绪和启动状态更新,并通过handleProbeSync函数进行进一步处理。
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(ctx); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
}
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
}
}
return true
//该Go函数是一个case语句,从housekeepingCh通道接收信号。
//当接收到信号时,它会检查kl.sourcesReady.AllReady()是否为true,如果不为true,则跳过清理工作,因为可能从未准备好的源意外删除Pod。
//如果kl.sourcesReady.AllReady()为true,则开始执行清理工作,并记录开始时间。
//清理工作由handler.HandlePodCleanups(ctx)函数执行,如果执行失败,则记录错误信息。
//最后,计算清理工作的时间,如果时间超过预期的housekeepingWarningDuration,则记录警告信息。
//最后,函数返回true。
}
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
// We should not use the pod from manager, because it is never updated after initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
return
}
klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
//该函数用于处理探针同步操作。
//1. 首先,它通过update获取Pod的UID,并尝试从Kubelet的podManager中获取对应的Pod对象。
//2. 如果找不到Pod,则忽略更新,并记录日志。
//3. 如果找到Pod,则记录日志,并调用handler处理Pod的同步操作,将该Pod对象以切片的形式传递给HandlePodSyncs方法。
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock()
}
//该函数是Kubelet的SyncHandler回调函数,用于处理从配置源添加的Pods。
//函数首先记录开始时间,然后对Pods进行排序。
//如果启用了InPlacePodVerticalScaling特性,则加锁并释放锁以进行Pod的垂直扩展。
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
//这段Go代码是一个for循环,其中pods是一个Pod的切片。
//对于每个Pod,它会执行以下操作:
//1. 调用kl.podManager.GetPods()获取当前已存在的Pods。
//2. 调用kl.podManager.AddPod(pod)将当前Pod添加到podManager中。
//这段代码的主要目的是将所有的Pod添加到podManager中。
//podManager是Kubelet的核心组件之一,它负责管理Pod的生命周期。
//通过将Pod添加到podManager中,Kubelet可以确保它知道集群中所有Pod的期望状态,以便它可以正确地管理和同步Pod的状态与实际状态。
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
//这段Go代码是Kubernetes中kubelet的一个片段,主要功能是处理mirror pod和其对应的pod的关系。
//- kl.podManager.GetPodAndMirrorPod(pod)函数用于获取mirror pod和其对应的pod。
//- 如果wasMirror为true,表示当前处理的是mirror pod。
//- 如果pod为空,则记录日志并跳过当前循环。
//- 如果pod不为空,则调用kl.podWorkers.UpdatePod()函数更新pod信息,其中UpdateType为kubetypes.SyncPodUpdate,表示同步更新pod。
//总结:这段代码主要处理mirror pod的更新操作。
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutInactivePods(existingPods)
//这段Go代码中的函数是一个条件判断语句,其作用是判断某个Pod是否被其他部分请求终止。
//如果该Pod没有被请求终止,并且之前已经被允许使用资源(先前已通过准入控制),则该函数会让Pod worker继续关闭该Pod。
//如果该Pod还没有启动,函数会知道当Pod worker被调用时,它也会避免设置Pod,因此函数会避免执行任何操作。
//如果Pod没有被请求终止,则函数会过滤掉所有非活跃的Pod,只保留活跃的Pod。
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// To handle kubelet restarts, test pod admissibility using AllocatedResources values
// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
podCopy := pod.DeepCopy()
kl.updateContainerResourceAllocation(podCopy)
//这段Go代码中的函数主要做的是检查是否启用了InPlacePodVerticalScaling特性,
//如果启用了,则会通过allocatedResources值(来自checkpoint store的CPU和内存)来测试pod的可接受性。
//如果找到allocatedResources,则将其视为真相来源,并对pod进行深拷贝并更新容器资源分配。
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
kl.rejectPod(pod, reason, message)
continue
}
//这段Go代码是一个条件判断语句,其主要功能是检查是否可以接受一个Pod(容器的组合),如果不能接受,则拒绝该Pod。
//具体来说,函数kl.canAdmitPod(activePods, podCopy)会返回一个布尔值ok、一个原因reason和一条消息message。
//如果ok为false,则表示不能接受该Pod,此时会调用kl.rejectPod(pod, reason, message)函数来拒绝该Pod,并继续执行下一次循环。
//需要注意的是,这里的kl是一个对象实例,它调用了canAdmitPod和rejectPod两个方法。
//根据代码上下文来理解,这段代码应该是在处理 Kubernetes 中的 Pod 调度逻辑。
// For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
}
//该函数用于在创建新的Pod时,设置Pod的资源分配情况。具体操作是调用kl.statusManager.SetPodAllocation(podCopy)方法,
//将Pod的资源分配信息保存到Pod的状态中。如果设置失败,会记录错误日志,并标记为待调查的问题。
} else {
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
//这段代码是Kubernetes中处理Pod调度的逻辑。如果Pod不能被当前节点接纳,则会拒绝该Pod,并继续处理下一个Pod。
//kl.canAdmitPod()函数检查Pod是否可以被接纳,
//如果不可以,则调用kl.rejectPod()函数拒绝Pod,并提供拒绝的原因和信息。
}
}
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
StartTime: start,
//该函数用于更新Pod的状态信息。
//- 参数UpdatePodOptions包含了更新Pod所需的各项参数:
//- Pod: 要更新的Pod对象。 - MirrorPod: Pod的镜像对象。
//- UpdateType: 更新类型,此处为SyncPodCreate,表示创建Pod。 - StartTime: 更新开始时间。
//- 该函数会根据传入的参数更新Pod的状态,并进行相应的日志记录和状态同步操作
})
}
}
//这段Go代码是Kubernetes中kubelet组件的一部分,用于处理新创建的Pod的分配和准入控制。
//1. 如果是新Pod,会调用kl.statusManager.SetPodAllocation(podCopy)函数来记录Pod被接纳时的资源值。如果记录失败,会打印错误日志。
//2. 如果Pod不是新创建的,则会调用kl.canAdmitPod(activePods, pod)函数来检查是否可以接纳该Pod。
//如果不能接纳,则会调用kl.rejectPod(pod, reason, message)函数来拒绝Pod,并继续处理下一个Pod。
//3. 如果Pod可以接纳,则会调用kl.podWorkers.UpdatePod()函数来更新Pod的状态。
//这段代码的主要作用是实现Pod的准入控制和状态更新。
// updateContainerResourceAllocation updates AllocatedResources values
// (for cpu & memory) from checkpoint store
func (kl *Kubelet) updateContainerResourceAllocation(pod *v1.Pod) {
for _, c := range pod.Spec.Containers {
allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
if c.Resources.Requests != nil && found {
if _, ok := allocatedResources[v1.ResourceCPU]; ok {
c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU]
}
if _, ok := allocatedResources[v1.ResourceMemory]; ok {
c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory]
}
}
}
}
//该函数的作用是更新容器的资源分配信息(包括CPU和内存)。
//具体实现如下:
//1. 遍历Pod中所有的容器。
//2. 对于每个容器,通过调用kl.statusManager.GetContainerResourceAllocation方法,获取该容器的资源分配信息。
//3. 如果该容器的资源请求(c.Resources.Requests)不为空且成功获取到资源分配信息(allocatedResources),
//则将分配的CPU和内存资源信息更新到容器的资源请求中。
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
}
//该函数是Kubelet的HandlePodUpdates方法,用于处理从配置源更新的Pod。
//它遍历传入的Pod数组,并通过kl.podManager.UpdatePod(pod)更新每个Pod。
//然后,它获取每个Pod及其镜像Pod,并根据是否是镜像Pod进行不同的处理。
//如果找不到对应的Pod,则跳过该镜像Pod。
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
}
//该函数用于更新Pod的状态信息。
//- 参数pod是要更新的Pod对象。
//- 参数mirrorPod是Pod的镜像对象。
//- 参数updateType指定更新的类型,这里是同步更新。
//- 参数startTime记录更新开始的时间。
//该函数会根据参数更新Pod的相关状态,并记录更新的开始时间。
}
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.RemovePod(pod)
//该函数是Kubelet的HandlePodRemoves方法,用于处理从配置源中移除的Pods。
//它遍历传入的Pods数组,并通过kl.podManager.RemovePod方法将每个Pod从管理器中移除。
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
//这段Go代码是Kubernetes中kubelet的一个片段,主要功能是处理mirror pod和其对应的pod的关系。
//- kl.podManager.GetPodAndMirrorPod(pod)函数用于获取mirror pod和其对应的pod。
//- 如果wasMirror为true,表示当前处理的是mirror pod。
//- 如果pod为空,则记录日志并跳过当前循环。
//- 如果pod不为空,则调用kl.podWorkers.UpdatePod()函数更新pod信息,其中UpdateType为kubetypes.SyncPodUpdate,表示同步更新pod。
//总结:这段代码主要处理mirror pod的更新操作。
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
}
}
//这段Go代码中的函数是一个if条件语句,其主要功能是尝试删除一个pod(Kubernetes中的一个容器化应用程序实例)。
//如果删除操作失败,函数会记录一条日志信息。 具体分析如下:
//1. 这段代码首先调用kl.deletePod(pod)方法尝试删除pod。
//2. 如果删除操作失败,即err != nil,则会执行if语句内部的代码块。
//3. 在代码块中,会使用klog.V(2).InfoS方法记录一条日志信息,包含以下内容:
//- "Failed to delete pod":表示删除pod失败。
//- "pod":表示被删除的pod的详细信息。
//- "err":表示删除操作失败时返回的错误信息。
//4. 该函数的主要目的是允许删除操作失败,因为有一个定期清理程序会再次触发删除操作。
//总结:这段代码的主要功能是尝试删除一个pod,并在删除失败时记录一条日志信息。
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled. Pods are reconciled when only the status of the
// pod is updated in the API.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
// Update the pod in pod manager, status manager will do periodically reconcile according
// to the pod manager.
kl.podManager.UpdatePod(pod)
//该函数是Kubelet实现SyncHandler接口的回调函数,用于处理需要进行状态同步的Pod。
//当Pod的状态在API中被更新时,就会触发Pod的同步。
//函数遍历传入的Pod列表,通过调用Kubelet的podManager的UpdatePod方法,
//更新Pod的状态,并由statusManager定期进行状态同步。
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
// Static pods should be reconciled the same way as regular pods
}
//这段Go代码是Kubernetes中的一部分,用于处理Pod和其对应的Mirror Pod的关系。
//下面是这段代码的功能解释:
//- kl.podManager.GetPodAndMirrorPod(pod):获取给定Pod及其对应的Mirror Pod。
//- wasMirror:判断返回的Mirror Pod是否是之前已经存在的。
//- 如果wasMirror为true,则进一步判断pod是否为nil。
//- 如果pod为nil,则记录日志信息,并跳过当前循环。
//- 如果pod不为nil,则将Static Pods与普通Pods进行一致性处理。
//这段代码的主要作用是在处理Pod的过程中,对Mirror Pod进行相应的处理。
// TODO: reconcile being calculated in the config manager is questionable, and avoiding
// extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be
// merged (after resolving the next two TODOs).
// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
// TODO: this should be unnecessary today - determine what is the cause for this to
// be different than Sync, or if there is a better place for it. For instance, we have
// needsReconcile in kubelet/config, here, and in status_manager.
if status.NeedToReconcilePodReadiness(pod) {
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
//这段Go代码中的函数是一个待完成的任务,它的功能是重新调整Pod的"Ready"状态,如果需要的话。
//该函数的实现需要进一步评估和确定,因为它可能不再必要,而且可能会导致额外的同步操作。
//函数的具体实现包括检查Pod是否需要重新调整状态,并根据需要触发同步操作。
//这个函数存在一些待解决的问题和TODO,需要进一步解决和改进。
// After an evicted pod is synced, all dead containers in the pod can be removed.
// TODO: this is questionable - status read is async and during eviction we already
// expect to not have some container info. The pod worker knows whether a pod has
// been evicted, so if this is about minimizing the time to react to an eviction we
// can do better. If it's about preserving pod status info we can also do better.
if eviction.PodIsEvicted(pod.Status) {
if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
kl.containerDeletor.deleteContainersInPod("", podStatus, true)
}
}
}
//这段Go代码中的函数用于在Pod被驱逐后同步删除Pod中的所有死亡容器。
//函数首先检查Pod是否已被驱逐,如果是,则尝试从podCache中获取Pod的状态信息。
//如果获取成功,则调用kl.containerDeletor.deleteContainersInPod函数删除Pod中的所有容器。
}
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
//该函数是Kubelet的HandlePodSyncs方法,用于处理同步Pod的工作。
//它接收一个Pod列表作为参数,然后遍历每个Pod,获取对应的Pod和镜像Pod,并判断是否为镜像Pod。
//如果是镜像Pod且找不到对应的Pod,则跳过该Pod的处理。
// Syncing a mirror pod is a programmer error since the intent of sync is to
// batch notify all pending work. We should make it impossible to double sync,
// but for now log a programmer error to prevent accidental introduction.
klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
}
//该函数是Kubelet的HandlePodSyncs方法,用于处理Pod的同步操作。
//它接收一个Pod列表作为参数,遍历每个Pod,获取对应的Pod和mirror Pod,并根据情况执行相应的操作。
//如果传入的是mirror Pod,则记录日志并跳过。
//否则,通过podWorkers更新Pod的状态。
func isPodResizeInProgress(pod *v1.Pod, podStatus *v1.PodStatus) bool {
for _, c := range pod.Spec.Containers {
if cs, ok := podutil.GetContainerStatus(podStatus.ContainerStatuses, c.Name); ok {
if cs.Resources == nil {
continue
}
if !cmp.Equal(c.Resources.Limits, cs.Resources.Limits) || !cmp.Equal(cs.AllocatedResources, cs.Resources.Requests) {
return true
}
}
}
return false
}
//该函数用于检查Pod中是否有容器的资源正在扩容中。
//它遍历Pod的容器规格,获取每个容器的状态,然后比较容器的资源限制和请求是否相等。
//如果不相等,则表示容器资源正在扩容,函数返回true;
//否则,返回false。
func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, *v1.Pod, v1.PodResizeStatus) {
var otherActivePods []*v1.Pod
node, err := kl.getNodeAnyWay()
if err != nil {
klog.ErrorS(err, "getNodeAnyway function failed")
return false, nil, ""
}
podCopy := pod.DeepCopy()
cpuAvailable := node.Status.Allocatable.Cpu().MilliValue()
memAvailable := node.Status.Allocatable.Memory().Value()
cpuRequests := resource.GetResourceRequest(podCopy, v1.ResourceCPU)
memRequests := resource.GetResourceRequest(podCopy, v1.ResourceMemory)
if cpuRequests > cpuAvailable || memRequests > memAvailable {
klog.V(3).InfoS("Resize is not feasible as request exceeds allocatable node resources", "pod", podCopy.Name)
return false, podCopy, v1.PodResizeStatusInfeasible
//该函数的主要功能是判断是否可以对给定的Pod进行扩容。具体步骤包括:
//通过kl.getNodeAnyWay()获取一个Node节点,深拷贝Pod对象,计算Node节点的可用CPU和内存资源量,获取Pod的CPU和内存请求量,
//如果Pod的请求量超过了Node的可用资源量,则认为扩容不可行,并返回相应的信息。
}
//该函数是Kubelet的一个方法,用于判断是否可以调整Pod的大小。它首先获取当前节点的信息,并创建Pod的深拷贝。
//然后,它比较节点可用的CPU和内存资源与Pod请求的资源,如果请求的资源超过了节点的可用资源,则返回不可调整大小的错误信息。
// Treat the existing pod needing resize as a new pod with desired resources seeking admit.
// If desired resources don't fit, pod continues to run with currently allocated resources.
activePods := kl.GetActivePods()
for _, p := range activePods {
if p.UID != pod.UID {
otherActivePods = append(otherActivePods, p)
}
}
//该函数用于处理需要调整资源的现有Pod,将其视为具有所需资源的新Pod寻求准入。
//如果所需资源无法适应,则Pod将继续以目前分配的资源运行。
//函数首先获取所有活跃的Pod,然后遍历这些Pod,将与给定Pod UID不相同的Pod添加到otherActivePods列表中。
if ok, failReason, failMessage := kl.canAdmitPod(otherActivePods, podCopy); !ok {
// Log reason and return. Let the next sync iteration retry the resize
klog.V(3).InfoS("Resize cannot be accommodated", "pod", podCopy.Name, "reason", failReason, "message", failMessage)
return false, podCopy, v1.PodResizeStatusDeferred
}
//该函数是kl.canAdmitPod()的调用方,用于判断是否可以调整pod的大小。
//如果kl.canAdmitPod()返回不成功,即不满足调整大小的条件,则记录原因并返回false,
//同时返回不成功的pod副本和v1.PodResizeStatusDeferred状态。
for _, container := range podCopy.Spec.Containers {
idx, found := podutil.GetIndexOfContainerStatus(podCopy.Status.ContainerStatuses, container.Name)
if found {
for rName, rQuantity := range container.Resources.Requests {
podCopy.Status.ContainerStatuses[idx].AllocatedResources[rName] = rQuantity
}
}
}
return true, podCopy, v1.PodResizeStatusInProgress
}
//该函数的功能是更新podCopy的状态,将每个容器的资源请求分配给对应的容器状态。
//具体来说,它遍历podCopy的Spec.Containers,通过podutil.GetIndexOfContainerStatus函数获取每个容器在podCopy.Status.ContainerStatuses中的索引,
//如果找到,则遍历该容器的Resources.Requests,将其分配给podCopy.Status.ContainerStatuses中对应容器的AllocatedResources。
//最后,函数返回true,podCopy和v1.PodResizeStatusInProgress,表示更新成功并且正在进行调整大小操作。
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) *v1.Pod {
if pod.Status.Phase != v1.PodRunning {
return pod
}
podResized := false
for _, container := range pod.Spec.Containers {
if len(container.Resources.Requests) == 0 {
continue
}
containerStatus, found := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
if !found {
klog.V(5).InfoS("ContainerStatus not found", "pod", pod.Name, "container", container.Name)
break
}
//该函数是Kubelet的一个方法,用于处理Pod资源的调整。
//当Pod的状态不是运行中时,直接返回该Pod。
//遍历Pod的容器,如果容器的资源请求不为空,则尝试获取容器的状态。
//如果找不到容器状态,则记录日志并退出循环。
if len(containerStatus.AllocatedResources) != len(container.Resources.Requests) {
klog.V(5).InfoS("ContainerStatus.AllocatedResources length mismatch", "pod", pod.Name, "container", container.Name)
break
}
if !cmp.Equal(container.Resources.Requests, containerStatus.AllocatedResources) {
podResized = true
break
}
}
//这段Go代码是在检查容器状态(containerStatus)中已分配的资源与容器请求的资源(container.Resources.Requests)是否一致。
//首先,它会比较两个资源列表的长度,如果不相等,则记录一条信息并跳出循环。
//接着,它使用cmp.Equal函数来比较两个资源列表的内容是否完全相等,如果不相等,则将podResized标记为true并跳出循环。
//总的来说,这段代码的主要作用是检测容器的资源请求是否已经被正确分配。
if !podResized {
return pod
}
kl.podResizeMutex.Lock()
defer kl.podResizeMutex.Unlock()
fit, updatedPod, resizeStatus := kl.canResizePod(pod)
if updatedPod == nil {
return pod
}
if fit {
// Update pod resource allocation checkpoint
if err := kl.statusManager.SetPodAllocation(updatedPod); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(updatedPod))
return pod
}
}
//这段Go代码主要实现了Pod的垂直扩展功能。
//具体来说,它首先检查Pod是否已经进行了大小调整,如果没有,则直接返回原始Pod。
//然后,它使用互斥锁来确保对Pod大小调整操作的原子性。
//接下来,它调用canResizePod函数来检查是否可以对Pod进行大小调整,
//如果返回的updatedPod为nil,则表示无法调整大小,直接返回原始Pod。如果可以调整大小,则将更新后的Pod的资源分配状态保存到状态管理器中。
//如果保存失败,则记录错误日志并返回原始Pod。
if resizeStatus != "" {
// Save resize decision to checkpoint
if err := kl.statusManager.SetPodResizeStatus(updatedPod.UID, resizeStatus); err != nil {
//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodResizeStatus failed", "pod", klog.KObj(updatedPod))
return pod
}
updatedPod.Status.Resize = resizeStatus
}
kl.podManager.UpdatePod(updatedPod)
kl.statusManager.SetPodStatus(updatedPod, updatedPod.Status)
return updatedPod
}
//这段Go代码中的函数逻辑如下:
//首先,检查resizeStatus是否为空。如果不为空,则执行以下操作:
//1. 尝试将调整大小的决策保存到检查点,调用kl.statusManager.SetPodResizeStatus方法,并传入updatedPod.UID和resizeStatus作为参数。
//如果出现错误,则记录错误日志并返回原始pod对象。
//2. 更新updatedPod的状态,将resizeStatus赋值给updatedPod.Status.Resize。
//接下来,无论resizeStatus是否为空,都会执行以下操作:
//1. 调用kl.podManager.UpdatePod方法,传入updatedPod对象进行更新。
//2. 调用kl.statusManager.SetPodStatus方法,传入updatedPod对象和updatedPod.Status,以更新Pod的状态。
//最后,返回更新后的updatedPod对象。
//这段代码的主要目的是更新Pod的状态,并根据resizeStatus是否为空来决定是否保存调整大小的决策到检查点并更新Pod的状态。
// LatestLoopEntryTime returns the last time in the sync loop monitor.
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
val := kl.syncLoopMonitor.Load()
if val == nil {
return time.Time{}
}
return val.(time.Time)
}
//该函数用于获取Kubelet同步循环监视器中的最后一个时间入口。
//它首先使用kl.syncLoopMonitor.Load()方法加载监视器的值,如果值为nil,则返回空的时间类型;
//否则,将值转换为时间类型并返回。
// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
// update the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
ctx := context.Background()
//该函数是Kubelet的一个方法,用于更新容器运行时的状态。
//它首先通过调用容器运行时的状态回调函数来初始化运行时依赖的模块,然后检查容器运行时的状态是否正常。
//如果状态检查失败,会返回一个错误。
//如果状态检查正常,则会更新容器运行时的运行时间。函数中使用了互斥锁来保证更新操作的原子性。
s, err := kl.containerRuntime.Status(ctx)
if err != nil {
klog.ErrorS(err, "Container runtime sanity check failed")
return
}
if s == nil {
klog.ErrorS(nil, "Container runtime status is nil")
return
}
//该Go函数通过调用kl.containerRuntime.Status(ctx)获取容器运行时的状态,并根据状态进行相应的错误处理。
//- 如果获取状态时出现错误,会使用klog.ErrorS记录错误日志,并返回。
//- 如果获取的状态为nil,会使用klog.ErrorS记录错误日志,并返回。
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
klogErrorS := klog.ErrorS
if !kl.containerRuntimeReadyExpected {
klogErrorS = klog.V(4).ErrorS
}
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
//该代码片段是Go语言编写的,它主要实现了定期打印容器运行时状态的功能,用于调试。
//具体来说,函数首先使用klog.V(4).InfoS方法记录一条日志,内容为"Container runtime status"和状态s。
//然后,根据变量kl.containerRuntimeReadyExpected的值,选择使用不同的日志级别记录错误信息。
//最后,该函数获取容器运行时的网络状态,并将其保存在变量networkReady中。
if networkReady == nil || !networkReady.Status {
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// information in RuntimeReady condition will be propagated to NodeReady condition.
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
//这段Go代码主要功能是检查容器运行时网络是否准备就绪,并更新运行时状态。
//首先,它检查networkReady是否为nil或状态为false。
//如果是,则记录错误并设置运行时状态为"container runtime network not ready"。
//否则,将运行时状态设置为nil。
//接下来,它获取运行时准备就绪的条件,并检查是否设置且为true。
//如果不是,则报告错误。 这段代码的目的是确保容器运行时网络已准备就绪,并更新运行时状态。
if runtimeReady == nil || !runtimeReady.Status {
klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}
//这段Go代码是用于定期记录容器运行时状态的,主要用于调试。
//函数首先使用klog.V(4).InfoS记录容器运行时状态的信息。
//然后根据kl.containerRuntimeReadyExpected的值决定使用哪个级别的日志记录错误信息。
//接着检查容器运行时的网络状态,如果不Ready,则记录错误信息并设置相应的状态。
//最后检查容器运行时的RuntimeReady状态,如果不Ready,则记录错误信息并设置相应的状态,并返回。
kl.runtimeState.setRuntimeState(nil)
kl.runtimeState.setRuntimeHandlers(s.Handlers)
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
//这段Go代码是用于定期记录容器运行时状态的,主要用于调试。
//函数首先使用klog.V(4).InfoS记录容器运行时状态的信息,然后根据kl.containerRuntimeReadyExpected的值决定使用哪个级别的日志记录错误信息。
//接着检查容器运行时的网络状态,如果不Ready,则记录错误信息并设置相应的状态。
//最后检查容器运行时的RuntimeReady状态,如果不Ready,则记录错误信息并设置相应的状态,并返回。
//如果容器运行时状态正常,则会执行以下操作:
//1. 调用kl.runtimeState.setRuntimeState(nil)设置运行时状态为正常。
//2. 调用kl.runtimeState.setRuntimeHandlers(s.Handlers)设置运行时处理程序。
//3. 调用kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)执行一次初始化运行时依赖模块的操作。
//4. 调用kl.runtimeState.setRuntimeSync(kl.clock.Now())设置运行时同步时间。
// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
return kl.kubeletConfiguration
}
//该函数是一个Go语言函数,名为GetConfiguration,它属于Kubelet结构体。
//函数的作用是返回用于配置kubelet的KubeletConfiguration。
//- kl *Kubelet:函数接收一个Kubelet类型的指针作为参数。
//- kubeletconfiginternal.KubeletConfiguration:函数返回一个KubeletConfiguration类型的结果。
//函数实现很简单,就是直接返回kl对象的kubeletConfiguration属性。
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
//这个函数的功能是发送一个事件,表示kubelet已经启动。
//它通过调用kl.recorder.Eventf方法,传入kl.nodeRef、v1.EventTypeNormal、events.StartingKubelet和"Starting kubelet."作为参数,
//来记录kubelet重启的事件。
// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
//该函数是Kubelet的一个方法,返回用于周期性同步的时间间隔。
//- kl *Kubelet:表示Kubelet的实例。
//- return kl.resyncInterval:返回Kubelet实例中的resyncInterval属性值,其类型为time.Duration,表示时间间隔。
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
auth server.AuthInterface, tp trace.TracerProvider) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
}
//该函数是Kubelet的ListenAndServe方法,用于启动kubelet的HTTP服务器。
//它通过调用server.ListenAndServeKubeletServer方法来实现,传入Kubelet实例、资源分析器、kubelet配置、TLS选项、认证接口和跟踪提供者作为参数。
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
}
//该函数用于以只读模式启动kubelet HTTP服务器。它通过调用server.ListenAndServeKubeletReadOnlyServer函数来实现,
//该函数接受kl(Kubelet实例)、kl.resourceAnalyzer(资源分析器)以及地址和端口作为参数,用于监听和提供服务。
// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
//该函数用于启动kubelet的podresources gRPC服务。
//它首先通过调用util.LocalEndpoint函数获取本地端点,该函数的参数是kubelet的pod资源目录和Socket。
//如果获取本地端点失败,则通过klog.V(2).InfoS记录日志信息,并返回。
providers := podresources.PodResourcesProviders{
Pods: kl.podManager,
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
DynamicResources: kl.containerManager,
}
server.ListenAndServePodResources(endpoint, providers)
}
//这段Go代码创建了一个podresources.PodResourcesProviders结构体实例providers,
//其中包含了Pods、Devices、Cpus、Memory和DynamicResources等字段,它们都是kl.containerManager的实例。
//接着,代码通过调用server.ListenAndServePodResources函数,启动了一个服务,监听指定的endpoint,并使用providers作为参数提供Pod资源。
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
if podStatus, err := kl.podCache.Get(podID); err == nil {
// When an evicted or deleted pod has already synced, all containers can be removed.
removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
}
}
//该函数用于清理Pod中的死亡容器实例。根据配置,可能会保留最新的死亡容器。
//函数首先通过kl.podCache.Get(podID)获取Pod的状态。
//然后,根据kl.podWorkers.ShouldPodContentBeRemoved(podID)的返回值,
//调用kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)来删除容器。
// fastStatusUpdateOnce starts a loop that checks if the current state of kubelet + container runtime
// would be able to turn the node ready, and sync the ready state to the apiserver as soon as possible.
// Function returns after the node status update after such event, or when the node is already ready.
// Function is executed only during Kubelet start which improves latency to ready node by updating
// kubelet state, runtime status and node statuses ASAP.
func (kl *Kubelet) fastStatusUpdateOnce() {
ctx := context.Background()
start := kl.clock.Now()
stopCh := make(chan struct{})
//该函数用于在kubelet启动时快速更新节点状态。
//它会循环检查kubelet和容器运行时的状态,如果发现节点可以变为准备就绪状态,就会立即同步该状态到apiserver。
//函数会在节点状态更新后或节点已经准备就绪时返回。
//该函数通过启动一个上下文对象、记录开始时间并创建一个停止通道来实现循环的控制和终止。
// Keep trying to make fast node status update until either timeout is reached or an update is successful.
wait.Until(func() {
// fastNodeStatusUpdate returns true when it succeeds or when the grace period has expired
// (status was not updated within nodeReadyGracePeriod and the second argument below gets true),
// then we close the channel and abort the loop.
if kl.fastNodeStatusUpdate(ctx, kl.clock.Since(start) >= nodeReadyGracePeriod) {
close(stopCh)
}
}, 100*time.Millisecond, stopCh)
}
//这段Go代码使用了wait.Until函数,它会在给定的stopCh通道关闭之前,周期性地执行传入的函数。
//函数的主要作用是尝试进行快速节点状态更新,直到超时或更新成功。
//具体实现中,函数内部调用了kl.fastNodeStatusUpdate方法尝试更新节点状态。
//如果更新成功或超过了nodeReadyGracePeriod时间间隔,则会关闭stopCh通道,结束循环。
//总结起来,这段代码的功能是在一定时间内持续尝试更新节点状态,直到更新成功或超时。
// CheckpointContainer tries to checkpoint a container. The parameters are used to
// look up the specified container. If the container specified by the given parameters
// cannot be found an error is returned. If the container is found the container
// engine will be asked to checkpoint the given container into the kubelet's default
// checkpoint directory.
func (kl *Kubelet) CheckpointContainer(
ctx context.Context,
podUID types.UID,
podFullName,
containerName string,
options *runtimeapi.CheckpointContainerRequest,
) error {
container, err := kl.findContainer(ctx, podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container %v not found", containerName)
}
options.Location = filepath.Join(
kl.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s-%s-%s.tar",
podFullName,
containerName,
time.Now().Format(time.RFC3339),
),
)
//该函数的功能是尝试对一个容器进行检查点操作。
//函数通过给定的参数查找指定的容器,如果找不到指定的容器,则返回一个错误。
//如果找到了容器,则会请求容器引擎将该容器检查点保存到kubelet的默认检查点目录中。
//具体来说,函数首先调用kl.findContainer函数来查找指定的容器,如果找不到容器或发生错误,则直接返回错误或nil。
//如果找到了容器,则通过调用filepath.Join函数来设置检查点的存储位置。
//存储位置由kubelet的默认检查点目录、pod全名、容器名和当前时间组成。最后,函数返回错误或nil。
options.ContainerId = string(container.ID.ID)
if err := kl.containerRuntime.CheckpointContainer(ctx, options); err != nil {
return err
}
return nil
}
//该函数的功能是在容器上执行检查点操作。
//首先,它将容器的ID转换为字符串,并将其赋值给options.ContainerId。
//然后,它调用kl.containerRuntime.CheckpointContainer函数来执行实际的检查点操作,将上下文和options作为参数传递。
//如果该函数执行失败并返回错误,则该函数将错误返回。
//如果该函数成功执行,则该函数将返回nil。
// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
func (kl *Kubelet) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
return kl.containerRuntime.ListMetricDescriptors(ctx)
}
//该函数是Kubelet的一个方法,用于获取容器运行时的指标描述符。
//它通过调用containerRuntime的ListMetricDescriptors方法来实现。
//返回值是指标描述符的切片和可能的错误。
// ListPodSandboxMetrics retrieves the metrics for all pod sandboxes.
func (kl *Kubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
return kl.containerRuntime.ListPodSandboxMetrics(ctx)
}
//该函数是Kubelet的一个方法,用于获取所有Pod沙箱的指标信息。
//它通过调用containerRuntime的ListPodSandboxMetrics方法来实现。
//返回值是一个包含多个Pod沙箱指标信息的切片和一个错误(如果有)。
func (kl *Kubelet) supportLocalStorageCapacityIsolation() bool {
return kl.GetConfiguration().LocalStorageCapacityIsolation
}
//该函数用于判断Kubelet是否支持本地存储容量隔离。
//它通过调用Kubelet的GetConfiguration方法获取Kubelet的配置信息,
//然后返回LocalStorageCapacityIsolation字段的值来表示是否支持本地存储容量隔离。
// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state
return event.Type != pleg.ContainerRemoved
}
//该函数用于过滤掉不值得进行Pod同步的事件。具体实现中,判断事件的类型是否为ContainerRemoved,
//若不是,则返回true,表示该事件值得进行Pod同步;
//若是,则返回false,表示该事件不值得进行Pod同步。
// PrepareDynamicResources calls the container Manager PrepareDynamicResources API
// This method implements the RuntimeHelper interface
func (kl *Kubelet) PrepareDynamicResources(pod *v1.Pod) error {
return kl.containerManager.PrepareDynamicResources(pod)
}
//该函数是Kubelet的PrepareDynamicResources方法,实现了RuntimeHelper接口。
//它调用了container Manager的PrepareDynamicResources API,用于准备容器的动态资源。
// UnprepareDynamicResources calls the container Manager UnprepareDynamicResources API
// This method implements the RuntimeHelper interface
func (kl *Kubelet) UnprepareDynamicResources(pod *v1.Pod) error {
return kl.containerManager.UnprepareDynamicResources(pod)
}
//该函数是Kubelet的一个方法,实现了RuntimeHelper接口。
//它调用了container Manager的UnprepareDynamicResources API,用于准备Pod的动态资源。