4. client-go 编程式交互

Kubernetes 系统使用 client-go 作为 Go 语言的官方编程式交互客户端库,提供对 Kubernetes API Server 服务的交互访问。Kubernetes 的源码中已经集成了 client-go 的源码,无须单独下载。client-go 源码路径为 vendor/k8s.io/client-go

开发者经常使用 client-go 基于 Kubernetes 做二次开发,所以 client-go 是开发者应熟练掌握的必会技能。

<br>

1. client-go 源码结构

client-go 的代码库已经集成到 Kubernetes 源码中了,无须考虑版本兼容性问题,源码结构示例如下:

Client-Go 共提供了 4 种与 Kubernetes APIServer 交互的客户端。分别是 RESTClient、DiscoveryClient、ClientSet、DynamicClient。

  • RESTClient:最基础的客户端,主要是对 HTTP 请求进行了封装,支持 Json 和 Protobuf 格式的数据。DiscoveryClient、ClientSet、DynamicClient 客户端都是基于 RESTClient 实现的。
  • DiscoveryClient:发现客户端,负责发现 APIServer 支持的资源组、资源版本和资源信息(即 Group、Versions、Resources)。
  • ClientSet:在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法。负责操作 Kubernetes 内置的资源对象,例如:Pod、Service 等。每一个 Resource 可以理解为一个客户端,而 ClientSet 则是多个客户端的集合,每一个 Resource 和 Version 都以函数的方式暴露给开发者。ClientSet 只能够处理 Kubernetes 内置资源,它是通过 client-gen 代码生成器自动生成的。
  • DynamicClient:动态客户端,可以对任意的 Kubernetes 资源对象进行通用操作,包括 CRD 自定义资源。

以上 4 种客户端:RESTClient、DiscoveryClient、ClientSet、DynamicClient 都可以通过 kubeconfig 配置信息连接到指定的 Kubernetes API Server。

<br>

2. kubeconfig 配置管理

kubeconfig 用于管理访问 kube-apiserver 的配置信息,同时也支持访问多 kube-apiserver 的配置管理,可以在不同的环境下管理不同的 kube-apiserver 集群配置,不同的业务线也可以拥有不同的集群。Kubernetes 的其他组件都使用 kubeconfig 配置信息来连接 kube-apiserver 组件,例如当 kubectl 访问 kube-apiserver 时,会默认加载 kubeconfig 配置信息

kubeconfig 中存储了集群、用户、命名空间和身份验证等信息,在默认的情况下,kubeconfig 存放在 $HOME/.kube/config 路径下。kubeconfig 配置信息如下:

$ cat $HOME/.kube/config
apiVersion: v1
kind: Config
preferences: {}

clusters:
- cluster:
  name: dev-cluster

user:
- name: dev-user

contexts:
- context
  name: dev-context

kubeconfig 配置信息通常包含 3 个部分,分别介绍如下:

  • clusters: 定义 Kubernetes 集群信息,例如 kube-apiserver 的服务地址以及集群的证书信息等。
  • user:定义 Kubernetes 集群用户身份验证的客户端凭证,例如 client-certificate、client-key、token 以及 username/password 等。
  • contexts:定义 Kubernetes 集群用户信息和命名空间等,用户将请求发送到指定的集群。

client-go 会读取 kubeconfig 配置信息并生成 config 对象,用于与 kube-apiserver 通信,代码示例如下:

package main

