简介
Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient。
-
• **RESTClient**:最基础的客户端,主要是对 HTTP 请求进行了封装,支持 JSON 和 Protobuf 格式的数据。
-
• **DiscoveryClient**:发现客户端,负责发现 APIServer 支持的资源组、资源版本和资源信息。
-
• **ClientSet**:负责操作 Kubernetes 内置的资源对象,例如:Pod、Service 等。
-
• **DynamicClient**:动态客户端,可以对任意的 Kubernetes 资源对象进行通用操作,包括 CRD。
RESTClient
从上图可以看出,RESTClient 是其他所有 Client 的基础(Go 中没有类继承,这里的“父类”指其他 Client 都是在 RESTClient 之上封装而来)。
它对 HTTP Request 进行了封装,实现了 RESTful 风格的 API,可以直接通过 RESTClient 提供的 RESTful 方法 GET()、PUT()、POST()、DELETE()
来操作数据
-
• 同时支持 json 和 protobuf
-
• 支持所有原生资源和 CRD
示例
使用 RESTClient 获取 K8S 集群 pod 资源
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// homeDir returns the current user's home directory as reported by the
// environment, or "" when neither variable is set.
func homeDir() string {
	home := os.Getenv("HOME")
	if home == "" {
		// Fall back to USERPROFILE, the variable used on Windows.
		home = os.Getenv("USERPROFILE")
	}
	return home
}
func main() {
var kubeConfig *string
var err error
var config *rest.Config
// 获取 kubeconfig 文件路径
if h := homeDir(); h != "" {
kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
} else {
kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
}
flag.Parse()
// 获取 kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
panic(err.Error())
}
// 使用 RESTClient 需要开发者自行设置资源 URL
// pod 资源没有 group,在核心组,所以前缀是 api
config.APIPath = "api"
// 设置 corev1 groupVersion
config.GroupVersion = &corev1.SchemeGroupVersion
// 设置解析器,用于用于解析 scheme
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
// 初始化 RESTClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err.Error())
}
// 调用结果用 podList 解析
result := &corev1.PodList{}
// 获取 kube-system 命名空间的 pod
namespace := "kube-system"
// 链式调用 RESTClient 方法获取,并将结果解析到 corev1.PodList{}
err = restClient.Get().Namespace(namespace).Resource("pods").Do(context.TODO()).Into(result)
if err != nil {
panic(err.Error())
}
// 打印结果
for _, pod := range result.Items {
fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
}
}
程序结果如下:
namespace: kube-system, pod: coredns-697ddfb55c-5lk74
namespace: kube-system, pod: coredns-697ddfb55c-nnkhp
namespace: kube-system, pod: etcd-master-172-31-97-104
namespace: kube-system, pod: kube-apiserver-master-172-31-97-104
namespace: kube-system, pod: kube-controller-manager-master-172-31-97-104
namespace: kube-system, pod: kube-lvscare-node-172-31-97-105
namespace: kube-system, pod: kube-proxy-49k8k
namespace: kube-system, pod: kube-proxy-fvf57
namespace: kube-system, pod: kube-scheduler-master-172-31-97-104
namespace: kube-system, pod: metrics-server-7f6f9649f9-qvvj8
RESTClient 原理
初始化 RESTClient,可以发现它对原生 HTTP 库进行了封装
// RESTClientFor builds a RESTClient from config.
//
// It returns an error when config.GroupVersion or config.NegotiatedSerializer
// is nil, when defaultServerUrlFor rejects the config, or when HTTPClientFor
// fails. Otherwise it delegates to RESTClientForConfigAndClient.
func RESTClientFor(config *Config) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
// Validate config.Host before constructing the transport/client so we can fail fast.
// ServerURL will be obtained later in RESTClientForConfigAndClient()
_, _, err := defaultServerUrlFor(config)
if err != nil {
return nil, err
}
// Obtain the underlying HTTP client for this config.
httpClient, err := HTTPClientFor(config)
if err != nil {
return nil, err
}
// Build the RESTClient from the config and the HTTP client.
return RESTClientForConfigAndClient(config, httpClient)
}
RESTClient 实现了 Interface
接口
// Interface is the method set of a REST client: accessors for the rate
// limiter and the API group-version, plus builders that each return a
// *Request (presumably one per HTTP method, with Verb taking an arbitrary
// verb string).
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
RESTClient 的链式调用主要是设置 namespace、资源 name、选择器等参数,最终调用 Do()
方法发起网络调用
// Do executes the request via r.request and returns the Result produced by
// r.transformResponse. If r.request returns an error, the returned Result
// carries only that error.
func (r *Request) Do(ctx context.Context) Result {
var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
return result
}
// request sends the HTTP request described by r, looping until an attempt is
// final.
//
// Each iteration builds a fresh *http.Request, sleeps for the computed
// backoff, sends it with the configured client (http.DefaultClient when
// r.c.Client is nil) and asks r.retry.NextRetry whether to try again. On the
// final attempt fn is invoked with the request and response, but only when
// the response is non-nil. The returned error is the error from that final
// client.Do call, or an earlier error from the preflight check, throttling,
// or request construction.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
//Metrics for total request latency
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
if err := r.requestPreflightCheck(); err != nil {
return err
}
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
// Throttle the first try before setting up the timeout configured on the
// client. We don't want a throttled client to return timeouts to callers
// before it makes a single request.
if err := r.tryThrottle(ctx); err != nil {
return err
}
if r.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.timeout)
defer cancel()
}
// Right now we make about ten retry attempts if we get a Retry-After response.
var retryAfter *RetryAfter
for {
// Build the HTTP request for this attempt.
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return err
}
retryAfter = nil
}
// Send the request over the network.
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
// done reports whether this attempt was the final one; it is wrapped in a
// closure so the deferred body cleanup runs once per attempt.
done := func() bool {
defer readAndCloseResponseBody(resp)
// if the server returns an error in err, the response will be nil.
f := func(req *http.Request, resp *http.Response) {
if resp == nil {
return
}
fn(req, resp)
}
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
if r.verb != "GET" {
return false
}
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
return true
}
return false
})
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
if err == nil {
return false
}
klog.V(4).Infof("Could not retry request - %v", err)
}
f(req, resp)
return true
}()
if done {
return err
}
}
}
ClientSet
ClientSet 在操作 Kubernetes 内置资源时非常常用,但是无法直接操作自定义资源,需要为自定义资源实现对应的 ClientSet 才能操作。
ClientSet 是在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法,Client-go 对 Kubernetes 每一个内置资源都封装了 Client,而 ClientSet 就是多个 Client 的集合。
示例
使用 ClientSet 获取 K8S 集群 pod 资源
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// homeDir returns the current user's home directory as reported by the
// environment: HOME when it is set, otherwise USERPROFILE (the variable used
// on Windows). It returns "" when neither is set.
func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	// The variable is USERPROFILE; the previous spelling "USERROFILE" never
	// matched, so the fallback always yielded "".
	return os.Getenv("USERPROFILE")
}
// main lists the pods of the kube-system namespace through a typed ClientSet
// and prints one "namespace/pod" line per pod.
func main() {
	// Default the kubeconfig path to <home>/.kube/config when a home directory
	// is known; otherwise the flag defaults to the empty string.
	defaultPath := ""
	if home := homeDir(); home != "" {
		defaultPath = filepath.Join(home, ".kube", "config")
	}
	kubeConfig := flag.String("kubeConfig", defaultPath, "use kubeconfig to access kube-apiserver")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
	if err != nil {
		panic(err.Error())
	}

	// Create the ClientSet.
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// Walk down to the typed pod client, then list the pods.
	const namespace = "kube-system"
	podClient := clientSet.CoreV1().Pods(namespace)
	podList, err := podClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	for i := range podList.Items {
		pod := &podList.Items[i]
		fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
	}
}
ClientSet 原理
NewForConfig
获取 ClientSet
// k8s.io/client-go/kubernetes/clientset.go:413
//
// NewForConfig creates a Clientset from c. It works on a shallow copy of c,
// builds a single HTTP client from that copy with rest.HTTPClientFor, and
// hands both to NewForConfigAndClient. An error from rest.HTTPClientFor is
// returned as is.
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
NewForConfigAndClient
获取每个 groupVersion
下的资源 Client
// NewForConfigAndClient creates a Clientset from c and httpClient by building
// one typed client per group-version (only the first two are shown in this
// excerpt; the rest is elided).
//
// It works on a shallow copy of c. When no RateLimiter is set and QPS is
// greater than 0, a token-bucket rate limiter is installed on the copy; in
// that case Burst must be greater than 0 or an error is returned. The first
// error from any per-group constructor aborts construction.
func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
if configShallowCopy.Burst <= 0 {
return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
}
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var cs Clientset
var err error
cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
...
return &cs, nil
}
以 admissionregistrationv1.NewForConfigAndClient
为例进行介绍
// NewForConfigAndClient creates an AdmissionregistrationV1Client from c and
// the HTTP client h. It copies c, applies setConfigDefaults to the copy, and
// wraps the RESTClient returned by rest.RESTClientForConfigAndClient.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AdmissionregistrationV1Client, error) {
config := *c
// Fill in the client parameters for this group-version.
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
// Ultimately a RESTClient is created via RESTClientForConfigAndClient.
client, err := rest.RESTClientForConfigAndClient(&config, h)
if err != nil {
return nil, err
}
return &AdmissionregistrationV1Client{client}, nil
}
// setConfigDefaults fills in the fields a RESTClient needs on config: the
// group-version (v1.SchemeGroupVersion), the API path "/apis", the negotiated
// serializer, and a default user agent when none is set. It always returns
// nil. These are much the same settings the RESTClient example sets by hand.
func setConfigDefaults(config *rest.Config) error {
gv := v1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
pod 资源实现了一系列方法,比如 List()
,可以发现最终调用 RESTClient 的方法
// List issues a GET for the "pods" resource in namespace c.ns through the
// underlying REST client and decodes the response into a v1.PodList.
//
// opts is encoded as versioned query parameters. When opts.TimeoutSeconds is
// set it is converted to a time.Duration and passed to Timeout; otherwise a
// zero duration is passed. result and err are named results set by the call
// chain and returned by the bare return.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
DynamicClient
DynamicClient 见名知意,一种动态客户端,通过动态指定资源组,资源版本和资源信息,来操作任意的 Kubernetes 资源对象。DynamicClient 不仅能操作 Kubernetes 内置资源,还能操作 CRD 。
DynamicClient 与 ClientSet 都是对 RESTClient 进行了封装
示例
DynamicClient 返回的结果不像 ClientSet 那样返回具体资源类型,它返回的是一个动态数据即 map 结构,所以需要将结果进行解析到具体资源类型
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
// homeDir returns the current user's home directory as reported by the
// environment: HOME when it is set, otherwise USERPROFILE (the variable used
// on Windows). It returns "" when neither is set.
func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	// The variable is USERPROFILE; the previous spelling "USERROFILE" never
	// matched, so the fallback always yielded "".
	return os.Getenv("USERPROFILE")
}
// main lists the pods of the kube-system namespace through a DynamicClient,
// converts the unstructured result into a typed corev1.PodList, and prints
// one "namespace/pod" line per pod. Any failure panics with the error text.
func main() {
	var kubeConfig *string
	if h := homeDir(); h != "" {
		kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
	} else {
		kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
	if err != nil {
		panic(err.Error())
	}

	// Create the DynamicClient.
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// The dynamic client has no built-in knowledge of resource types, so the
	// caller must supply the GroupVersionResource of the pods itself.
	gvr := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "pods",
	}

	// Fetch the data through the dynamic client's call chain.
	result, err := dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	// Convert the unstructured result into the typed PodList. The conversion
	// error must be checked: ignoring it would silently print an empty or
	// partially filled list.
	podList := &corev1.PodList{}
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(
		result.UnstructuredContent(), podList)
	if err != nil {
		panic(err.Error())
	}

	for _, pod := range podList.Items {
		fmt.Printf("namespace: %s, pod: %s\n", pod.Namespace, pod.Name)
	}
}
DynamicClient 原理
dynamic.NewForConfig(config)
初始化 dynamicClient
// NewForConfig creates a dynamic client from inConfig. It derives a config
// with ConfigFor, builds an HTTP client from it with rest.HTTPClientFor, and
// passes both to NewForConfigAndClient. An error from rest.HTTPClientFor is
// returned as is.
func NewForConfig(inConfig *rest.Config) (Interface, error) {
config := ConfigFor(inConfig)
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(config, httpClient)
}
// NewForConfigAndClient creates a dynamic client from inConfig and the HTTP
// client h. It derives a config with ConfigFor, sets an empty GroupVersion
// and a placeholder APIPath on it, builds a RESTClient, and wraps that
// RESTClient in a dynamicClient.
func NewForConfigAndClient(inConfig *rest.Config, h *http.Client) (Interface, error) {
config := ConfigFor(inConfig)
// for serializing the options
config.GroupVersion = &schema.GroupVersion{}
// NOTE(review): this path looks like a deliberate placeholder that is never
// meant to reach the server — confirm against the request-building code.
config.APIPath = "/if-you-see-this-search-for-the-break"
// Create the underlying RESTClient.
restClient, err := rest.RESTClientForConfigAndClient(config, h)
if err != nil {
return nil, err
}
return &dynamicClient{client: restClient}, nil
}
可以看出 dynamicClient 与 ClientSet 一样都是封装了 RESTClient
dynamicClient.Resource(gvr).Namespace("kube-system").List(context.TODO(), metav1.ListOptions{})
dynamicClient 链式调用中,Resource()
需要传入需要操作对象的 gvr
最终也是调用 RESTClient 来获取数据
// List issues a GET against the absolute path built by c.makeURLSegments("")
// and returns the result as an *unstructured.UnstructuredList.
//
// The raw response bytes are decoded with unstructured.UnstructuredJSONScheme.
// If the decoded object is already an *unstructured.UnstructuredList it is
// returned directly; otherwise it is asserted to *unstructured.Unstructured
// and converted with ToList. Errors from the request, from reading the raw
// body, from decoding, or from ToList are returned as is.
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
return list, nil
}
// Unchecked type assertion: this panics if the decoded object is neither
// an UnstructuredList nor an Unstructured.
list, err := uncastObj.(*unstructured.Unstructured).ToList()
if err != nil {
return nil, err
}
return list, nil
}
DynamicClient 返回的结果是 *unstructured.UnstructuredList
Unstructured 是非结构化数据,用 map[string]interface{}
存储。
// UnstructuredList holds a list of objects without a concrete Go type.
type UnstructuredList struct {
// Object is a generic string-keyed map; presumably it carries the
// list-level fields (everything other than the items) — confirm against
// the apimachinery documentation.
Object map[string]interface{}
// Items is a list of unstructured objects.
Items []Unstructured `json:"items"`
}
// Unstructured holds a single object without a concrete Go type; its content
// is stored in a generic map.
type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}
所以拿到结果需要 decode 成结构化数据类型
// Decode the unstructured result into the typed podList.
podList := &corev1.PodList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(
result.UnstructuredContent(), podList)
DiscoveryClient
DiscoveryClient 是发现客户端,用于发现 Kube-apiserver 支持的资源组、资源版本、资源类型等。
kubectl api-resources
和 kubectl api-versions
命令就是通过 DiscoveryClient 实现的。
DiscoveryClient 支持本地目录缓存,一般在 ~/.kube/cache
会存储集群所有 gvr
信息,避免每次访问 Kube-apiserver
示例
通过 DiscoveryClient 查询集群所有的 gvr
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
// homeDir returns the current user's home directory as reported by the
// environment: HOME when it is set, otherwise USERPROFILE (the variable used
// on Windows). It returns "" when neither is set.
func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	// The variable is USERPROFILE; the previous spelling "USERROFILE" never
	// matched, so the fallback always yielded "".
	return os.Getenv("USERPROFILE")
}
// main queries the API server through a DiscoveryClient and prints one
// "group/version/resource" line for every resource it reports.
//
// Discovery failures limited to individual API groups are reported on stderr
// and the partial result is still printed; any other failure panics with the
// error text.
func main() {
	var kubeConfig *string
	if h := homeDir(); h != "" {
		kubeConfig = flag.String("kubeConfig", filepath.Join(h, ".kube", "config"), "use kubeconfig to access kube-apiserver")
	} else {
		kubeConfig = flag.String("kubeConfig", "", "use kubeconfig to access kube-apiserver")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
	if err != nil {
		panic(err.Error())
	}

	// Create the DiscoveryClient.
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// Fetch every resource the cluster serves.
	_, apiResourceList, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		// ServerGroupsAndResources may return partial results together with
		// an error when only some groups failed (for example an unavailable
		// aggregated API). Keep the usable part instead of discarding it.
		if !discovery.IsGroupDiscoveryFailedError(err) {
			panic(err.Error())
		}
		fmt.Fprintf(os.Stderr, "warning: partial discovery result: %v\n", err)
	}

	for _, resources := range apiResourceList {
		gv, err := schema.ParseGroupVersion(resources.GroupVersion)
		if err != nil {
			panic(err.Error())
		}
		for _, resource := range resources.APIResources {
			fmt.Printf("group: %s, version: %s, resource: %s\n", gv.Group, gv.Version, resource.Name)
		}
	}
}
结果如下:
group: , version: v1, resource: bindings
group: , version: v1, resource: componentstatuses
group: , version: v1, resource: configmaps
group: , version: v1, resource: endpoints
group: , version: v1, resource: events
group: , version: v1, resource: limitranges
group: , version: v1, resource: namespaces
group: , version: v1, resource: namespaces/finalize
group: , version: v1, resource: namespaces/status
group: , version: v1, resource: nodes
group: , version: v1, resource: nodes/proxy
group: , version: v1, resource: nodes/status
group: , version: v1, resource: persistentvolumeclaims
group: , version: v1, resource: persistentvolumeclaims/status
group: , version: v1, resource: persistentvolumes
group: , version: v1, resource: persistentvolumes/status
group: , version: v1, resource: pods
group: , version: v1, resource: pods/attach
group: , version: v1, resource: pods/binding
group: , version: v1, resource: pods/ephemeralcontainers
group: , version: v1, resource: pods/eviction
group: , version: v1, resource: pods/exec
group: , version: v1, resource: pods/log
group: , version: v1, resource: pods/portforward
group: , version: v1, resource: pods/proxy
group: , version: v1, resource: pods/status
group: , version: v1, resource: podtemplates
group: , version: v1, resource: replicationcontrollers
group: , version: v1, resource: replicationcontrollers/scale
group: , version: v1, resource: replicationcontrollers/status
group: , version: v1, resource: resourcequotas
group: , version: v1, resource: resourcequotas/status
group: , version: v1, resource: secrets
group: , version: v1, resource: serviceaccounts
group: , version: v1, resource: services
group: , version: v1, resource: services/proxy
group: , version: v1, resource: services/status
group: apiregistration.k8s.io, version: v1, resource: apiservices
group: apiregistration.k8s.io, version: v1, resource: apiservices/status
group: apiregistration.k8s.io, version: v1beta1, resource: apiservices
group: apiregistration.k8s.io, version: v1beta1, resource: apiservices/status
group: extensions, version: v1beta1, resource: ingresses
group: extensions, version: v1beta1, resource: ingresses/status
group: apps, version: v1, resource: controllerrevisions
group: apps, version: v1, resource: daemonsets
group: apps, version: v1, resource: daemonsets/status
group: apps, version: v1, resource: deployments
group: apps, version: v1, resource: deployments/scale
group: apps, version: v1, resource: deployments/status
group: apps, version: v1, resource: replicasets
group: apps, version: v1, resource: replicasets/scale
group: apps, version: v1, resource: replicasets/status
group: apps, version: v1, resource: statefulsets
group: apps, version: v1, resource: statefulsets/scale
...