Velero 控制器流程-01
【摘要】 Velero 控制器流程 一 控制器通用在 velero 所有的业务逻辑控制器中都包含通用控制器// velero/pkg/controller/generic_controller.go// 通用控制器type genericController struct { name string //通用控制器名称 queue ...
Velero 控制器流程
一 控制器通用
在 velero 所有的业务逻辑控制器中都包含通用控制器
// velero/pkg/controller/generic_controller.go
// 通用控制器
type genericController struct {
name string //通用控制器名称
queue workqueue.RateLimitingInterface // 限速队列
logger logrus.FieldLogger // 日志
syncHandler func(key string) error // 同步函数
resyncFunc func() // 重新同步方法
resyncPeriod time.Duration // 重新同步周期
cacheSyncWaiters []cache.InformerSynced // cacheSync
}
//
1.1 Run方法
在控制器中定义有控制器实现的接口
// velero/pkg/controller/interface.go
// 代表一个可运行的组建
type Interface interface {
// Run runs the component.
Run(ctx context.Context, workers int) error
}
通用接口具体 Run 方法实现
// velero/pkg/controller/generic_controller.go
func (c *genericController) Run(ctx context.Context, numWorkers int) error {
// 必须提供syncHandler和resyncFunc
if c.syncHandler == nil && c.resyncFunc == nil {
// programmer error
panic("at least one of syncHandler or resyncFunc is required")
}
var wg sync.WaitGroup
// 最后处理
defer func() {
c.logger.Info("Waiting for workers to finish their work")
// 关闭队列
c.queue.ShutDown()
// We have to wait here in the deferred function instead of at the bottom of the function body
// because we have to shut down the queue in order for the workers to shut down gracefully, and
// we want to shut down the queue via defer and not at the end of the body.
// 等待所有的 worker waitgroup完成
wg.Wait()
c.logger.Info("All workers have finished")
}()
c.logger.Info("Starting controller")
defer c.logger.Info("Shutting down controller")
// only want to log about cache sync waiters if there are any
// 如果存在缓存syncwaiter,等待informer sync完成
if len(c.cacheSyncWaiters) > 0 {
c.logger.Info("Waiting for caches to sync")
// 等待sync 完成
if !cache.WaitForCacheSync(ctx.Done(), c.cacheSyncWaiters...) {
return errors.New("timed out waiting for caches to sync")
}
c.logger.Info("Caches are synced")
}
// 开始执行syncHandler
if c.syncHandler != nil {
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
// 运行runWorker
wait.Until(c.runWorker, time.Second, ctx.Done())
wg.Done()
}()
}
}
// 写成运行resync
if c.resyncFunc != nil {
if c.resyncPeriod == 0 {
// Programmer error
panic("non-zero resyncPeriod is required")
}
wg.Add(1)
go func() {
wait.Until(c.resyncFunc, c.resyncPeriod, ctx.Done())
wg.Done()
}()
}
<-ctx.Done()
return nil
}
1.2 runWorker
runWorker 方法就是不断运行 processNextWorkItem。
func (c *genericController) runWorker() {
// continually take items off the queue (waits if it's
// empty) until we get a shutdown signal from the queue
for c.processNextWorkItem() {
}
}
1.3 processNextWorkItem
processNextWorkItem 方法,中主要是从限速队列获取元素,并调用用户实现的 syncHandler 业务逻辑方法对元素进行处理,如果处理错误则再次延迟添加至队列,等待下一次处理。
func (c *genericController) processNextWorkItem() bool {
// 从workqueue获取元素
key, quit := c.queue.Get()
// 如果shutdown,则直接返回false
if quit {
return false
}
// 最后标记改元素以及处理完成
defer c.queue.Done(key)
// 通过syncHandler 处理具体业务逻辑
err := c.syncHandler(key.(string))
if err == nil {
// 标记改元素处理完成
c.queue.Forget(key)
return true
}
c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
// 再次延迟添加至队列中
c.queue.AddRateLimited(key)
return true
}
// 利用MetaNamespaceKeyFunc 获取元素namespace/resourcename 添加进队列
func (c *genericController) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.WithError(errors.WithStack(err)).
Error("Error creating queue key, item not added to queue")
return
}
c.queue.Add(key)
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)