简介

Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient
  • • **RESTClient:**最基础的客户端,主要是对 HTTP 请求进行了封装,支持 Json 和 Protobuf 格式的数据。
  • • **DiscoveryClient:**发现客户端,负责发现 APIServer 支持的资源组、资源版本和资源信息的。
  • • **ClientSet:**负责操作 Kubernetes 内置的资源对象,例如:Pod、Service等。
  • • **DynamicClient:**动态客户端,可以对任意的 Kubernetes 资源对象进行通用操作,包括 CRD。

image-20240319150324140


RESTClient

上图可以看出 RESTClient 是所有 Client 的父类
它就是对 HTTP Request 进行了封装,实现了 RESTFul 风格的 API,可以直接通过 RESTClient 提供的 RESTful 方法 GET(),PUT(),POST(),DELETE() 操作数据
  • • 同时支持 json 和 protobuf
  • • 支持所有原生资源和 CRD

示例

使用 RESTClient 获取 K8S 集群 pod 资源
package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// 获取系统家目录
func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }

    // for windows
    return os.Getenv("USERPROFILE")
}

func main() {
    var kubeConfig *string
    var err error
    var config *rest.Config

    // 获取 kubeconfig 文件路径
    if h := homeDir(); h != "" {
        kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
    } else {
        kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
    }
    flag.Parse()

    // 获取 kubeconfig
    config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
    if err != nil {
        panic(err.Error())
    }

    // 使用 RESTClient 需要开发者自行设置资源 URL
    // pod 资源没有 group,在核心组,所以前缀是 api
    config.APIPath = "api"
    // 设置 corev1 groupVersion
    config.GroupVersion = &corev1.SchemeGroupVersion
    // 设置解析器,用于用于解析 scheme
    config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
    // 初始化 RESTClient
    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        panic(err.Error())
    }
    // 调用结果用 podList 解析
    result := &corev1.PodList{}
    // 获取 kube-system 命名空间的 pod
    namespace := "kube-system"
    // 链式调用 RESTClient 方法获取,并将结果解析到 corev1.PodList{}
    err = restClient.Get().Namespace(namespace).Resource("pods").Do(context.TODO()).Into(result)
    if err != nil {
        panic(err.Error())
    }

    // 打印结果
    for _, pod := range result.Items {
        fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
    }
}
程序结果如下:
namespace: kube-system, pod: coredns-697ddfb55c-5lk74
namespace: kube-system, pod: coredns-697ddfb55c-nnkhp
namespace: kube-system, pod: etcd-master-172-31-97-104
namespace: kube-system, pod: kube-apiserver-master-172-31-97-104
namespace: kube-system, pod: kube-controller-manager-master-172-31-97-104
namespace: kube-system, pod: kube-lvscare-node-172-31-97-105
namespace: kube-system, pod: kube-proxy-49k8k
namespace: kube-system, pod: kube-proxy-fvf57
namespace: kube-system, pod: kube-scheduler-master-172-31-97-104
namespace: kube-system, pod: metrics-server-7f6f9649f9-qvvj8

RESTClient 原理

