自定义实现Controller

举报
kaliarch 发表于 2022/02/21 12:57:07 2022/02/21
【摘要】 一 前言通过之前的文章,到目前可以对client-go中的Informer机制有了全局的认识,并针对其中每一个组件功能也有了深入的理解,本文将从一个实际例子,完整实现一个自定义控制器,加深对controller的认识。 二 回顾Informer流程首先在控制器启动时,通过运行Informer的run函数,执行Reflector中的run,全量的list kubernetes中的资源对象到d...

一 前言

通过之前的文章,到目前可以对client-go中的Informer机制有了全局的认识,并针对其中每一个组件功能也有了深入的理解,本文将从一个实际例子,完整实现一个自定义控制器,加深对controller的认识。

二 回顾Informer流程

图片

  • 首先在控制器启动时,通过运行Informer的run函数,执行Reflector中的run,全量的list kubernetes中的资源对象到deltaFIFO中。
  • Reflector 通过 ListAndWatch 首先获取全量的资源对象数据,然后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,,如果设置了定时同步,则定时更新Indexer中的内容,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。
  • sharedIndexInformer的HandleDeltas处理从deltaFIFO pod出来的增量时,先尝试更新到本地缓存cache,更新成功的话会调用processor.distribute方法向processor中的listener添加notification,listener启动之后会不断获取notification回调用户的EventHandler方法。
  • 当用户注册的EventHandler收到事件后,将其添加进workqueue中。
  • 之后运行用户自定义的processNextItem,不断从workqueue中获取对象,并对其进行业务逻辑处理,如果处理发生异常,则重新入队列,如果处理成功,则forget调改对象。

三 控制器逻辑

K8s控制器是一主动的调协过程,watch一些资源对象期望状态,也会watch实际状态,之后通过控制器发送执行动作指令,让对象的当前状态往期望状态逼近。

for {
  desired := getDesiredState()
  current := getCurrentState()
  makeChanges(desired, current)
}

更多的规则,可以去看k8s兴趣小组的控制器指南:https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md

##四 编程实现

  • 编码
// kubernetes/staging/src/k8s.io/client-go/examples/workqueue

// 定义控制器结构体,包含indexer/queue/informer
type Controller struct {
	indexer  cache.Indexer
  // 使用限速队列
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}

// 控制器构造方法
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// 从限速队列中获取一个执行syncToStdout业务路径进行处理
func (c *Controller) processNextItem() bool {
	// 从队列中获取一个元素进行处理
	key, quit := c.queue.Get()
	if quit {
		return false
	}
  // 告诉队列我们处理完这个键。这为其他worker解锁该key
  // 这允许安全的并行处理,因为具有相同key的两个 pod 永远不会并行处理。
	defer c.queue.Done(key)

	// 具体的业务逻辑方法处理key
	err := c.syncToStdout(key.(string))
	// 错误处理
	c.handleErr(err, key)
	return true
}

// syncToStdout 是控制器的业务逻辑。在这个控制器中,它只是将有关 pod 的信息打印到标准输出。如果发生错误,它必须简单地返回错误。重试逻辑不应该是业务逻辑的一部分。
func (c *Controller) syncToStdout(key string) error {
  // 从刚indexer中获取key
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		// 大家pod名称到控制台
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

// 检查是否发生错误并确保重试
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {

    // 处理成功则Forget该key
		c.queue.Forget(key)
		return
	}

	// 如果出现问题,此控制器会重试 5 次。之后,它停止尝试。
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)

		// 将key 从新入队列
		c.queue.AddRateLimited(key)
		return
	}
  // 如果还是异常,则forget该key
	c.queue.Forget(key)
	// 向外部实体报告,即使经过多次重试,我们也无法成功处理此密钥,丢弃了该key
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// controller的run函数
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// 如果发生异常停止队列
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")
  // 在goroutine中运行informer.Run()
	go c.informer.Run(stopCh)

	// 等待所有注册的informer都启动完成,开始运行队列
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
  // 启动多个携程运行runWorker
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod controller")
}

// 不断的运行processNextItem
func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func main() {
	var kubeconfig string
	var master string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&master, "master", "", "master url")
	flag.Parse()

	// creates the connection
	config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}

	// 获取clientSet
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// 创建podListWatcher
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 创建限速队列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

  // 创建IndexerInformer,获取indexer和informer
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})
  // 构造controller
	controller := NewController(queue, indexer, informer)

	// 添加哥测试pod
	indexer.Add(&v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      "mypod",
			Namespace: v1.NamespaceDefault,
		},
	})

	// 启动controller
	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)

	// Wait forever
	select {}
}

  • 运行测试
add/update/delete pod:smartkm-api-k8s-exampledeploy-7cddf8cbf4-hn7f9
add/update/delete pod:etcd-1
add/update/delete pod:etcd-2
add/update/delete pod:etcd-0
add/update/delete pod:podname1
add/update/delete pod:smartkm-api-k8s-exampledeploy-7cddf8cbf4-ztv2k
add/update/delete pod:po
add/update/delete pod:po
Pod default/po does not exist anymore

当我们创建一个pod后,可以看到在控制台已经打印出我们的pod名称,删除pod后打印出不存在改对象。

参考链接

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。