client-go开发

资料来源

kubernetes 中 informer 的使用-腾讯云开发者社区-腾讯云 (tencent.com)

深入了解 K8s 扩展神器 client-go 的详细用法 - 知乎 (zhihu.com)

简介

当使用Kubernetes进行应用程序的开发和部署时,client-go是一个非常重要的工具。它是Kubernetes的官方客户端库,提供了与Kubernetes ApiServer进行通信的接口实现

client-go主要提供了以下几个功能

  1. 与Kubernetes ApiServer进行通信:client-go提供了与Kubernetes ApiServer进行通信的接口和实现,包括基本的HTTP请求和更深层次的封装。开发人员可以使用client-go创建
  2. 访问Kubernetes ApiServer中的资源:client-go提供了访问Kubernetes ApiServer中资源的方法,包括使用ClientSet进行基于对象的访问和使用DynamicClient进行基于无类型的访问
  3. 处理Kubernetes资源事件:client-go提供了一种称为Informer的机制,它可以监听Kubernetes ApiServer中资源变更事件。开发人员可以使用Informer实现资源的快速检索和本地缓存,从而减轻对ApiServer的访问压力。
  4. 发现Kubernetes ApiServer中的资源:client-go还提供了DiscoveryClient接口,该接口可用域在Kubernetes ApiServer中查找特定资源的详细信息。

总的来说,client-go 是 Kubernetes 开发人员不可或缺的工具之一。它提供了丰富的功能和灵活的接口,使开发人员能够更轻松地构建和管理 Kubernetes 应用程序。

基础代码

可以通过kubectl api-resources来查看在哪个api资源之中

go.sum

module awesomeProject1

go 1.20

require (
    k8s.io/apimachinery v0.27.3
    k8s.io/client-go v0.27.3
)

require (
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/emicklei/go-restful/v3 v3.9.0 // indirect
    github.com/go-logr/logr v1.2.3 // indirect
    github.com/go-openapi/jsonpointer v0.19.6 // indirect
    github.com/go-openapi/jsonreference v0.20.1 // indirect
    github.com/go-openapi/swag v0.22.3 // indirect
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang/protobuf v1.5.3 // indirect
    github.com/google/gnostic v0.5.7-v3refs // indirect
    github.com/google/go-cmp v0.5.9 // indirect
    github.com/google/gofuzz v1.1.0 // indirect
    github.com/google/uuid v1.3.0 // indirect
    github.com/imdario/mergo v0.3.6 // indirect
    github.com/josharian/intern v1.0.0 // indirect
    github.com/json-iterator/go v1.1.12 // indirect
    github.com/mailru/easyjson v0.7.7 // indirect
    github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
    github.com/modern-go/reflect2 v1.0.2 // indirect
    github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    golang.org/x/net v0.8.0 // indirect
    golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
    golang.org/x/sys v0.6.0 // indirect
    golang.org/x/term v0.6.0 // indirect
    golang.org/x/text v0.8.0 // indirect
    golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
    google.golang.org/appengine v1.6.7 // indirect
    google.golang.org/protobuf v1.28.1 // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    k8s.io/api v0.27.3 // indirect
    k8s.io/klog/v2 v2.90.1 // indirect
    k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
    k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
    sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
    sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
    sigs.k8s.io/yaml v1.3.0 // indirect
)

查看信息

package main

import (
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "log"
)