初始化 RESTClient,可以发现对原生 HTTP 库进行了封装了
func RESTClientFor(config *Config) (*RESTClient, error) {
    if config.GroupVersion == nil {
        return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
    }
    if config.NegotiatedSerializer == nil {
        return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
    }

    // Validate config.Host before constructing the transport/client so we can fail fast.
    // ServerURL will be obtained later in RESTClientForConfigAndClient()
    _, _, err := defaultServerUrlFor(config)
    if err != nil {
        return nil, err
    }

    // 获取原生 http client
    httpClient, err := HTTPClientFor(config)
    if err != nil {
        return nil, err
    }
    
    // 初始化 RESTClient
    return RESTClientForConfigAndClient(config, httpClient)
}
RESTClient 实现了 Interface 接口
type Interface interface {
    GetRateLimiter() flowcontrol.RateLimiter
    Verb(verb string) *Request
    Post() *Request
    Put() *Request
    Patch(pt types.PatchType) *Request
    Get() *Request
    Delete() *Request
    APIVersion() schema.GroupVersion
}
RESTClient 的链式调用主要是设置 namespace,资源 name,一些选择器等,最终调用 Do() 方法网络调用
func (r *Request) Do(ctx context.Context) Result {
    var result Result
    err := r.request(ctx, func(req *http.Request, resp *http.Response) {
        result = r.transformResponse(resp, req)
    })
    if err != nil {
        return Result{err: err}
    }
    return result
}

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
    //Metrics for total request latency
    start := time.Now()
    defer func() {
        metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
    }()

    if r.err != nil {
        klog.V(4).Infof("Error in request: %v", r.err)
        return r.err
    }

    if err := r.requestPreflightCheck(); err != nil {
        return err
    }

    client := r.c.Client
    if client == nil {
        client = http.DefaultClient
    }

    // Throttle the first try before setting up the timeout configured on the
    // client. We don't want a throttled client to return timeouts to callers
    // before it makes a single request.
    if err := r.tryThrottle(ctx); err != nil {
        return err
    }

    if r.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, r.timeout)
        defer cancel()
    }

    // Right now we make about ten retry attempts if we get a Retry-After response.
    var retryAfter *RetryAfter
    for {
        // 初始化网络请求
        req, err := r.newHTTPRequest(ctx)
        if err != nil {
            return err
        }

        r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
        if retryAfter != nil {
            // We are retrying the request that we already send to apiserver
            // at least once before.
            // This request should also be throttled with the client-internal rate limiter.
            if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
                return err
            }
            retryAfter = nil
        }
        // 发起网络调用
        resp, err := client.Do(req)
        updateURLMetrics(ctx, r, resp, err)
        if err != nil {
            r.backoff.UpdateBackoff(r.URL(), err, 0)
        } else {
            r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
        }

        done := func() bool {
            defer readAndCloseResponseBody(resp)

            // if the the server returns an error in err, the response will be nil.
            f := func(req *http.Request, resp *http.Response) {
                if resp == nil {
                    return
                }
                fn(req, resp)
            }

            var retry bool
            retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
                // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
                // Thus in case of "GET" operations, we simply retry it.
                // We are not automatically retrying "write" operations, as they are not idempotent.
                if r.verb != "GET" {
                    return false
                }
                // For connection errors and apiserver shutdown errors retry.
                if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
                    return true
                }
                return false
            })
            if retry {
                err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
                if err == nil {
                    return false
                }
                klog.V(4).Infof("Could not retry request - %v", err)
            }

            f(req, resp)
            return true
        }()
        if done {
            return err
        }
    }
}

ClientSet

ClientSet 在调用 Kubernetes 内置资源非常常用,但是无法操作自定义资源,需要实现自定义资源的 ClientSet 才能操作。
ClientSet 是在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法,Client-go 对 Kubernetes 每一个内置资源都封装了 Client,而 ClientSet 就是多个 Client 的集合。

示例

使用 ClientSet 获取 K8S 集群 pod 资源
package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "path/filepath"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }

    return os.Getenv("USERROFILE")
}

func main() {
    var kubeConfig *string

    if h := homeDir(); h != "" {
        kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
    } else {
        kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
    if err != nil {
        panic(err.Error())
    }

    // 获取 clientSet
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    namespace := "kube-system"
    // 链式调用 ClientSet 获取 pod 列表
    podList, err := clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }

    for _, pod := range podList.Items {
        fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
    }
}

ClientSet 原理

NewForConfig 获取 ClientSet
// k8s.io/client-go/kubernetes/clientset.go:413
func NewForConfig(c *rest.Config) (*Clientset, error) {
    configShallowCopy := *c

    // share the transport between all clients
    httpClient, err := rest.HTTPClientFor(&configShallowCopy)
    if err != nil {
        return nil, err
    }

    return NewForConfigAndClient(&configShallowCopy, httpClient)
}
NewForConfigAndClient 获取每个 groupVersion 下的资源 Client
func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
    configShallowCopy := *c
    if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
        if configShallowCopy.Burst <= 0 {
            return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
        }
        configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
    }

    var cs Clientset
    var err error
    cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient)
    if err != nil {
        return nil, err
    }
    cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfigAndClient(&configShallowCopy, httpClient)
    if err != nil {
        return nil, err
    }
    ...
    return &cs, nil
}
admissionregistrationv1.NewForConfigAndClient 介绍
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AdmissionregistrationV1Client, error) {
    config := *c
    // 设置 client 参数
    if err := setConfigDefaults(&config); err != nil {
        return nil, err
    }
    // 最终调用 RESTClientForConfigAndClient 生成 RESTClient
    client, err := rest.RESTClientForConfigAndClient(&config, h)
    if err != nil {
        return nil, err
    }
    return &AdmissionregistrationV1Client{client}, nil
}