import (
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlag("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	...
}

在上述代码中,clientcmd.BuildConfigFromFlag 函数会读取 kubeconfig 配置信息并实例化 rest.Config 对象。其中 kubeconfig 最核心的功能是管理多个访问 kube-apiserver 集群的配置信息,将多个配置信息合并(merge)成一份,在合并的过程中会解决多个配置文件字段冲突的问题。该过程由 Load 函数完成,可以分为两步:第 1 步,加载 kubeconfig 配置信息;第 2 步,合并多个 kubeconfig 配置信息。代码示例如下:

1.加载 kubeconfig 配置信息

代码路径:vendor\k8s.io\client-go\tools\clientcmd\loader.go

func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
	if err := rules.Migrate(); err != nil {
		return nil, err
	}

	errlist := []error{}
	missingList := []string{}

	kubeConfigFiles := []string{}

	// Make sure a file we were explicitly told to use exists
	if len(rules.ExplicitPath) > 0 {
		if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
			return nil, err
		}
		kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)

	} else {
		kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
	}

	kubeconfigs := []*clientcmdapi.Config{}
	// read and cache the config files so that we only look at them once
	for _, filename := range kubeConfigFiles {
		if len(filename) == 0 {
			// no work to do
			continue
		}

		config, err := LoadFromFile(filename)

		if os.IsNotExist(err) {
			// skip missing files
			// Add to the missing list to produce a warning
			missingList = append(missingList, filename)
			continue
		}

		if err != nil {
			errlist = append(errlist, fmt.Errorf("error loading config file \"%s\": %v", filename, err))
			continue
		}

		kubeconfigs = append(kubeconfigs, config)
	}

	...
}

有两种方式可以获取 kubeconfig 配置信息路径:第 1 种,文件路径(即 rules.ExplicitPath);第 2 种,环境变量(通过 KUBECONFIG 变量,即 rules.Precedence,可以指定多个路径)。最后将配置信息汇总到 kubeConfigFiles 中。这两种方式都通过 LoadFromFile 函数读取数据并把读取到的数据反序列化到 Config 对象中。代码示例如下:

代码路径:vendor\k8s.io\client-go\tools\clientcmd\loader.go

// Load takes a byte slice and deserializes the contents into Config object.
// Encapsulates deserialization without assuming the source is a file.
func Load(data []byte) (*clientcmdapi.Config, error) {
	config := clientcmdapi.NewConfig()
	// if there's no data in a file, return the default object instead of failing (DecodeInto reject empty input)
	if len(data) == 0 {
		return config, nil
	}
	decoded, _, err := clientcmdlatest.Codec.Decode(data, &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, config)
	if err != nil {
		return nil, err
	}
	return decoded.(*clientcmdapi.Config), nil
}

2.合并多个 kubeconfig 配置信息

有两份 kubeconfig 配置,集群分别为 cow-cluster 和 pig-cluster,经过合并后,最终得到一份多集群的配置信息,代码示例如下:

config := clientcmdapi.NewConfig()
mergo.Merge(config, mapConfig, mergo.WithOverride)
mergo.Merge(config, nonMapConfig, mergo.WithOverride)

mergo.Merge 函数将 src 字段填充到 dst 结构中,私有字段除外,非空的 dst 字段将被覆盖。另外,dst 和 src 必须拥有有效的相同类型结构。合并过程举例如下:

src 结构:    T{X: "two", Z: Z{A: "three", B: 4}}
dst 结构:    T{X: "one", Y: 5, Z: Z{A: "four", B: 6}}
merge 后的结构:T{X: "two", Y: 5, Z: Z{A: "three", B: 4}}

<br>

3. RESTClient 客户端

RESTClient 是所有客户端的父类,这也是为啥前面说,它是最基础的客户端的原因。

它提供了 RESTful 对应的方法的封装,如:Get()、Put()、Post()、Delete() 等。通过这些封装发方法与 Kubernetes APIServer RESTful API 进行交互。

因为它是所有客户端的父类,DiscoveryClient、ClientSet、DynamicClient 客户端都是基于 RESTClient 实现的。所以它可以操作 Kubernetes 内置的所有资源对象以及 CRD。

运行以下代码,将得到 default 下的 Pod 部分相关资源信息。

package main

