Inside Kubernetes controller-02

举报
kaliarch 发表于 2022/04/04 20:32:02 2022/04/04
【摘要】 二 低层架构-client-go 2.1 InformerInformer watches 对象事件(added,updated,deleted)当控制器每次向api服务器查询对象状态时,监视对象更改,api服务器高负载。informer watch api,通过controller设置关联cache。func main() { ... clientset, err := kubernet...

二 低层架构-client-go

2.1 Informer

Informer watches 对象事件(added,updated,deleted)

当控制器每次向api服务器查询对象状态时,监视对象更改,api服务器高负载。

informer watch api,通过controller设置关联cache。

func main() {
 ...
 clientset, err := kubernetes.NewForConfig(config)
 // Create InformerFactory
 informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
 // Create pod informer by informerFactory
 podInformer := informerFactory.Core().V1().Pods()
 // Add EventHandler to informer
 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 AddFunc: func(new interface{}) { log.Println("Added") },
 UpdateFunc: func(old, new interface{}) { log.Println("Updated") },
 DeleteFunc: func(old interface{}) { log.Println("Deleted") },
 })
 // Start Go routines
 informerFactory.Start(wait.NeverStop)
 // Wait until finish caching with List API
 informerFactory.WaitForCacheSync(wait.NeverStop)
 // Create Pod Lister
 podLister := podInformer.Lister()
 // Get List of pods
 _, err = podLister.List(labels.Nothing())}

2.1.1 Shared Informer

当我们使用Informer,我们不直接使用informer自己,而是使用共享informer。

共享informer 共享相同的资源在一个单独的二进制文件中。

2.1.2 Informer 和workqueue

  1. Reflector list & watch api-server
  2. Reflector将对象添加到Delta FIFO 队列中
  3. informer 从Delta FIFO中弹出对象
  4. 将对象添加到Indexer
  5. Indexer 存储对象到安全线程存储中
  6. informer 设置事件reference
  7. 用户自定义的Controller,事件Handlers 将对象添加至workqueue,
  8. 进程获取key,process item
  9. 获取对象的key从indexer reference中。

2.1.3 Informer 详解

Informer 整体架构

  1. reflector 开始listAndWatch api对象中的用户创建的心资源pod对象。

  1. informer 从DeltaFIFO 队列中Pop初事件。

  1. HandleDelta

  1. indexer.Add 增加事件到内存存储中

  1. 开始循环弹出事件并进行处理

  1. 后期get or list 请求都从indexer中获取。

Informer 和其组件

  • Informer:watch 一个对象事件并且存储在内存缓存中
  • Reflector:listAndWatch api-server
  • DeltaFIFO:FIFO 队列存储对象临时
  • Indexer:设置或者获取对象
  • Store:存储对象
  • Lister:获取对象在内存通过index

2.2 WorkQueue

WorkQueue不同于DeltaFIFO队列,

WorkQueue是使用存储Contrl Loop 的item。

调谐将执行WorkQWueue中的内容。

感觉的控制存储enqueues item 到工作队列中,当事件发生时。

2.2.1 WorkQueue 简单代码

func main() {
 ...
 clientset, err := kubernetes.NewForConfig(config)
 // Create InformerFactory
 informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
 // Create pod informer by informerFactory
 podInformer := informerFactory.Core().V1().Pods()
 // Create RateLimitQueue
 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 // shutdown when process ends
 defer queue.ShutDown()
 // Add EventHandler to informer
 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 AddFunc: func(old interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(old); err != nil {
runtime.HandleError(err)
return
}
queue.Add(key)
log.Println("Added: " + key)
},
 UpdateFunc: func(old, new interface{}) {},
 DeleteFunc: func(old interface{}) {},
 })}

2.2.2 WorkQueue 详解

整体示意图

  1. Reflector watch k8s-api 将其中资源发送到DeltaFIFO中

  1. Informer EventHandler 从DeltaFIFO中pop出数据。

  1. Informer EventHandler 调用资源事件函数,例如:AddFunc、UpdateFunc、DeleteFunc来将事件对象放进WorkQueue中。

注意:再次添加的key为资源的唯一标示,后期使用的时候从index中之间获取,从而减轻api-server 的压力。

  1. workqueue.Add 进入WorkQueue中,例如namespace/name、default/nginx

循环3-4,处理事件放入队列中。

  1. 用户自定义的controller 逻辑从WorkQueue中获取对象进行处理。

2.2.3 Informer Resync Period

Informer 重新同步周期

重新同步周期对于Informer是可选的。

Informer监视api服务器上的对象事件。

再同步期过后,无论发生什么事件,UpdateFunc被回调。结果,调谐将再次执行。

2.3 Controller Cycle Main Logic

完整示意图:

2.3.1 Controller Cycle

  1. 假设用户创建了两个pod,生成了两个update 事件

  1. informer 执行UpdateFunc

  1. Informer 调用workQueue.Add 函数 增加事件到WorkQueue中。

  1. 程序处理调用workqueue.Get 获取事件

  1. 调谐处理具体更新业务逻辑。

  1. 当调谐成功后,处理程序进行workQueue.Forget/workQueue.Done,这个item从workQueue中移除。

当调谐是错误结束后,workQueue 增加速率限制,并且控制器重新入队item,并且开始新一轮调谐。

每次事件发生时,项都会继续存储在工作队列中,控制器处理工作队列中的项并执行协调。该循环持续不断,直到控制器停止。

2.3.2 控制器基本策略

从内存中读取,写向api-server。

但是,如果我们直接更新缓存中的对象,很难保证其一致性。

所以,我们在更新对象时使用DeepCopy(获取克隆数据)。

例如代码:kubernetes/pkg/controller/replicaset/replica_set.go

rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)

2.3.3 控制器主逻辑

  • worker:无限循环 processNextWorkItem。
  • processNet WorkItem:操作WorkQueue(Get, Add)并且调用调谐逻辑。
  • syncHandler:这就是调谐具体逻辑。

2.3.4 ReplicaSet Controller 源码

基于K8s v1.16

其他

Informer 从etcd 同步对象数据到内存中。

再次需要考虑内存数据是否和etcd数据不相同呢。

这是没有问题的,对象有resourceVersion,如果etcd的resourceVersion 于内存缓存中的resourceVersion不同,Controller在更新对象状态时出错,控制器请求调谐,直到迁移完成。

review

  • Informer:通过eventandler将Control Loop的项添加到WorkQueue中
  • Lister:通过Indexer从内存缓存中获取对象数据
  • WorkQueue:存储控制循环项的队列。该项是Reconcile Logic的目标。如果在调解结束时发生错误,控制器将项请求到工作队列。控制器再次执行Reconcile。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。