Velero 控制器流程-01

举报
kaliarch 发表于 2022/04/04 20:29:10 2022/04/04
【摘要】 Velero 控制器流程 一 控制器通用在 velero 所有的业务逻辑控制器中都包含通用控制器// velero/pkg/controller/generic_controller.go// 通用控制器type genericController struct { name string //通用控制器名称 queue ...

Velero 控制器流程

一 控制器通用

在 velero 所有的业务逻辑控制器中都包含通用控制器

// velero/pkg/controller/generic_controller.go


// 通用控制器
type genericController struct {
	name             string														//通用控制器名称
	queue            workqueue.RateLimitingInterface	// 限速队列
	logger           logrus.FieldLogger								// 日志
	syncHandler      func(key string) error						// 同步函数
	resyncFunc       func()														// 重新同步方法
	resyncPeriod     time.Duration										// 重新同步周期
	cacheSyncWaiters []cache.InformerSynced						// cacheSync
}

// 

1.1 Run方法

在控制器中定义有控制器实现的接口

// velero/pkg/controller/interface.go 

// 代表一个可运行的组建
type Interface interface {
	// Run runs the component.
	Run(ctx context.Context, workers int) error
}

通用接口具体 Run 方法实现

// velero/pkg/controller/generic_controller.go

func (c *genericController) Run(ctx context.Context, numWorkers int) error {
  // 必须提供syncHandler和resyncFunc
	if c.syncHandler == nil && c.resyncFunc == nil {
		// programmer error
		panic("at least one of syncHandler or resyncFunc is required")
	}

	var wg sync.WaitGroup
	// 最后处理
	defer func() {
		c.logger.Info("Waiting for workers to finish their work")
		// 关闭队列
		c.queue.ShutDown()

		// We have to wait here in the deferred function instead of at the bottom of the function body
		// because we have to shut down the queue in order for the workers to shut down gracefully, and
		// we want to shut down the queue via defer and not at the end of the body.
    // 等待所有的 worker waitgroup完成
		wg.Wait()

		c.logger.Info("All workers have finished")

	}()

	c.logger.Info("Starting controller")
	defer c.logger.Info("Shutting down controller")

	// only want to log about cache sync waiters if there are any
  // 如果存在缓存syncwaiter,等待informer sync完成
	if len(c.cacheSyncWaiters) > 0 {
		c.logger.Info("Waiting for caches to sync")
    // 等待sync 完成
		if !cache.WaitForCacheSync(ctx.Done(), c.cacheSyncWaiters...) {
			return errors.New("timed out waiting for caches to sync")
		}
		c.logger.Info("Caches are synced")
	}
	// 开始执行syncHandler
	if c.syncHandler != nil {
		wg.Add(numWorkers)
		for i := 0; i < numWorkers; i++ {
			go func() {
        // 运行runWorker
				wait.Until(c.runWorker, time.Second, ctx.Done())
				wg.Done()
			}()
		}
	}

  // 写成运行resync
	if c.resyncFunc != nil {
		if c.resyncPeriod == 0 {
			// Programmer error
			panic("non-zero resyncPeriod is required")
		}

		wg.Add(1)
		go func() {
      
			wait.Until(c.resyncFunc, c.resyncPeriod, ctx.Done())
			wg.Done()
		}()
	}

	<-ctx.Done()

	return nil
}

1.2 runWorker

runWorker 方法就是不断运行 processNextWorkItem。

func (c *genericController) runWorker() {
	// continually take items off the queue (waits if it's
	// empty) until we get a shutdown signal from the queue
	for c.processNextWorkItem() {
	}
}

1.3 processNextWorkItem

processNextWorkItem 方法,中主要是从限速队列获取元素,并调用用户实现的 syncHandler 业务逻辑方法对元素进行处理,如果处理错误则再次延迟添加至队列,等待下一次处理。

func (c *genericController) processNextWorkItem() bool {
  // 从workqueue获取元素
	key, quit := c.queue.Get()
  // 如果shutdown,则直接返回false
	if quit {
		return false
	}

  // 最后标记改元素以及处理完成
	defer c.queue.Done(key)
	// 通过syncHandler 处理具体业务逻辑
	err := c.syncHandler(key.(string))
	if err == nil {
		// 标记改元素处理完成
		c.queue.Forget(key)
		return true
	}

	c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue")
	// 再次延迟添加至队列中
	c.queue.AddRateLimited(key)

	return true
}

// 利用MetaNamespaceKeyFunc 获取元素namespace/resourcename 添加进队列
func (c *genericController) enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		c.logger.WithError(errors.WithStack(err)).
			Error("Error creating queue key, item not added to queue")
		return
	}

	c.queue.Add(key)
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。