import (
    "context"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 加载配置文件,生成 config 对象
    config, err := clientcmd.BuildConfigFromFlags("", "../../kubeconfig")
    if err != nil {
        panic(err.Error())
    }

    // 配置 API 路径
    config.APIPath = "api"
    // 配置分组版本
    config.GroupVersion = &corev1.SchemeGroupVersion
    // 配置数据的编解码器
    config.NegotiatedSerializer = scheme.Codecs
    // 实例化 RESTClient
    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        panic(err.Error())
    }

    // 定义返回接收值
    result := &corev1.PodList{}

    err = restClient.Get().
        Namespace("default"). // 查询的 Namespace
        Resource("pods"). // 查询的资源类型
        VersionedParams(&metav1.ListOptions{Limit: 100}, scheme.ParameterCodec). // 参数及序列化工具
        Do(context.TODO()). // 发送请求
        Into(result) // 写入返回值
    if err != nil {
        panic(err.Error())
    }

    // 输出返回结果
    for _, d := range result.Items {
        fmt.Printf("namespace: %v, name: %v, status: %v\n", d.Namespace, d.Name, d.Status.Phase)
    }
}

运行以下代码,将得到 default 下的 Pod 部分相关资源信息。首先加载 kubeconfig 配置信息,并设置 config.APIPath 请求的 HTTP 路径。然后设置 config.GroupVersion 请求的资源组/资源版本。最后设置 config.NegotiatedSerializer 数据的编解码器。

RESTClient 其实就是底层使用 net/http 库,将调用 Kubernetes APIServer 的 API 请求,进行了封装,对参数和返回结果进行了序列化及反序列化。

代码路径如下:vendor\k8s.io\client-go\rest\request.go

func (r *Request) Do(ctx context.Context) Result {
	var result Result
	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
		result = r.transformResponse(resp, req)
	})
	if err != nil {
		return Result{err: err}
	}
	if result.err == nil || len(result.body) > 0 {
		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
	}
	return result
}

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
	// Metrics for total request latency
	start := time.Now()
	defer func() {
		metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
	}()

	if r.err != nil {
		klog.V(4).Infof("Error in request: %v", r.err)
		return r.err
	}

	if err := r.requestPreflightCheck(); err != nil {
		return err
	}

	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}

	// Throttle the first try before setting up the timeout configured on the
	// client. We don't want a throttled client to return timeouts to callers
	// before it makes a single request.
	if err := r.tryThrottle(ctx); err != nil {
		return err
	}

	if r.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
		defer cancel()
	}

	isErrRetryableFunc := func(req *http.Request, err error) bool {
		// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
		// Thus in case of "GET" operations, we simply retry it.
		// We are not automatically retrying "write" operations, as they are not idempotent.
		if req.Method != "GET" {
			return false
		}
		// For connection errors and apiserver shutdown errors retry.
		if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
			return true
		}
		return false
	}

	// Right now we make about ten retry attempts if we get a Retry-After response.
	retry := r.retryFn(r.maxRetries)
	for {
		if err := retry.Before(ctx, r); err != nil {
			return retry.WrapPreviousError(err)
		}
		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return err
		}
		resp, err := client.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
		// https://pkg.go.dev/net/http#Request
		if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
			metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
		}
		retry.After(ctx, r, resp, err)

		done := func() bool {
			defer readAndCloseResponseBody(resp)

			// if the server returns an error in err, the response will be nil.
			f := func(req *http.Request, resp *http.Response) {
				if resp == nil {
					return
				}
				fn(req, resp)
			}

			if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
				return false
			}

			f(req, resp)
			return true
		}()
		if done {
			return retry.WrapPreviousError(err)
		}
	}
}

请求发送之前需要根据请求参数生成请求的 RESTful URL,由 r.URL.String 函数完成。例如,在 RESTClient Example 代码示例中,根据请求参数生成请求的 RESTful URL 为 http://127.0.0.1:8080/api/v1/namespaces/default/pods?limit=500,其中 api 参数为 v1,namespace 参数为 default,请求的资源为 pods,limit 参数表示最多检索出 500 条信息。

最后通过 Go 语言标准库 net/http 向 RESTful URL(即 kube-apiserver)发送请求,请求得到的结构存放在 http.Response 的 Body 对象中,fn 函数(即 transformResponse)将结果转换为资源对象。当函数退出时,会通过 rest.Body.Close 命令进行关闭,防止内存溢出。

<br>

4. ClientSet 客户端

