2025-10-23 Informer pattern, part 3

How the client-go informer mechanism retrieves the state and details of Kubernetes resources #

Overview #

The client-go informer mechanism retrieves and maintains the state and details of Kubernetes resources (such as Pods, Deployments, and Services) through an efficient caching system. At its core is the Reflector component, which talks to the API server and keeps the local cache in sync with the cluster state. 1

Core components and data flow #

The informer mechanism consists of the following key components (a minimal wiring sketch follows the list):

  1. Reflector: performs the LIST and WATCH operations against the API server
  2. DeltaFIFO: a queue that stores change events (deltas)
  3. Indexer (cache): local storage of the resource objects
  4. Event Handlers: react to resource changes 1
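Below is a minimal sketch of how these pieces can be wired together by hand with public client-go APIs. It assumes `clientset` (a kubernetes.Interface) and `stopCh` exist in the surrounding program; a real SharedIndexInformer adds more machinery (shared event distribution, resync, etc.) on top of this.

```go
// Hand-wiring Reflector -> DeltaFIFO -> Indexer (simplified sketch).
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KnownObjects: indexer})

// ListerWatcher that issues the LIST/WATCH requests for Pods.
lw := cache.NewListWatchFromClient(
	clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())

// The Reflector pushes Deltas into the FIFO; a processing loop (not shown)
// would pop them, update the Indexer, and invoke event handlers.
reflector := cache.NewReflector(lw, &corev1.Pod{}, fifo, 0)
go reflector.Run(stopCh)
```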

API request process #

1. LIST request - fetching the initial state #

The Reflector first performs a LIST operation to obtain a consistent snapshot of the resources. This is an HTTP GET request to the API server. 2

How the LIST operation works in practice (a hand-rolled equivalent follows the list):

  • Uses metav1.ListOptions to specify the ResourceVersion parameter
  • Supports pagination, fetching large collections in batches via the limit and continue token
  • Extracts the ResourceVersion from the returned list for use by the subsequent WATCH 3
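As a rough, hand-rolled equivalent of what the Reflector's pager does (a sketch, not the Reflector's actual code path), a paginated LIST against the typed client looks like this; `clientset` and `ctx` are assumed to be defined:

```go
// Paginated LIST: fetch Pods in chunks and remember the list's
// ResourceVersion for the subsequent WATCH.
opts := metav1.ListOptions{ResourceVersion: "0", Limit: 500}
var resourceVersion string
for {
	podList, err := clientset.CoreV1().Pods("default").List(ctx, opts)
	if err != nil {
		return err // a real Reflector handles "Expired" errors by relisting
	}
	// ... hand podList.Items to the local store ...
	resourceVersion = podList.ResourceVersion
	if podList.Continue == "" {
		break // last page reached
	}
	opts.Continue = podList.Continue
}
// resourceVersion is reused below to start the WATCH.
```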

2. WATCH request - receiving incremental updates #

After the initial LIST completes, the Reflector opens a WATCH connection to receive subsequent incremental updates. WATCH is also an HTTP GET request, but with the watch=true query parameter. 4

Key characteristics of the WATCH request (sketched in code after the list):

  • Starts watching for changes from the ResourceVersion returned by the LIST
  • Sets a timeout (TimeoutSeconds), a random value usually between 5 and 10 minutes, to spread load across the server
  • Enables AllowWatchBookmarks=true so bookmarks can make recovery more efficient
  • Returns an event stream containing ADDED, MODIFIED, DELETED, and BOOKMARK events 5
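A hand-written request with the same options might look like the sketch below; `resourceVersion` is the value extracted from the LIST above, and `clientset`/`ctx` are assumed:

```go
// Open a WATCH starting at the ResourceVersion returned by the LIST.
timeoutSeconds := int64(300 * (rand.Float64() + 1.0)) // ~5-10 minutes, like the Reflector
w, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
	ResourceVersion:     resourceVersion,
	TimeoutSeconds:      &timeoutSeconds,
	AllowWatchBookmarks: true,
})
if err != nil {
	return err
}
defer w.Stop()
// w.ResultChan() now delivers ADDED/MODIFIED/DELETED/BOOKMARK events.
```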

3. Handling WATCH events #

Incoming WATCH events are processed to update the local cache (see the loop sketched after this list):

  • ADDED: add the new object to the store
  • MODIFIED: update the existing object in the store
  • DELETED: remove the object from the store
  • BOOKMARK: advance the ResourceVersion without touching the store contents 6
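A simplified version of that handling loop, using a cache.Store as the local cache; `w`, `store`, and `resourceVersion` come from the earlier sketches, and the real handleWatch additionally does type checks, error reporting, and metrics:

```go
// Apply watch events to a local cache.Store and track the ResourceVersion.
for event := range w.ResultChan() {
	switch event.Type {
	case watch.Added:
		_ = store.Add(event.Object)
	case watch.Modified:
		_ = store.Update(event.Object)
	case watch.Deleted:
		_ = store.Delete(event.Object)
	case watch.Bookmark:
		// No store change; only the ResourceVersion advances below.
	case watch.Error:
		// Report and restart the watch (omitted in this sketch).
	}
	if m, err := meta.Accessor(event.Object); err == nil {
		resourceVersion = m.GetResourceVersion()
	}
}
```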

Resource-specific implementations #

Pod informer example #

For the Pod resource, the informer is created as follows: 7

The ListWatch is constructed from four functions:

  • ListFunc: calls client.CoreV1().Pods(namespace).List()
  • WatchFunc: calls client.CoreV1().Pods(namespace).Watch()
  • ListWithContextFunc: the context-aware variant of List
  • WatchFuncWithContext: the context-aware variant of Watch

These functions ultimately build API requests targeting the specific resource type and namespace. 8
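For example, the generated constructor shown in the citation can be called directly when a standalone Pod informer is needed. The 30-second resync period and the `app=demo` label selector below are purely illustrative, and `coreinformersv1` is an assumed alias for k8s.io/client-go/informers/core/v1:

```go
// Build a Pod informer from the generated constructor.
podInformer := coreinformersv1.NewFilteredPodInformer(
	clientset,
	metav1.NamespaceAll,
	30*time.Second, // resync period
	cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	func(opts *metav1.ListOptions) { opts.LabelSelector = "app=demo" }, // tweakListOptions
)
go podInformer.Run(stopCh)
```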

Deployments and Services are handled the same way #

Deployment and Service resources follow exactly the same pattern, just against different API endpoints (see the factory sketch after the list):

  • Pods: /api/v1/namespaces/{namespace}/pods
  • Deployments: /apis/apps/v1/namespaces/{namespace}/deployments
  • Services: /api/v1/namespaces/{namespace}/services
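A sketch with a shared factory makes the symmetry clear; only the group/version accessor changes, which in turn selects the endpoint. `clientset` and `stopCh` are assumed as before:

```go
// One factory, three informers; each drives its own LIST/WATCH endpoint.
factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)

podInformer := factory.Core().V1().Pods().Informer()               // /api/v1/.../pods
deploymentInformer := factory.Apps().V1().Deployments().Informer() // /apis/apps/v1/.../deployments
serviceInformer := factory.Core().V1().Services().Informer()       // /api/v1/.../services

factory.Start(stopCh)
cache.WaitForCacheSync(stopCh,
	podInformer.HasSynced, deploymentInformer.HasSynced, serviceInformer.HasSynced)
```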

The Watch-List feature (newer mechanism) #

Newer versions of client-go support the Watch-List feature, an optimized streaming mechanism that merges LIST and WATCH into a single request: 9

Advantages of the Watch-List mode (a raw-request sketch follows the list):

  • Reduces resource consumption on the server
  • Delivers both the initial state and subsequent updates over a single streaming connection
  • Requests the initial set of objects with the SendInitialEvents: true parameter
  • A BOOKMARK event carrying the k8s.io/initial-events-end annotation signals that the initial sync is complete 10
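At the request level, the streaming list can be expressed roughly as in the sketch below. It depends on the WatchList feature being enabled on both client and server, so treat it as an assumption-laden illustration rather than the Reflector's actual code:

```go
// Ask the server to stream the initial state followed by live updates.
sendInitialEvents := true
w, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
	SendInitialEvents:    &sendInitialEvents,
	ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
	AllowWatchBookmarks:  true, // required when SendInitialEvents is set
})
if err != nil {
	return err
}
for event := range w.ResultChan() {
	if event.Type == watch.Bookmark {
		m, err := meta.Accessor(event.Object)
		if err == nil && m.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
			break // initial state fully delivered; a real consumer keeps reading for updates
		}
		continue
	}
	// Synthetic ADDED events replaying the current state arrive here.
}
```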

Error handling and retries #

The Reflector implements robust error handling (see the classification sketch after the list):

  1. Expired ResourceVersion: when the ResourceVersion is too old, re-run the LIST
  2. Connection failures: retry with exponential backoff
  3. Rate limiting (HTTP 429): back off automatically and retry
  4. Internal errors (HTTP 500): retry within a bounded time window 11
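The same k8s.io/apimachinery error helpers that the Reflector relies on can classify a failed request in your own retry logic; this is a simplified sketch of the decision, not the Reflector's exact code:

```go
// Classify a LIST/WATCH error roughly the way the Reflector does.
switch {
case apierrors.IsResourceExpired(err), apierrors.IsGone(err):
	// ResourceVersion too old: fall back to a fresh LIST.
case apierrors.IsTooManyRequests(err):
	// HTTP 429: wait on the backoff manager, then retry the WATCH.
case apierrors.IsInternalError(err):
	// HTTP 500: retry, but only within a bounded retry window.
default:
	// Other errors end this ListAndWatch iteration; the outer
	// wait.BackoffUntil loop restarts it with exponential backoff.
}
```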

Complete usage example #

The typical pattern for using an informer: 12
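Condensed from the example_test.go citation below, the skeleton is roughly:

```go
// Minimal informer usage (error handling and graceful shutdown trimmed;
// see the full example in the citations).
factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
podInformer := factory.Core().V1().Pods().Informer()

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { /* enqueue the object's key */ },
	UpdateFunc: func(oldObj, newObj interface{}) { /* enqueue the key */ },
	DeleteFunc: func(obj interface{}) { /* enqueue the key */ },
})

factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
	log.Println("timed out waiting for caches to sync")
	return
}
// Read state via Listers, react to changes via the handlers above.
```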

Notes #

API request summary #

  1. Initial sync: an HTTP GET request to /api/v1/pods (or the corresponding resource path)
  2. Continuous watching: an HTTP GET request to /api/v1/pods?watch=true&resourceVersion=<version>
  3. Pagination: large lists are fetched with the limit and continue parameters
  4. Timeout control: the timeoutSeconds parameter bounds the lifetime of each WATCH connection

All of these requests retrieve full resource objects, including the spec, status, and metadata fields, unless partial metadata (PartialObjectMetadata) is explicitly requested. A raw request built through the REST client is sketched below.
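For reference, the same raw requests can be reproduced through the typed client's REST interface; the parameter values here are illustrative, and `resourceVersion`, `clientset`, and `ctx` are assumed:

```go
// A raw WATCH request roughly equivalent to
// GET /api/v1/namespaces/default/pods?watch=true&resourceVersion=<rv>&timeoutSeconds=300
body, err := clientset.CoreV1().RESTClient().
	Get().
	Resource("pods").
	Namespace("default").
	Param("watch", "true").
	Param("resourceVersion", resourceVersion).
	Param("timeoutSeconds", "300").
	Param("allowWatchBookmarks", "true").
	Stream(ctx)
if err != nil {
	return err
}
defer body.Close()
// body is a streaming response of JSON-encoded watch events.
```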

By design, the informer mechanism keeps the local cache continuously in sync with the API server while minimizing network overhead and server load, which is what allows Kubernetes controllers to monitor and react to cluster state changes efficiently.

Citations #

File: staging/src/k8s.io/client-go/ARCHITECTURE.md (L96-121)

```mermaid
graph TD
    subgraph "Kubernetes API"
        API_Server[API Server]
    end

    subgraph "client-go: Informer Mechanism"
        Reflector("1. Reflector")
        DeltaFIFO("2. DeltaFIFO")
        Indexer["3. Indexer (Cache)"]
        EventHandlers("4. Event Handlers")
    end

    subgraph "User Code"
        WorkQueue["5. Work Queue"]
        Controller("6. Controller")
    end

    API_Server -- LIST/WATCH --> Reflector
    Reflector -- Puts changes into --> DeltaFIFO
    DeltaFIFO -- Is popped by internal loop, which updates --> Indexer
    Indexer -- Update triggers --> EventHandlers
    EventHandlers -- Adds key to --> WorkQueue
    WorkQueue -- Is processed by --> Controller
    Controller -- Reads from cache via Lister --> Indexer
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L386-397)
```go
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
//
// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh))
}

// ListAndWatchWithContext first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatchWithContext didn't even try to initialize watch.
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L411-434)

```go
	if r.useWatchList {
		w, err = r.watchList(ctx)
		if w == nil && err == nil {
			// stopCh was closed
			return nil
		}
		if err != nil {
			logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
			fallbackToList = true
			// ensure that we won't accidentally pass some garbage down the watch.
			w = nil
		}
	}

	if fallbackToList {
		err = r.list(ctx)
		if err != nil {
			return err
		}
	}

	logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name)
	return r.watchWithResync(ctx, w)
}
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L486-570)

```go
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
	stopCh := ctx.Done()
	logger := klog.FromContext(ctx)
	var err error
	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
	defer func() {
		if w != nil {
			w.Stop()
		}
	}()

	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			// we can only end up here when the stopCh
			// was closed after a successful watchlist or list request
			return nil
		default:
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()

		if w == nil {
			timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			options := metav1.ListOptions{
				ResourceVersion: r.LastSyncResourceVersion(),
				// We want to avoid situations of hanging watchers. Stop any watchers that do not
				// receive any events within the timeout window.
				TimeoutSeconds: &timeoutSeconds,
				// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
				// Reflector doesn't assume bookmarks are returned at all (if the server do not support
				// watch bookmarks, it will ignore this field).
				AllowWatchBookmarks: true,
			}

			w, err = r.listerWatcher.WatchWithContext(ctx, options)
			if err != nil {
				if canRetry := isWatchErrorRetriable(err); canRetry {
					logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
					select {
					case <-stopCh:
						return nil
					case <-r.backoffManager.Backoff().C():
						continue
					}
				}
				return err
			}
		}

		err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
			r.clock, resyncerrc)
		// handleWatch always stops the watcher. So we don't need to here.
		// Just set it to nil to trigger a retry on the next loop.
		w = nil
		retry.After(err)
		if err != nil {
			if !errors.Is(err, errorStopRequested) {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
				case apierrors.IsTooManyRequests(err):
					logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription)
					select {
					case <-stopCh:
						return nil
					case <-r.backoffManager.Backoff().C():
						continue
					}
				case apierrors.IsInternalError(err) && retry.ShouldRetry():
					logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err)
					continue
				default:
					logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err)
				}
			}
			return nil
		}
	}
}
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L574-677)

```go
func (r *Reflector) list(ctx context.Context) error {
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
	defer initTrace.LogIfLong(10 * time.Second)
	var list runtime.Object
	var paginatedResult bool
	var err error
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
		// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
		// list request will return the full response.
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			return r.listerWatcher.ListWithContext(ctx, opts)
		}))
		switch {
		case r.WatchListPageSize != 0:
			pager.PageSize = r.WatchListPageSize
		case r.paginatedResult:
			// We got a paginated result initially. Assume this resource and server honor
			// paging requests (i.e. watch cache is probably disabled) and leave the default
			// pager size set.
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// User didn't explicitly request pagination.
			//
			// With ResourceVersion != "", we have a possibility to list from watch cache,
			// but we do that (for ResourceVersion != "0") only if Limit is unset.
			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
			// switch off pagination to force listing from watch cache (if enabled).
			// With the existing semantic of RV (result is at least as fresh as provided RV),
			// this is correct and doesn't lead to going back in time.
			//
			// We also don't turn off pagination for ResourceVersion="0", since watch cache
			// is ignoring Limit in that case anyway, and if watch cache is not enabled
			// we don't introduce regression.
			pager.PageSize = 0
		}

		list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			r.setIsLastSyncResourceVersionUnavailable(true)
			// Retry immediately if the resource version used to list is unavailable.
			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
			// continuation pages, but the pager might not be enabled, the full list might fail because the
			// resource version it is listing at is expired or the cache may not yet be synced to the provided
			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
			// the reflector makes forward progress.
			list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
		}
		close(listCh)
	}()
	select {
	case <-ctx.Done():
		return nil
	case r := <-panicCh:
		panic(r)
	case <-listCh:
	}
	initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
	if err != nil {
		return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
	}

	// We check if the list was paginated and if so set the paginatedResult based on that.
	// However, we want to do that only for the initial list (which is the only case
	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
	// situations we may force listing directly from etcd (by setting ResourceVersion="")
	// which will return paginated result, even if watch cache is enabled. However, in
	// that case, we still want to prefer sending requests to watch cache if possible.
	//
	// Paginated result returned for request with ResourceVersion="0" mean that watch
	// cache is disabled and there are a lot of objects of a given type. In such case,
	// there is no need to prefer listing from watch cache.
	if options.ResourceVersion == "0" && paginatedResult {
		r.paginatedResult = true
	}

	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	initTrace.Step("Resource version extracted")
	items, err := meta.ExtractListWithAlloc(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	initTrace.Step("Objects extracted")
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	initTrace.Step("SyncWith done")
	r.setLastSyncResourceVersion(resourceVersion)
	initTrace.Step("Resource version updated")
	return nil
}
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L679-698)

```go
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
```

File: staging/src/k8s.io/client-go/tools/cache/reflector.go (L927-954)

```go
			switch event.Type {
			case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
				if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
					watchListBookmarkReceived = true
				}
			default:
				utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
			}
			setLastSyncResourceVersion(resourceVersion)
```

File: staging/src/k8s.io/client-go/informers/core/v1/pod.go (L58-90)

```go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.Background(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.Background(), options)
			},
			ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(ctx, options)
			},
			WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(ctx, options)
			},
		},
		&apicorev1.Pod{},
		resyncPeriod,
		indexers,
	)
}
```

File: staging/src/k8s.io/client-go/tools/cache/listwatch.go (L199-242)

```go
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do(context.Background()).
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.Background())
	}
	listFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do(ctx).
			Get()
	}
	watchFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(ctx)
	}
	return &ListWatch{
		ListFunc:             listFunc,
		WatchFunc:            watchFunc,
		ListWithContextFunc:  listFuncWithContext,
		WatchFuncWithContext: watchFuncWithContext,
	}
}
```

File: staging/src/k8s.io/client-go/example_test.go (L184-269)

```go
func Example_usingInformers() {
	// This example demonstrates the basic pattern for using an informer to watch
	// for changes to Pods. This is a conceptual example; a real controller would
	// have more robust logic and a workqueue.

	// Configure the client (out-of-cluster for this example).
	var kubeconfig string
	if home := os.Getenv("HOME"); home != "" {
		kubeconfig = filepath.Join(home, ".kube", "config")
	}
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
		return
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
		return
	}

	// A SharedInformerFactory provides a shared cache for multiple informers,
	// which reduces memory and network overhead.
	factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				log.Printf("Pod ADDED: %s", key)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(newObj)
			if err == nil {
				log.Printf("Pod UPDATED: %s", key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				log.Printf("Pod DELETED: %s", key)
			}
		},
	})
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
		return
	}

	// Graceful shutdown requires a two-channel pattern.
	//
	// The first channel, `sigCh`, is used by the `signal` package to send us
	// OS signals (e.g., Ctrl+C). This channel must be of type `chan os.Signal`.
	//
	// The second channel, `stopCh`, is used to tell the informer factory to
	// stop. The informer factory's `Start` method expects a channel of type
	// `<-chan struct{}`. It will stop when this channel is closed.
	//
	// The goroutine below is the "translator" that connects these two channels.
	// It waits for a signal on `sigCh`, and when it receives one, it closes
	// `stopCh`, which in turn tells the informer factory to shut down.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	stopCh := make(chan struct{})
	go func() {
		<-sigCh
		close(stopCh)
	}()

	// Start the informer.
	factory.Start(stopCh)

	// Wait for the initial cache sync.
	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
		log.Println("Timed out waiting for caches to sync")
		return
	}

	log.Println("Informer has synced. Watching for Pod events...")

	// Wait for the stop signal.
	<-stopCh
	log.Println("Shutting down...")
}
```