// 可以发现,这些参数跟上面 RESTClient 差不多
func setConfigDefaults(config *rest.Config) error {
    gv := v1.SchemeGroupVersion
    config.GroupVersion = &gv
    config.APIPath = "/apis"
    config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

    if config.UserAgent == "" {
        config.UserAgent = rest.DefaultKubernetesUserAgent()
    }

    return nil
}
pod 资源实现了一系列方法,比如 List(),可以发现最终调用 RESTClient 的方法
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.PodList{}
    err = c.client.Get().
        Namespace(c.ns).
        Resource("pods").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do(ctx).
        Into(result)
    return
}

DynamicClient

DynamicClient 见名知意,一种动态客户端,通过动态指定资源组,资源版本和资源信息,来操作任意的 Kubernetes 资源对象。DynamicClient 不仅能操作 Kubernetes 内置资源,还能操作 CRD 。

DynamicClientClientSet 都是对 RESTClient 进行了封装

示例

DynamicClient 返回的结果不像 ClientSet 那样返回具体资源类型,它返回的是一个动态数据即 map 结构,所以需要将结果进行解析到具体资源类型
package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }

    return os.Getenv("USERROFILE")
}

func main() {
    var kubeConfig *string

    if h := homeDir(); h != "" {
        kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
    } else {
        kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
    if err != nil {
        panic(err.Error())
    }

    // 初始化 DynamicClient
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 提供 pod 的 gvr,因为是动态调用,dynamicClient 不知道需要操作哪个资源,所以需要自己提供
    gvr := schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "pods",
    }
    //链式调用 dynamicClient 获取数据
    result, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }
    podList := &corev1.PodList{}
    // 将结果解析到 podList scheme 中
    err = runtime.DefaultUnstructuredConverter.FromUnstructured(
        result.UnstructuredContent(), podList)

    for _, pod := range podList.Items {
        fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
    }

}

DynamicClient 原理

dynamic.NewForConfig(config) 初始化 dynamicClient
func NewForConfig(inConfig *rest.Config) (Interface, error) {
    config := ConfigFor(inConfig)

    httpClient, err := rest.HTTPClientFor(config)
    if err != nil {
        return nil, err
    }
    return NewForConfigAndClient(config, httpClient)
}

func NewForConfigAndClient(inConfig *rest.Config, h *http.Client) (Interface, error) {
    config := ConfigFor(inConfig)
    // for serializing the options
    config.GroupVersion = &schema.GroupVersion{}
    config.APIPath = "/if-you-see-this-search-for-the-break"
    // 初始化 RESTClient
    restClient, err := rest.RESTClientForConfigAndClient(config, h)
    if err != nil {
        return nil, err
    }
    return &dynamicClient{client: restClient}, nil
}

可以看出 dynamicClientClientSet 一样都是封装了 RESTClient

dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
dynamicClient 链式调用中,Resource() 需要传入需要操作对象的 gvr
最终也是调用 RESTClient 来获取数据
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
    result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
    if err := result.Error(); err != nil {
        return nil, err
    }
    retBytes, err := result.Raw()
    if err != nil {
        return nil, err
    }
    uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
    if err != nil {
        return nil, err
    }
    if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
        return list, nil
    }

    list, err := uncastObj.(*unstructured.Unstructured).ToList()
    if err != nil {
        return nil, err
    }
    return list, nil
}
DynamicClient 返回的结果是 *unstructured.UnstructuredList
Unstructured 是非结构化数据,用 map[string]interface{} 存储。
type UnstructuredList struct {
    Object map[string]interface{}

    // Items is a list of unstructured objects.
    Items []Unstructured `json:"items"`
}

type Unstructured struct {
    // Object is a JSON compatible map with string, float, int, bool, []interface{}, or
    // map[string]interface{}
    // children.
    Object map[string]interface{}
}
所以拿到结果需要 decode 成结构化数据类型
// 将 result decode 到 podList 
podList := &corev1.PodList{}
    err = runtime.DefaultUnstructuredConverter.FromUnstructured(
        result.UnstructuredContent(), podList)

DiscoveryClient

DiscoveryClient 是发现客户端,用于发现 Kube-apiserver 支持的资源组、资源版本、资源类型等。
kubectl api-resourceskubectl api-versions 命令就是通过 DiscoveryClient 实现的。
DiscoveryClient 支持本地目录缓存,一般在 ~/.kube/cache 会存储集群所有 gvr 信息,避免每次访问 Kube-apiserver