上面介绍了 RESTClient,它虽然可以操作 Kubernetes 的所有资源对象,但是使用起来确实比较复杂,需要配置的参数过于繁琐,如需要指定 Resource 和 Version 等信息,因此,为了更优雅的更方便的与 Kubernetes APIServer 进行交互,则需要进一步的封装。

前面有过介绍,ClientSet 是基于 RESTClient 的封装,同时 ClientSet 是使用预生成的 API 对象与 APIServer 进行交互的,这样做更方便进行二次开发。

ClientSet 在 RESTClient 的基础上封装了对 Resource 和 Version 的管理方法。负责操作 Kubernetes 内置的资源对象,例如:Pod、Service 等。每一个 Resource 可以理解为一个客户端,而 ClientSet 则是多个客户端的集合,每一个 Resource 和 Version 都以函数的方式暴露给开发者。ClientSet 是一组资源对象客户端的集合,例如负责操作 Pods、Services 等资源的 CoreV1Client,负责操作 Deployments、DaemonSets 等资源的 AppsV1Client 等。通过这些资源对象客户端提供的操作方法,即可对 Kubernetes 内置的资源对象进行 Create、Update、Get、List、Delete 等操作。

运行以下代码,将得到 default 下的 Pod 部分相关资源信息。

package main

import (
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

/*
  @Author : lanyulei
  @Desc :
*/

func main() {
    // 加载配置文件,生成 config 对象
    config, err := clientcmd.BuildConfigFromFlags("", "../../kubeconfig")
    if err != nil {
        panic(err.Error())
    }

    // 实例化 ClientSet
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 查询 default 下的 pods 部门资源信息
    pods, err := clientset.
        CoreV1().                                  // 实例化资源客户端,这里标识实例化 CoreV1Client
        Pods("default").                           // 选择 namespace,为空则表示所有 Namespace
	List(context.TODO(), metav1.ListOptions{limit:500}) // 查询 pods 列表
    if err != nil {
        panic(err.Error())
    }

    // 输出 Pods 资源信息
    for _, item := range pods.Items {
        fmt.Printf("namespace: %v, name: %v\n", item.Namespace, item.Name)
    }
}

运行以上代码,列出 default 命名空间下的所有 pod 资源对象的相关信息。首先加载 kubeconfig 配置信息,kubernetes.NewForConfig 通过 kubeconfig 配置信息实例化 clientset 对象,该对象用于管理所有 Resource 的客户端。

clientset.CoreV1().Pods 函数表示请求 core 核心资源组的 v1 资源版本下的 Pod 资源对象,其内部设置了 APIPath 请求的 HTTP 路径,GroupVersion 请求的资源组、资源版本,NegotiatedSerializer 数据的编解码器。

其中, Pods 函数时一个资源接口对象,用于 Pod 资源对象的管理,例如,对 Pod 资源执行 Create、Update、Delete、Get、List、Watch、Patch 等操作,这些操作实际上是对 RESTClient 进行了封装,可以设置选项(如 Limit、TimeoutSeconds 等)。List 函数通过 RESTClient 获得 Pod 列表,代码如下:

代码路径:vendor\k8s.io\client-go\kubernetes\typed\core\v1\pod.go

// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}

<br>

5. DynamicClient 客户端

DynamicClient 是一种动态客户端,通过动态指定资源组、资源版本和资源等信息,来操作任意的 Kubernetes 资源对象的一种客户端。即不仅仅是操作 Kubernetes 内置的资源对象,还可以操作 CRD。这也是与 ClientSet 最明显的一个区别。

使用 ClientSet 的时候,程序会将所用的版本与类型紧密耦合。而 DynamicClient 使用嵌套的 map[string]interface{} 结构存储 Kubernetes APIServer 的返回值,使用反射机制,在运行的时候,进行数据绑定,这种方式更加灵活,但是却无法获取强数据类型的检查和验证。

此外,在介绍 DynamicClient 之前,还需要了解另外两个重要的知识点,Object.runtime 接口和 Unstructured 结构体。

Object.runtime:Kubernetes 中的所有资源对象,都实现了这个接口,其中包含 DeepCopyObject 和 GetObjectKind 的方法,分别用于对象深拷贝和获取对象的具体资源类型。Unstructured:包含 map[string]interface{} 类型字段,在处理无法预知结构的数据时,将数据值存入 interface{} 中,待运行时利用反射判断。该结构体提供了大量的工具方法,便于处理非结构化的数据。这也是 DynamicClient 能够处理 CRD 自定义资源的关键ClientSet 则需要预先实现每种 Resource 和 Version 的操作,其内部的数据都是结构化数据

DynamicClient 的处理过程将 Resource(例如 PodList)转换成 Unstructured 结构类型,Kubernetes 的所有 Resource 都可以转换为该结构类型。处理完成后,再将 Unstructured 转换成 PodList。整个过程类似于 Go 语言的 interface{} 断言转换过程。另外,Unstructured 结构类型是通过 map[string]interface{} 转换的

运行以下代码,将得到 default 下的 Pod 部分相关资源信息。

package main

import (
    "context"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

/*
  @Author : lanyulei
  @Desc :
*/

func main() {
    // 加载配置文件,生成 config 对象
    config, err := clientcmd.BuildConfigFromFlags("", "../../kubeconfig")
    if err != nil {
        panic(err.Error())
    }

    // 实例化 DynamicClient
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 设置要请求的 GVR
    gvr := schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "pods",
    }

    // 发送请求,并得到返回结果
    unStructData, err := dynamicClient.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }

    // 使用反射将 unStructData 的数据转成对应的结构体类型,例如这是是转成 v1.PodList 类型
    podList := &corev1.PodList{}
    err = runtime.DefaultUnstructuredConverter.FromUnstructured(
        unStructData.UnstructuredContent(),
        podList,
    )
    if err != nil {
        panic(err.Error())
    }

    // 输出 Pods 资源信息
    for _, item := range podList.Items {
        fmt.Printf("namespace: %v, name: %v\n", item.Namespace, item.Name)
    }
}