func list() {
    //windows默认在user目录下
    configPath := "C:\\Users\\admin7\\.kube\\config"
    config, err := clientcmd.BuildConfigFromFlags("", configPath)
    if err != nil {
        log.Fatal(err)
    }
    clientTest, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }
    //获取所有的node,第一个是上下文对象,
    nodeList, err := clientTest.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("node")
    //获取item也就是node信息,可以输出节点的各种信息
    for _, node := range nodeList.Items {
        fmt.Println("name:" + node.Name)
        fmt.Println(node.Status.NodeInfo)
        fmt.Println("name:" + node.Status.Capacity.Cpu().String())
    }
    //获取所有namespace
    fmt.Println("namespace")
    nameSpaceList, err := clientTest.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
    for _, v := range nameSpaceList.Items {
        fmt.Println(v.Name, v.CreationTimestamp, v.Status.Phase)
    }
    fmt.Println("service")
    //service传入命名空间,可以传default不传就是所有
    serviceList, err := clientTest.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{})
    for _, v := range serviceList.Items {
        //[{https TCP <nil> 443 {0 6443 } 0}] 例如这个就是443暴露到了外部0,6443是内部映射端口
        fmt.Println(v.Name, v.CreationTimestamp, v.Namespace, v.Spec.ClusterIP, v.Spec.Ports)
    }
    fmt.Println("deployment")
    deploymentList, err := clientTest.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{})
    for _, v := range deploymentList.Items {
        fmt.Println(v.Name, v.CreationTimestamp, v.Namespace, v.Spec.Replicas, v.Status.Replicas)
    }
}

增删改查资源

kube.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-nginx-app
  template:
    metadata:
      labels:
        app: my-nginx-app
    spec:
      containers:
        - name: nginx
          image: nginx:1.7.9
          ports:
            - containerPort: 80

kubectl apply -f .kube.yaml

deploy的增删改查

main.go

package main

import (
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "log"
)

func main() {
    //windows默认在user目录下
    configPath := "C:\\Users\\admin7\\.kube\\config"
    config, err := clientcmd.BuildConfigFromFlags("", configPath)
    if err != nil {
        log.Fatal(err)
    }
    clientTest, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }
    namespace := "default"
    name := "my-nginx-app"
    deploy, err := clientTest.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
    fmt.Println(err, deploy) //输出全部信息

    //这样就能把副本数变成1个,可以通过 kubectl get deployment -o wide
    var replicas int32 = 1
    deploy.Spec.Replicas = &replicas
    //修改容器镜像版本
    deploy.Spec.Template.Spec.Containers[0].Image = "nginx:1.14"
    deploy, err = clientTest.AppsV1().Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{})
    //删除
    clientTest.AppsV1().Deployments(namespace).Delete(context.TODO(), deploy.Name, metav1.DeleteOptions{})
}

详细用法

Client

这里简单介绍其封装好的几个client

加载kubeconfig配置

package detail
import (
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "log"
)

const kubeConfigFilePath = "C:\\Users\\admin7\\.kube\\config"

type K8sConfig struct {
}

func NewK8sConfig() *K8sConfig {
    return &K8sConfig{}
}
// 读取kubeconfig 配置文件
func (this *K8sConfig) K8sRestConfig() *rest.Config {
    config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFilePath)

    if err != nil {
        log.Fatal(err)
    }

    return config
}
// 初始化 clientSet
func (this *K8sConfig) InitClient() *kubernetes.Clientset {
    c, err := kubernetes.NewForConfig(this.K8sRestConfig())

    if err != nil {
        log.Fatal(err)
    }

    return c
}

// 初始化 dynamicClient
func (this *K8sConfig) InitDynamicClient() dynamic.Interface {
    c, err := dynamic.NewForConfig(this.K8sRestConfig())

    if err != nil {
        log.Fatal(err)
    }

    return c
}

// 初始化 DiscoveryClient
func (this *K8sConfig) InitDiscoveryClient() *discovery.DiscoveryClient {
    return discovery.NewDiscoveryClient(this.InitClient().RESTClient())
}

clientSet

clientSet是一个比较常用的client,常用于对K8s内部资源做CRUD或查询当前集群有什么资源

package main

