Client-go源码分析之Informer

举报
kaliarch 发表于 2022/02/21 12:50:09 2022/02/21
【摘要】 一 前言Informer 是 Client-go 中的一个核心工具包,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client,本地缓存被称为 Store,索引被称为 Index。Informer 中主要包含 Controller、Reflector、DeltaFIFO、LocalStore、Lister 和 Processor 六个组件,这篇文章主要从C...

一 前言

Informer 是 Client-go 中的一个核心工具包,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client,本地缓存被称为 Store,索引被称为 Index。Informer 中主要包含 Controller、Reflector、DeltaFIFO、LocalStore、Lister 和 Processor 六个组件,这篇文章主要从Controller来讲,单独拿Controller来将,注意Informer中的Controller和我们K8s内部传统的controller不是一个概念。

源图地址,有兴趣可以打开原图学习。

二 处理流程

我们今天展开的controller struct,不断从DeltaFIFO中弹出对象,并和内存存储对象Indexer同步,随后调用用户注册的回调函数ResourceEventHandlers。

  • Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到”事件通知”,这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
  • Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
  • 如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
  • DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
  • 在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
  • Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
  • 在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。

三 源码分析

我们从源码中看到controller包含,config配置,reflecotr,及锁,其中config很重要,包含了controller的设置。

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

// Config contains all the settings for one of these low-level controllers.
type Config struct {
	// DeltaFIFO
	Queue

	// ListerWatcher
	ListerWatcher

	// 处理pop出来的元素方法
	Process ProcessFunc

	// 具体资源对象
	ObjectType runtime.Object

	// 全量resync周期间隔
	FullResyncPeriod time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.  This is probably moot
	//       now that this functionality appears at a higher level.
	RetryOnError bool

	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler WatchErrorHandler

	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64
}

type Controller interface {
	// 运行controller
	Run(stopCh <-chan struct{})
  // 判断是否同步完成
	HasSynced() bool
	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
  // 获取最后一次同步的resourceversion
	LastSyncResourceVersion() string
}

3.1 newInformer创建

我们可以在源码中先单独看看controller的实现,创建一个Informer需要传入ListerWatcher,资源对象,及ResourceEventHandler,以及Store。

// client-go/tools/cache/controller.go
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// 存储资源DeltaFIFO
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
    
    // 对你pop出去的资源进行操作
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Replaced, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
            // store 中更新资源
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}

3.2 Run

controller的run函数实际上创建reflector之后并reflector中的run。

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
  // 运行reflector的run函数
	wg.StartWithChannel(stopCh, r.Run)
  // 从队列中pop对象资源并进行处理
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}
// 从DeltaFIFO中弹出元素,通过用户穿入的PopProcessFunc进行处理
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

3.3 HasSynced

其实是调用FIFO中的,判断被插入的资源被pop处理完成,返回true

// Returns true once this controller has completed an initial resource listing
func (c *controller) HasSynced() bool {
	return c.config.Queue.HasSynced()
}

四 小试牛刀

func Must(e error) {
	if e != nil {
		panic(e)
	}
}
// 生成clientset
func InitClientSet() *kubernetes.Clientset {
	configFlags := genericclioptions.NewConfigFlags(false)
	restConfig, err := configFlags.ToRESTConfig()
	Must(err)
	return kubernetes.NewForConfigOrDie(restConfig)
}

func main() {
	clientSet := InitClientSet()
  // 构造pod listwatcher
	podlw := cache.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), "pods", "default", fields.Everything())
  // 生成PodInformer,添加事件,在此事件仅仅是打印出pod对象名称
	_, controller := cache.NewInformer(podlw, &corev1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			podObj, _ := obj.(*corev1.Pod)
			fmt.Println("addfunc", podObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldpodObj, _ := oldObj.(*corev1.Pod)
			newpodObj, _ := oldObj.(*corev1.Pod)
			fmt.Println("addfunc", newpodObj.GetName())
			fmt.Println("addfunc", oldpodObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			podObj, _ := obj.(*corev1.Pod)
			fmt.Println("deletefunc", podObj.GetName())
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	go controller.Run(stopCh)

	cache.WaitForCacheSync(stopCh, controller.HasSynced)

	<-stopCh
}

Informer帮我们做了针对不同类型的对象操作,我们只需要关系Handler,在同步周期中如果设置非0,则定时同步中会产生SYNC事件,触发updateFunc操作。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。