运行以上代码,列出 default 命名空间下的所有 pod 资源对象的相关信息。首先加载 kubeconfig 配置信息,dynamic.NewForConfig 通过 kubeconfig 配置信息实例化 DynamicClient 对象,该对象用于管理所有 Resource 的客户端。

dynamicClient.Resource(gvr) 函数用于设置请求的资源组、资源版本、资源名称。Namespace 函数用于设置请求的命名空间。List 函数用于获取 Pod 列表。得到的 Pod 列表为 unstructured.UnstructuredList 指针类型,然后通过 runtime.DefaultUnstructuredConverter.FromUnstructured 函数将 unstructured.UnstructuredList 转换为 PodList 类型。

<br>

6. DiscoveryClient 客户端

DiscoveryClient 是发现客户端,它主要用于发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息。Kubernetes API Server 支持很多资源组、资源版本、资源信息,开发者在开发过程中很难记住所有信息,此时可以通过 DiscoveryClient 查看所支持的资源组、资源版本、资源信息

前面咱们介绍了 3 种客户端,都是针对与资源对象管理的。而 DiscoveryClient 则是针对于资源的。用于查看当前 Kubernetes 集群支持那些资源组、资源版本、资源信息。

DiscoveryClient 除了可以发现 Kubernetes API Server 所支持的资源组、资源版本、资源信息,还可以将这些信息存储到本地,用于本地缓存(Cache),以减轻对 Kubernetes API Server 访问的压力。在运行 Kubernetes 组件的机器上,缓存信息默认存储于 ~/.kube/cache~/.kube/http-cache 中。

运行以下代码,将得到 Kubernetes 集群的资源列表。

package main

import (
    "fmt"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/tools/clientcmd"
)

/*
  @Author : lanyulei
  @Desc :
*/