示例

通过 DiscoveryClient 查询集群所有的 gvr
package main

import (
    "flag"
    "fmt"
    "os"
    "path/filepath"

    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/tools/clientcmd"
)

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }

    return os.Getenv("USERROFILE")
}

func main() {
    var kubeConfig *string

    if h := homeDir(); h != "" {
        kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
    } else {
        kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
    if err != nil {
        panic(err.Error())
    }

    // 初始化 DiscoveryClient
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        panic(err.Error())
    }
    // 获取集群所有资源
    _, apiResourceList, err := discoveryClient.ServerGroupsAndResources()
    if err != nil {
        panic(err.Error())
    }

    for _, resources := range apiResourceList {
        gv, err := schema.ParseGroupVersion(resources.GroupVersion)
        if err != nil {
            panic(err.Error())
        }
        for _, resource := range resources.APIResources {
            fmt.Printf("group: %s, version: %s, resource: %s\n", gv.Group, gv.Version, resource.Name)
        }

    }
}
结果如下:
group: , version: v1, resource: bindings
group: , version: v1, resource: componentstatuses
group: , version: v1, resource: configmaps
group: , version: v1, resource: endpoints
group: , version: v1, resource: events
group: , version: v1, resource: limitranges
group: , version: v1, resource: namespaces
group: , version: v1, resource: namespaces/finalize
group: , version: v1, resource: namespaces/status
group: , version: v1, resource: nodes
group: , version: v1, resource: nodes/proxy
group: , version: v1, resource: nodes/status
group: , version: v1, resource: persistentvolumeclaims
group: , version: v1, resource: persistentvolumeclaims/status
group: , version: v1, resource: persistentvolumes
group: , version: v1, resource: persistentvolumes/status
group: , version: v1, resource: pods
group: , version: v1, resource: pods/attach
group: , version: v1, resource: pods/binding
group: , version: v1, resource: pods/ephemeralcontainers
group: , version: v1, resource: pods/eviction
group: , version: v1, resource: pods/exec
group: , version: v1, resource: pods/log
group: , version: v1, resource: pods/portforward
group: , version: v1, resource: pods/proxy
group: , version: v1, resource: pods/status
group: , version: v1, resource: podtemplates
group: , version: v1, resource: replicationcontrollers
group: , version: v1, resource: replicationcontrollers/scale
group: , version: v1, resource: replicationcontrollers/status
group: , version: v1, resource: resourcequotas
group: , version: v1, resource: resourcequotas/status
group: , version: v1, resource: secrets
group: , version: v1, resource: serviceaccounts
group: , version: v1, resource: services
group: , version: v1, resource: services/proxy
group: , version: v1, resource: services/status
group: apiregistration.k8s.io, version: v1, resource: apiservices
group: apiregistration.k8s.io, version: v1, resource: apiservices/status
group: apiregistration.k8s.io, version: v1beta1, resource: apiservices
group: apiregistration.k8s.io, version: v1beta1, resource: apiservices/status
group: extensions, version: v1beta1, resource: ingresses
group: extensions, version: v1beta1, resource: ingresses/status
group: apps, version: v1, resource: controllerrevisions
group: apps, version: v1, resource: daemonsets
group: apps, version: v1, resource: daemonsets/status
group: apps, version: v1, resource: deployments
group: apps, version: v1, resource: deployments/scale
group: apps, version: v1, resource: deployments/status
group: apps, version: v1, resource: replicasets
group: apps, version: v1, resource: replicasets/scale
group: apps, version: v1, resource: replicasets/status
group: apps, version: v1, resource: statefulsets
group: apps, version: v1, resource: statefulsets/scale
...

DiscoveryClient 原理

discovery.NewDiscoveryClientForConfig(config) 初始化 DsicoveryClient,与 ClientSetDynamicClient 原理类似,都是封装了 RESTClient,这里不再赘述。
discoveryClient.ServerGroupsAndResources() 返回集群所有的资源对象,这里可能会有疑问,这些资源是什么时候存储到 etcd 中的,猜想是 Kube-apiserver 启动时将这些资源类型存储到 etcd 中。需要看 Kube-apiserver 源码来佐证。

总结

Client-go 四种客户端,在平时开发中 ClientSet 使用频率最高,其他三种了解原理一般就行。