import (
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var ns = "default"

func main() {
    //使用的是上问题到的配置加载对象
    cliset := NewK8sConfig().InitClient()
    configMaps, err := cliset.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, cm := range configMaps.Items {
        fmt.Printf("configName: %v, configData: %v \n", cm.Name, cm.Data)
    }
}

DynamicClient

DynamicClient也是比较常用的client之一,但频繁度不及CilentSet,它的作用主要是CRD(自定义资源)。当然它也可以用于k8s的内部资源,我们在项目内就用它来开发出可以对任意资源做CRUD的接口。

下面将演示使用 dynamicClient 创建资源

apiVersion: apps/v1
kind: Deployment
metadata:
  name: myngx
  namespace: default
spec:
  selector:
    matchLabels:
      app: myngx
  replicas: 1
  template:
    metadata:
      labels:
        app: myngx
    spec:
      containers:
        - name: myngx-container
          image: nginx:1.18-alpine
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 80

使用 DynamicClient 创建测试配置:

package main

import (
    "context"
    _ "embed"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/yaml"
    "log"
)

var ns = "default"

// 这个是新特性使用注释加载配置
//
//go:embed tpls/deployment.yaml
var deployTpl string

func main() {
    //动态客户端
    dynamicCli := NewK8sConfig().InitDynamicClient()
    //可以随意指定集群拥有的资源进行创建
    deployGVR := schema.GroupVersionResource{
        Group:    "apps",
        Version:  "v1",
        Resource: "deployments",
    }
    deployObj := &unstructured.Unstructured{}
    if err := yaml.Unmarshal([]byte(deployTpl), deployObj); err != nil {
        log.Fatalln(err)
    }
    _, err := dynamicCli.Resource(deployGVR).Namespace(ns).Create(context.Background(), deployObj, metav1.CreateOptions{})
    if err != nil {
        log.Fatalln(err)
    }
    log.Println("创建成功")

}

DiscoveryClient

discoveryClient顾名思义就是用于发现K8s资源的,当我们不知道当前集群有什么资源时就会用该客户端封装好的方法进行查询。kubectl api-resource命令就是用它实现的。

package main

import (
    _ "embed"
    "fmt"
)

var ns = "default"

// 这个是新特性使用注释加载配置
//
//go:embed tpls/deployment.yaml
var deployTpl string

func main() {
    client := NewK8sConfig().InitDiscoveryClient()
    preferredResources, _ := client.ServerPreferredResources()
    for _, pr := range preferredResources {
        fmt.Println(pr.String())
    }

    // _, _, _ = client.ServerGroupsAndResources()
}

informer

作用

Informer 是 Kubernetes 中的一种机制,用于监控资源对象的变化并提供相应的通知。它是 Kubernetes 客户端库中的一部分,用于实现对 Kubernetes 资源的观察和事件通知。

informer是client-go中的核心工具包,已经被kubenetes中众多组件使用。所谓informer,其实就是一个带有本地缓存和索引机制的、可以注册Event Hander的client,本地缓存被称为Store,索引被称为Index。使用informer的目的是为了减轻apiServer数据交互的压力而抽象出来的一个cache层,客户端对apiserver数据的读取和监听操作都通过本地informer进行。informer实例的Lister()方法可以直接查找缓存在本地内存中的数据。

informer的主要功能:

  • 同步数据到本地缓存
  • 根据对应的事件类型,触发实现注册好的ResourceEventHandler

Informer 中主要有 Reflector、Delta FIFO Queue、Local Store、WorkQueue 几个组件。以下是 Informer 的工作流程图。

根据流程图来解释一下 Informer 中几个组件的作用:

  • Reflector:称之为反射器,实现对 apiserver 指定类型对象的监控(ListAndWatch),其中反射实现的就是把监控的结果实例化成具体的对象,最终也是调用 Kubernetes 的 List/Watch API;
  • DeltaIFIFO Queue:一个增量队列,将 Reflector 监控变化的对象形成一个 FIFO 队列,此处的 Delta 就是变化;
  • LocalStore:就是 informer 的 cache,这里面缓存的是 apiserver 中的对象(其中有一部分可能还在DeltaFIFO 中),此时使用者再查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力,LocalStore 只会被 Lister 的 List/Get 方法访问。
  • WorkQueue:DeltaIFIFO 收到时间后会先将时间存储在自己的数据结构中,然后直接操作 Store 中存储的数据,更新完 store 后 DeltaIFIFO 会将该事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件会根据对应的类型触发对应的回调函数。

工作流程

  • Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到”事件通知”,这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
  • Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
  • 如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
  • DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
  • 在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
  • Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
  • 在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。

Informer 在使用时需要先初始化一个 InformerFactory,目前主要推荐使用的是 SharedInformerFactory,Shared 指的是在多个 Informer 中共享一个本地 cache。

Informer 中的 ResourceEventHandler 函数有三种:

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

复制

这三种函数的处理逻辑是用户自定义的,在初始化 controller 时注册完 ResourceEventHandler 后,一旦该对象的实例有创建、删除、更新三中操作后就会触发对应的 ResourceEventHandler。


使用示例

在实际开发中,informer主要应用于

  • 在访问k8s apiserver的客户端作为一个client缓存对象使用
  • 在一些自定义controller中使用,比如operator开发
package main

import (
    "flag"
    "fmt"
    "log"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/runtime"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // 初始化 client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err.Error())
    }

    stopper := make(chan struct{})
    defer close(stopper)
    
    // 初始化 informer
    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes()
    informer := nodeInformer.Informer()
    defer runtime.HandleCrash()
    
    // 启动 informer,list & watch
    go factory.Start(stopper)
    
    // 从 apiserver 同步资源,即 list 
    if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // 使用自定义 handler
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
        DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
    })
    
    // 创建 lister
    nodeLister := nodeInformer.Lister()
    // 从 lister 中获取所有 items
    nodeList, err := nodeLister.List(labels.Everything())
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("nodelist:", nodeList)
    <-stopper
}