func main() {
    // 加载配置文件,生成 config 对象
    config, err := clientcmd.BuildConfigFromFlags("", "../../kubeconfig")
    if err != nil {
        panic(err.Error())
    }

    // 实例化 DiscoveryClient
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    _, apiResources, err := discoveryClient.ServerGroupsAndResources()
    if err != nil {
        panic(err.Error())
    }

    for _, list := range apiResources {
        gv, err := schema.ParseGroupVersion(list.GroupVersion)
        if err != nil {
            panic(err.Error())
        }
        for _, resource := range list.APIResources {
            fmt.Printf("name: %v, group: %v, version: %v\n", resource.Name, gv.Group, gv.Version)
        }
    }
}

输出结果:

➜  demo1 go run main.go
name: bindings, group: , version: v1
name: componentstatuses, group: , version: v1
name: configmaps, group: , version: v1
name: endpoints, group: , version: v1
name: events, group: , version: v1
...

1.获取 Kubernetes API Server 所支持的资源组、资源版本、资源信息

Kubernetes API Server 暴露出 /api/apis 接口。DiscoveryClient 通过 RESTClient 分别请求 /api/apis 接口,从而获取 Kubernetes API Server 所支持的资源组、资源版本、资源信息。其核心实现位于 ServerGroupAndResources -> ServerGroups 中,代码示例如下:

代码路径:vendor\k8s.io\client-go\discovery\discovery_client.go

// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
	// Get the groupVersions exposed at /api
	v := &metav1.APIVersions{}
	err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
	apiGroup := metav1.APIGroup{}
	if err == nil && len(v.Versions) != 0 {
		apiGroup = apiVersionsToAPIGroup(v)
	}
	if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
		return nil, err
	}

	// Get the groupVersions exposed at /apis
	apiGroupList = &metav1.APIGroupList{}
	err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
	if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
		return nil, err
	}
	// to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
	if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
		apiGroupList = &metav1.APIGroupList{}
	}

	// prepend the group retrieved from /api to the list if not empty
	if len(v.Versions) != 0 {
		apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
	}
	return apiGroupList, nil
}

首先 DiscoveryClient 通过 RESTClient 分别请求 /api 接口,将请求结果放于 metav1.APIVersions 结构体中。然后再次通过 RESTClient 请求 /apis 接口,将请求结果存放于 metav1.APIGroupList 结构体中。最后,将 /api 接口中检索到的资源组信息合并到 apiGroupList 列表中并返回。

2.本地缓存的 DiscoveryClient

DiscoveryClient 可以将资源相关信息存储到本地,默认存储位置为~/.kube/cache~/.kube/http-cache 中。存储可以减轻 client-go 对 Kubernetes API Server 的访问压力。默认每 10 分钟与 Kubernetes API Server 同步一次,同步周期较长,因为资源组、资源版本、资源信息一般很少变动

DiscoveryClient 第一次获取资源组、资源版本、资源信息时,首先会查询本地缓存,如果数据不存在(没有命中)则请求 Kubernetes API Server 接口(回源),Cache 将 Kubernetes API Server 响应的数据存储在本地一份并返回给 DiscoveryClient。当下一次 DiscoveryClient 再次获取资源信息时,会将数据直接从本地缓存返回(命中)给 DiscoveryClient。本地缓存的默认存储周期为 10 分钟

代码路径:vendor\k8s.io\client-go\discovery\cached\disk\cached_discovery.go

// ServerResourcesForGroupVersion returns the supported resources for a group and version.
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
	filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
	cachedBytes, err := d.getCachedFile(filename)
	// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
	if err == nil {
		cachedResources := &metav1.APIResourceList{}
		if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
			klog.V(10).Infof("returning cached discovery info from %v", filename)
			return cachedResources, nil
		}
	}

	liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
	if err != nil {
		klog.V(3).Infof("skipped caching discovery info due to %v", err)
		return liveResources, err
	}
	if liveResources == nil || len(liveResources.APIResources) == 0 {
		klog.V(3).Infof("skipped caching discovery info, no resources found")
		return liveResources, err
	}

	if err := d.writeCachedFile(filename, liveResources); err != nil {
		klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
	}

	return liveResources, nil
}

Kubernetes源码阅读 文章被收录于专栏

Kubernetes源码阅读

全部评论

相关推荐

团子 行业运营 n*15.5
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务