How the client-go Informer Mechanism Retrieves the State and Details of Kubernetes Resources #
Overview #
The client-go informer mechanism retrieves and maintains the state and details of Kubernetes resources (such as Pods, Deployments, and Services) through an efficient caching system. At its core is the Reflector component, which talks to the API server and keeps the local cache in sync with the cluster state. 1
Core Components and Data Flow #
The informer mechanism is built from the following key components (a wiring sketch follows the list):
- Reflector: performs the LIST and WATCH operations
- DeltaFIFO: a queue holding change events
- Indexer (the cache): local storage for resource objects
- Event Handlers: react to resource changes 1
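To make the division of labor concrete, here is a hand-written wiring sketch (not code from the repository) built from the public `cache` package: the ListWatch supplies the LIST/WATCH calls for the Reflector, `NewSharedIndexInformer` internally connects the Reflector and DeltaFIFO to the Indexer, and the registered handler plays the event-handler role. The kubeconfig path and the `default` namespace are assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Client setup (assumes a local kubeconfig; in-cluster config works too).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// 1. The ListWatch provides the LIST and WATCH calls the Reflector issues.
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", "default", fields.Everything())

	// 2+3. NewSharedIndexInformer wires a Reflector and DeltaFIFO to an Indexer.
	informer := cache.NewSharedIndexInformer(lw, &corev1.Pod{}, 30*time.Second,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	// 4. Event handlers fire after the Indexer has been updated.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, _ := cache.MetaNamespaceKeyFunc(obj)
			fmt.Println("added:", key) // a real controller would enqueue this key
		},
	})

	stop := make(chan struct{})
	go informer.Run(stop)

	// Block until the Indexer reflects the initial LIST result.
	cache.WaitForCacheSync(stop, informer.HasSynced)
	select {} // keep watching until the process is killed
}
```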
API Request Process #
1. LIST Request - Fetching the Initial State #
The Reflector first performs a LIST operation to obtain a consistent snapshot of the resources. This is an HTTP GET request to the API server. 2
Concretely, the LIST step works as follows (see the sketch after this list):
- It uses `metav1.ListOptions` to set the ResourceVersion parameter
- It supports pagination: large collections can be fetched in chunks via the `limit` and `continue` tokens
- The ResourceVersion extracted from the returned list is used for the subsequent WATCH 3
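As a concrete illustration of these options (a hand-rolled sketch, not the Reflector's internal pager), the helper below pages through Pods with `limit`/`continue` and returns the ResourceVersion of the completed list, which is what a subsequent WATCH would start from. The function name and the page size of 500 are arbitrary choices.

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listAllPods fetches Pods in chunks and returns them together with the
// ResourceVersion that a follow-up WATCH should resume from.
func listAllPods(ctx context.Context, clientset kubernetes.Interface, namespace string) ([]corev1.Pod, string, error) {
	var (
		pods          []corev1.Pod
		continueToken string
	)
	for {
		list, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
			Limit:    500,           // page size; the server may return fewer items
			Continue: continueToken, // empty on the first request
		})
		if err != nil {
			return nil, "", err
		}
		pods = append(pods, list.Items...)
		continueToken = list.Continue
		if continueToken == "" {
			// The ResourceVersion of the completed list is the snapshot the
			// Reflector hands to the subsequent WATCH request.
			return pods, list.ResourceVersion, nil
		}
	}
}
```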
2. WATCH Request - Receiving Incremental Updates #
After the initial LIST completes, the Reflector opens a WATCH connection to receive subsequent incremental updates. WATCH is also an HTTP GET request, but with the watch=true query parameter. 4
Key characteristics of the WATCH request (see the sketch after this list):
- It starts watching from the ResourceVersion returned by the LIST
- It sets a timeout (TimeoutSeconds), a random value typically between 5 and 10 minutes, to spread load across the server
- It enables `AllowWatchBookmarks=true` so that bookmarks can make recovery cheaper
- It returns an event stream containing ADDED, MODIFIED, DELETED, and BOOKMARK events 5
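A raw version of such a request might look like the sketch below (an illustration, not client-go internals); the option values mirror the ones the Reflector sets in the cited watch code, while the fixed 600-second timeout stands in for the randomized value.

```go
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// watchPods opens a WATCH starting at the ResourceVersion returned by a
// previous LIST and prints the events it receives.
func watchPods(ctx context.Context, clientset kubernetes.Interface, namespace, resourceVersion string) error {
	timeoutSeconds := int64(600) // the Reflector randomizes this between roughly 5 and 10 minutes
	w, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
		ResourceVersion:     resourceVersion, // continue from the LIST snapshot
		TimeoutSeconds:      &timeoutSeconds, // the server closes the stream after this
		AllowWatchBookmarks: true,            // opt in to BOOKMARK events
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		fmt.Println("event:", event.Type) // ADDED, MODIFIED, DELETED, BOOKMARK or ERROR
	}
	return nil
}
```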
3. Processing WATCH Events #
Incoming WATCH events are processed and applied to the local cache (a minimal update loop is sketched after this list):
- ADDED: the new object is added to the store
- MODIFIED: the existing object in the store is updated
- DELETED: the object is removed from the store
- BOOKMARK: the ResourceVersion is advanced; the store contents are not modified 6
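A minimal version of this update loop, using the public `cache.Store` interface, might look like the following sketch; it is a simplified stand-in for what the Reflector's `handleWatch` does and skips resync, retries, and most ResourceVersion bookkeeping.

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)

// applyEvents drains a watch result channel into a local store, mirroring in
// simplified form how watch events are applied to the informer's cache.
func applyEvents(w watch.Interface, store cache.Store) error {
	for event := range w.ResultChan() {
		switch event.Type {
		case watch.Added:
			if err := store.Add(event.Object); err != nil {
				return err
			}
		case watch.Modified:
			if err := store.Update(event.Object); err != nil {
				return err
			}
		case watch.Deleted:
			if err := store.Delete(event.Object); err != nil {
				return err
			}
		case watch.Bookmark:
			// Only the ResourceVersion advances; the store is left untouched.
			if m, err := meta.Accessor(event.Object); err == nil {
				fmt.Println("bookmark at ResourceVersion", m.GetResourceVersion())
			}
		case watch.Error:
			return fmt.Errorf("watch error: %v", event.Object)
		}
	}
	return nil
}
```

A standalone store can be created with `cache.NewStore(cache.MetaNamespaceKeyFunc)`; inside an informer this role is split between the DeltaFIFO and the Indexer.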
Implementation for Specific Resources #
Pod Informer Example #
For the Pod resource, the informer is created as follows: 7
The ListWatch is constructed from four functions:
- ListFunc: calls client.CoreV1().Pods(namespace).List()
- WatchFunc: calls client.CoreV1().Pods(namespace).Watch()
- ListWithContextFunc: the context-aware variant of List
- WatchFuncWithContext: the context-aware variant of Watch
These functions ultimately build API requests scoped to the specific resource type and namespace; a factory-based sketch follows. 8
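In practice these four functions are rarely written by hand; the shared informer factory generates them. The sketch below (my own example) shows how the namespace and a tweak function passed to the factory end up shaping every List/Watch call the Pod informer makes; the `kube-system` namespace and the label selector are arbitrary.

```go
package main

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// WithNamespace and WithTweakListOptions become the `namespace` and
	// `tweakListOptions` arguments of NewFilteredPodInformer, so they apply
	// to every LIST and WATCH request the informer issues.
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset,
		10*time.Minute,
		informers.WithNamespace("kube-system"),
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.LabelSelector = "k8s-app=kube-dns" // narrows both LIST and WATCH
		}),
	)
	podInformer := factory.Core().V1().Pods().Informer()

	stop := make(chan struct{})
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	// podInformer's Indexer now mirrors the selected Pods.
	<-stop
}
```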
Deployments and Services Are Handled the Same Way #
Deployment and Service resources follow exactly the same pattern, only against different API endpoints (see the sketch after this list):
- Pods: /api/v1/namespaces/{namespace}/pods
- Deployments: /apis/apps/v1/namespaces/{namespace}/deployments
- Services: /api/v1/namespaces/{namespace}/services
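Assuming a `SharedInformerFactory` like the one built in the previous sketch, the corresponding informers are obtained the same way and differ only in the endpoint their ListWatch functions target:

```go
package example

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// wireInformers illustrates that Pods, Deployments, and Services all follow
// one pattern; only the underlying API endpoints differ.
func wireInformers(factory informers.SharedInformerFactory, stopCh <-chan struct{}) {
	podInformer := factory.Core().V1().Pods().Informer()           // /api/v1/namespaces/{ns}/pods
	deployInformer := factory.Apps().V1().Deployments().Informer() // /apis/apps/v1/namespaces/{ns}/deployments
	serviceInformer := factory.Core().V1().Services().Informer()   // /api/v1/namespaces/{ns}/services

	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh,
		podInformer.HasSynced,
		deployInformer.HasSynced,
		serviceInformer.HasSynced,
	)
}
```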
Watch-List Feature (Newer Mechanism) #
More recent versions of client-go support the Watch-List feature, an optimized streaming mechanism that combines LIST and WATCH into a single request: 9
Advantages of Watch-List mode (a hedged request sketch follows the list):
- Lower resource consumption on the server
- The initial state and all subsequent updates arrive over a single streaming connection
- The initial set of objects is requested with the `SendInitialEvents: true` parameter
- A BOOKMARK event carrying the `k8s.io/initial-events-end` annotation signals that the initial sync is complete 10
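For illustration, the sketch below issues such a streaming request directly against the Pod API and waits for the terminating bookmark; the option fields and the annotation key come from `metav1.ListOptions` and the cited Reflector code, but the helper itself is my own, and it assumes the cluster actually supports the WatchList feature.

```go
package example

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// streamInitialPods makes a single watch-list request: the server streams
// synthetic ADDED events for the current Pods, then a BOOKMARK annotated with
// k8s.io/initial-events-end, then ordinary incremental watch events.
func streamInitialPods(ctx context.Context, clientset kubernetes.Interface, namespace string) error {
	sendInitialEvents := true
	w, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
		SendInitialEvents:    &sendInitialEvents,
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		AllowWatchBookmarks:  true, // the end of the initial set is signalled by a bookmark
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		if event.Type == watch.Bookmark {
			if m, err := meta.Accessor(event.Object); err == nil &&
				m.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
				fmt.Println("initial sync complete at ResourceVersion", m.GetResourceVersion())
				continue
			}
		}
		fmt.Println("event:", event.Type) // subsequent ADDED/MODIFIED/DELETED updates
	}
	return nil
}
```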
Error Handling and Retries #
The Reflector implements robust error handling (a simplified classification sketch follows the list):
- Expired ResourceVersion: when the ResourceVersion is too old, the LIST operation is re-executed
- Connection failures: retried with exponential backoff
- Rate limiting (HTTP 429): backs off automatically and retries
- Internal errors (HTTP 500): retried within a bounded time window 11
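The predicates client-go uses for this classification are exported from `k8s.io/apimachinery/pkg/api/errors`; the sketch below is a condensed, hand-written decision helper rather than the Reflector's actual control flow (which also tracks a retry deadline for internal errors and shares a backoff manager).

```go
package example

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
)

// retryDecision maps common LIST/WATCH failures to a simplified reaction:
// either relist from scratch, or sleep for the next backoff step and retry.
func retryDecision(err error, backoff *wait.Backoff) (relist bool, sleep time.Duration) {
	switch {
	case apierrors.IsResourceExpired(err) || apierrors.IsGone(err):
		// ResourceVersion too old: fall back to a fresh LIST.
		return true, 0
	case apierrors.IsTooManyRequests(err):
		// HTTP 429: back off before retrying the WATCH.
		return false, backoff.Step()
	case apierrors.IsInternalError(err):
		// HTTP 500: retry, but the real Reflector bounds this with a deadline.
		return false, backoff.Step()
	default:
		// Anything else: retry with exponential backoff.
		return false, backoff.Step()
	}
}
```

A schedule such as `wait.Backoff{Duration: time.Second, Factor: 2, Steps: 5}` produces the exponentially growing delays referred to above.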
Complete Usage Example #
The typical pattern for using an informer (see citation 12 for the full example; the complementary read path through a Lister is sketched below): 12
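Citation 12 at the end of this page shows the event-handler side in full; the complementary read path, serving queries from the already-synced cache instead of the API server, might look like this sketch (it assumes a started factory whose caches have synced, and the `kube-system` namespace is just an example):

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
)

// listFromCache reads Pods from the informer's local cache through a Lister;
// no API request is made here.
func listFromCache(factory informers.SharedInformerFactory) error {
	podLister := factory.Core().V1().Pods().Lister()

	pods, err := podLister.Pods("kube-system").List(labels.Everything())
	if err != nil {
		return err
	}
	for _, p := range pods {
		fmt.Println(p.Namespace, p.Name, p.Status.Phase)
	}
	return nil
}
```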
Notes #
Summary of API Requests #
- Initial sync: HTTP GET to /api/v1/pods (or the corresponding path for other resources)
- Continuous watching: HTTP GET to /api/v1/pods?watch=true&resourceVersion=<version>
- Pagination: the `limit` and `continue` parameters for large lists
- Timeout control: the `timeoutSeconds` parameter bounds the lifetime of a WATCH connection
All of these requests retrieve full resource objects, including the spec, status, and metadata fields, unless partial metadata (PartialObjectMetadata) is explicitly requested, as sketched below.
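When only metadata is needed, client-go's metadata client and metadata informer factory can be used instead; the sketch below is an assumption-laden illustration of that path (it presumes the `metadata` and `metadatainformer` packages behave as in recent client-go releases) and caches only `PartialObjectMetadata`.

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	// The metadata client asks the server for PartialObjectMetadata instead of
	// full objects, so the cache holds only names, labels, annotations, etc.
	metaClient, err := metadata.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	factory := metadatainformer.NewSharedInformerFactory(metaClient, 10*time.Minute)
	podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	informer := factory.ForResource(podGVR).Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			m := obj.(*metav1.PartialObjectMetadata) // spec and status are not populated
			fmt.Println("pod metadata:", m.Namespace, m.Name)
		},
	})

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	select {}
}
```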
The informer mechanism keeps the local cache continuously in sync with the API server while minimizing network overhead and server load, which is what lets Kubernetes controllers monitor and react to cluster state changes efficiently.
Citations #
File: staging/src/k8s.io/client-go/ARCHITECTURE.md (L96-121)
```mermaid
graph TD
subgraph "Kubernetes API"
API_Server[API Server]
end
subgraph "client-go: Informer Mechanism"
Reflector("1. Reflector")
DeltaFIFO("2. DeltaFIFO")
Indexer["3. Indexer (Cache)"]
EventHandlers("4. Event Handlers")
end
subgraph "User Code"
WorkQueue["5. Work Queue"]
Controller("6. Controller")
end
API_Server -- LIST/WATCH --> Reflector
Reflector -- Puts changes into --> DeltaFIFO
DeltaFIFO -- Is popped by internal loop, which updates --> Indexer
Indexer -- Update triggers --> EventHandlers
EventHandlers -- Adds key to --> WorkQueue
WorkQueue -- Is processed by --> Controller
Controller -- Reads from cache via Lister --> Indexer
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L386-397)
```go
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
//
// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh))
}
// ListAndWatchWithContext first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatchWithContext didn't even try to initialize watch.
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L411-434)
```go
if r.useWatchList {
w, err = r.watchList(ctx)
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
fallbackToList = true
// ensure that we won't accidentally pass some garbage down the watch.
w = nil
}
}
if fallbackToList {
err = r.list(ctx)
if err != nil {
return err
}
}
logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name)
return r.watchWithResync(ctx, w)
}
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L486-570)
```go
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
stopCh := ctx.Done()
logger := klog.FromContext(ctx)
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
defer func() {
if w != nil {
w.Stop()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
// we can only end up here when the stopCh
// was closed after a successful watchlist or list request
return nil
default:
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
if w == nil {
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry {
logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
}
return err
}
}
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc)
// handleWatch always stops the watcher. So we don't need to here.
// Just set it to nil to trigger a retry on the next loop.
w = nil
retry.After(err)
if err != nil {
if !errors.Is(err, errorStopRequested) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
case apierrors.IsTooManyRequests(err):
logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry():
logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err)
continue
default:
logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err)
}
}
return nil
}
}
}
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L574-677)
```go
func (r *Reflector) list(ctx context.Context) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.ListWithContext(ctx, opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-ctx.Done():
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractListWithAlloc(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L679-698)
```go
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
```
File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L927-954)
```go
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
watchListBookmarkReceived = true
}
default:
utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
}
setLastSyncResourceVersion(resourceVersion)
```
File: staging/src/k8s.io/client-go/informers/core/v1/pod.go (L58-90)
```go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.Background(), options)
},
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(ctx, options)
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(ctx, options)
},
},
&apicorev1.Pod{},
resyncPeriod,
indexers,
)
}
```
File: staging/src/k8s.io/client-go/tools/cache/listwatch.go (L199-242)
```go
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.Background()).
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.Background())
}
listFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(ctx).
Get()
}
watchFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(ctx)
}
return &ListWatch{
ListFunc: listFunc,
WatchFunc: watchFunc,
ListWithContextFunc: listFuncWithContext,
WatchFuncWithContext: watchFuncWithContext,
}
}
```
File: staging/src/k8s.io/client-go/example_test.go (L184-269)
```go
func Example_usingInformers() {
// This example demonstrates the basic pattern for using an informer to watch
// for changes to Pods. This is a conceptual example; a real controller would
// have more robust logic and a workqueue.
// Configure the client (out-of-cluster for this example).
var kubeconfig string
if home := os.Getenv("HOME"); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
fmt.Printf("Error encountered: %v\n", err)
return
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Printf("Error encountered: %v\n", err)
return
}
// A SharedInformerFactory provides a shared cache for multiple informers,
// which reduces memory and network overhead.
factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
podInformer := factory.Core().V1().Pods().Informer()
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
log.Printf("Pod ADDED: %s", key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
log.Printf("Pod UPDATED: %s", key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
log.Printf("Pod DELETED: %s", key)
}
},
})
if err != nil {
fmt.Printf("Error encountered: %v\n", err)
return
}
// Graceful shutdown requires a two-channel pattern.
//
// The first channel, `sigCh`, is used by the `signal` package to send us
// OS signals (e.g., Ctrl+C). This channel must be of type `chan os.Signal`.
//
// The second channel, `stopCh`, is used to tell the informer factory to
// stop. The informer factory's `Start` method expects a channel of type
// `<-chan struct{}`. It will stop when this channel is closed.
//
// The goroutine below is the "translator" that connects these two channels.
// It waits for a signal on `sigCh`, and when it receives one, it closes
// `stopCh`, which in turn tells the informer factory to shut down.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
stopCh := make(chan struct{})
go func() {
<-sigCh
close(stopCh)
}()
// Start the informer.
factory.Start(stopCh)
// Wait for the initial cache sync.
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
log.Println("Timed out waiting for caches to sync")
return
}
log.Println("Informer has synced. Watching for Pod events...")
// Wait for the stop signal.
<-stopCh
log.Println("Shutting down...")
}
```