func onAdd(obj interface{}) {
    node := obj.(*corev1.Node)
    fmt.Println("add a node:", node.Name)
}

Shared指的是多个 lister 共享同一个cache,而且资源的变化会同时通知到cache和 listers。这个解释和上面图所展示的内容的是一致的,cache我们在Indexer的介绍中已经分析过了,lister 指的就是OnAdd、OnUpdate、OnDelete 这些回调函数背后的对象。

以下是作为controller使用的一个整体工作流程

(1) 创建一个控制器

  • 为控制器创建 workqueue
  • 创建 informer, 为 informer 添加 callback 函数,创建 lister

(2) 启动控制器

  • 启动 informer
  • 等待本地 cache sync 完成后, 启动 workers

(3) 当收到变更事件后,执行 callback

  • 等待事件触发
  • 从事件中获取变更的 Object
  • 做一些必要的检查
  • 生成 object key,一般是 namespace/name 的形式
  • 将 key 放入 workqueue 中

(4) worker loop

  • 等待从 workqueue 中获取到 item,一般为 object key
  • 用 object key 通过 lister 从本地 cache 中获取到真正的 object 对象
  • 做一些检查
  • 执行真正的业务逻辑
  • 处理下一个 item

下面是自定义 controller 使用的一个参考:

var (
    masterURL  string
    kubeconfig string
)

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

func main() {
    flag.Parse()

    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // 所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client
    // informer watch apiserver,每隔 30 秒 resync 一次(list)
    kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)

    controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())

    //  启动 informer
    go kubeInformerFactory.Start(stopCh)

     // start controller 
    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
}


// NewController returns a new network controller
func NewController(
    kubeclientset kubernetes.Interface,
    networkclientset clientset.Interface,
    networkInformer informers.NetworkInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:    kubeclientset,
        networkclientset: networkclientset,
        networksLister:   networkInformer.Lister(),
        networksSynced:   networkInformer.Informer().HasSynced,
        workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
        recorder:         recorder,
    }

    glog.Info("Setting up event handlers")
    // Set up an event handler for when Network resources change
    networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueNetwork,
        UpdateFunc: func(old, new interface{}) {
            oldNetwork := old.(*samplecrdv1.Network)
            newNetwork := new.(*samplecrdv1.Network)
            if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
                // Periodic resync will send update events for all known Networks.
                // Two different versions of the same Network will always have different RVs.
                return
            }
            controller.enqueueNetwork(new)
        },
        DeleteFunc: controller.enqueueNetworkForDelete,
    })

    return controller
}
Last modification:December 26, 2023
如果觉得我的文章对你有用,请随意赞赏