Inside Kubernetes controller-02
二 低层架构-client-go
2.1 Informer
Informer watches 对象事件(added,updated,deleted)
当控制器每次向api服务器查询对象状态时,监视对象更改,api服务器高负载。
informer watch api,通过controller设置关联cache。
func main() {
...
clientset, err := kubernetes.NewForConfig(config)
// Create InformerFactory
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
// Create pod informer by informerFactory
podInformer := informerFactory.Core().V1().Pods()
// Add EventHandler to informer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) { log.Println("Added") },
UpdateFunc: func(old, new interface{}) { log.Println("Updated") },
DeleteFunc: func(old interface{}) { log.Println("Deleted") },
})
// Start Go routines
informerFactory.Start(wait.NeverStop)
// Wait until finish caching with List API
informerFactory.WaitForCacheSync(wait.NeverStop)
// Create Pod Lister
podLister := podInformer.Lister()
// Get List of pods
_, err = podLister.List(labels.Nothing())
…
}
2.1.1 Shared Informer
当我们使用Informer,我们不直接使用informer自己,而是使用共享informer。
共享informer 共享相同的资源在一个单独的二进制文件中。
2.1.2 Informer 和workqueue
- Reflector list & watch api-server
- Reflector将对象添加到Delta FIFO 队列中
- informer 从Delta FIFO中弹出对象
- 将对象添加到Indexer
- Indexer 存储对象到安全线程存储中
- informer 设置事件reference
- 用户自定义的Controller,事件Handlers 将对象添加至workqueue,
- 进程获取key,process item
- 获取对象的key从indexer reference中。
2.1.3 Informer 详解
Informer 整体架构
- reflector 开始listAndWatch api对象中的用户创建的心资源pod对象。
- informer 从DeltaFIFO 队列中Pop初事件。
- HandleDelta
- indexer.Add 增加事件到内存存储中
- 开始循环弹出事件并进行处理
- 后期get or list 请求都从indexer中获取。
Informer 和其组件
- Informer:watch 一个对象事件并且存储在内存缓存中
- Reflector:listAndWatch api-server
- DeltaFIFO:FIFO 队列存储对象临时
- Indexer:设置或者获取对象
- Store:存储对象
- Lister:获取对象在内存通过index
2.2 WorkQueue
WorkQueue不同于DeltaFIFO队列,
WorkQueue是使用存储Contrl Loop 的item。
调谐将执行WorkQWueue中的内容。
感觉的控制存储enqueues item 到工作队列中,当事件发生时。
2.2.1 WorkQueue 简单代码
func main() {
...
clientset, err := kubernetes.NewForConfig(config)
// Create InformerFactory
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
// Create pod informer by informerFactory
podInformer := informerFactory.Core().V1().Pods()
// Create RateLimitQueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// shutdown when process ends
defer queue.ShutDown()
// Add EventHandler to informer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(old interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(old); err != nil {
runtime.HandleError(err)
return
}
queue.Add(key)
log.Println("Added: " + key)
},
UpdateFunc: func(old, new interface{}) { … },
DeleteFunc: func(old interface{}) { … },
})
…
}
2.2.2 WorkQueue 详解
整体示意图
- Reflector watch k8s-api 将其中资源发送到DeltaFIFO中
- Informer EventHandler 从DeltaFIFO中pop出数据。
- Informer EventHandler 调用资源事件函数,例如:AddFunc、UpdateFunc、DeleteFunc来将事件对象放进WorkQueue中。
注意:再次添加的key为资源的唯一标示,后期使用的时候从index中之间获取,从而减轻api-server 的压力。
- workqueue.Add 进入WorkQueue中,例如namespace/name、default/nginx
循环3-4,处理事件放入队列中。
- 用户自定义的controller 逻辑从WorkQueue中获取对象进行处理。
2.2.3 Informer Resync Period
Informer 重新同步周期
重新同步周期对于Informer是可选的。
Informer监视api服务器上的对象事件。
再同步期过后,无论发生什么事件,UpdateFunc被回调。结果,调谐将再次执行。
2.3 Controller Cycle Main Logic
完整示意图:
2.3.1 Controller Cycle
- 假设用户创建了两个pod,生成了两个update 事件
- informer 执行UpdateFunc
- Informer 调用workQueue.Add 函数 增加事件到WorkQueue中。
- 程序处理调用workqueue.Get 获取事件
- 调谐处理具体更新业务逻辑。
- 当调谐成功后,处理程序进行workQueue.Forget/workQueue.Done,这个item从workQueue中移除。
当调谐是错误结束后,workQueue 增加速率限制,并且控制器重新入队item,并且开始新一轮调谐。
每次事件发生时,项都会继续存储在工作队列中,控制器处理工作队列中的项并执行协调。该循环持续不断,直到控制器停止。
2.3.2 控制器基本策略
从内存中读取,写向api-server。
但是,如果我们直接更新缓存中的对象,很难保证其一致性。
所以,我们在更新对象时使用DeepCopy(获取克隆数据)。
例如代码:kubernetes/pkg/controller/replicaset/replica_set.go
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1(). ⇨
ReplicaSets(rs.Namespace), rs, newStatus)
2.3.3 控制器主逻辑
- worker:无限循环 processNextWorkItem。
- processNet WorkItem:操作WorkQueue(Get, Add)并且调用调谐逻辑。
- syncHandler:这就是调谐具体逻辑。
2.3.4 ReplicaSet Controller 源码
基于K8s v1.16
- worker:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L432
- processNextWorkItem:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L437
- syncReplicaSet:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L562
其他
Informer 从etcd 同步对象数据到内存中。
再次需要考虑内存数据是否和etcd数据不相同呢。
这是没有问题的,对象有resourceVersion,如果etcd的resourceVersion 于内存缓存中的resourceVersion不同,Controller在更新对象状态时出错,控制器请求调谐,直到迁移完成。
review
- Informer:通过eventandler将Control Loop的项添加到WorkQueue中
- Lister:通过Indexer从内存缓存中获取对象数据
- WorkQueue:存储控制循环项的队列。该项是Reconcile Logic的目标。如果在调解结束时发生错误,控制器将项请求到工作队列。控制器再次执行Reconcile。
- 点赞
- 收藏
- 关注作者
评论(0)