Client-go源码分析之WorkQueue

举报
kaliarch 发表于 2022/02/21 12:56:07 2022/02/21
【摘要】 一 前言在上文SharedInformer中,最后将资源对象已经写入到事件回调函数中,此后我们直接处理这些数据即可,但是我们使用golang中的chanel来处理会存在处理效率低,存在数据大并发量,数据积压等其他异常情况,为此client-go单独将workqueue提出来,作为公共组件,不仅可以在Kubernetes内部使用,还可以供Client-go使用,用户侧可以通过Workqueu...

一 前言

在上文SharedInformer中,最后将资源对象已经写入到事件回调函数中,此后我们直接处理这些数据即可,但是我们使用golang中的chanel来处理会存在处理效率低,存在数据大并发量,数据积压等其他异常情况,为此client-go单独将workqueue提出来,作为公共组件,不仅可以在Kubernetes内部使用,还可以供Client-go使用,用户侧可以通过Workqueue相关方法进行更加灵活的队列处理,如失败重试,延迟入队,限速控制等,实现非阻塞异步逻辑处理。

二 WorkQueue相关概念及构成

2.1 分类

WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。

  • Interface:通用队FIFO 队列接口,先进先出队列,并支持去重机制。
  • DelayingInterface:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。
  • RateLimitingInterface:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。

从图中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最终由 rateLimitingType 进行实现,提供了 rateLimit 限速、delay 延时入队(由优先级队列通过小顶堆实现)、queue 队列处理 三大核心能力,另外,在代码中可看到 K8s 实现了三种 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter。

2.2 特征

WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In, First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。

  • 有序:按照添加顺序处理元素(item)。
  • 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
  • 并发性:多生产者和多消费者。
  • 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
  • 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。
  • 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
  • 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
  • Metric:支持 metric 监控指标,可用于 Prometheus 监控。

三 源码分析

3.1 通用队列

3.1.1 队列接口

通用队列FIFO 队列支持最基本的队列方法,例如插入元素、获取元素、获取队列长度等。另外,WorkQueue 中的限速及延迟队列都基于 Interface 接口实现,其提供如下方法:

// k8s.io/client-go/util/workqueue/queue.go
type Interface interface {
  // 为队列添添加一个元素
	Add(item interface{})
  // 获取队列长度
	Len() int
  // 从头部获取一个元素
	Get() (item interface{}, shutdown bool)
  // 标记一个元素处理完成
	Done(item interface{})
  // 关闭队列
	ShutDown()
  // 获取队列是否正在关闭
	ShuttingDown() bool
}

3.1.2 队列实现

通用队列具体实现为Type,其中有非常重要的三个属性,queue为一个任意类型元素的slice,是具体存储元素的地方,其实一个;dirty表示元素在内存中,可以对元素进行去重;processing表示当前正在处理中的元素。

// 具体queue的实现
type Type struct {
	// 存储具体元素,类型为slice
	queue []t
	// 定义所有的元素被处理,是以map类型,可以对元素去重
	dirty set
	// 标记元素是否正在处理。
  // 有些元素可能同时存在processing和dirty中,当处理完这个元素,从集合删除的时候,如果发现元素还在dirty集合中,在此将其添加至queue队列中
	processing set

	cond *sync.Cond

	shuttingDown bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}

// 空结构体
type empty struct{}
// t为空接口类型,表示任意类型
type t interface{}  
// set 集合,用 map 来模拟 set 集合(元素不重复),更快效率的查找元素
type set map[t]empty

3.1.3 通用队列方法

  • Add

Add方法为添加一个元素到item中,首先会判断队列是否关闭,其次会判断元素是否以及存在dirty中,如果在则返回实现去重,同时判断元素是否正在处理中,如果存在元素处理中,则不将其添加进queue中,而是等后期标记Done时候,将dirty中的元素添加进入queue中,如果没有正在处理,则元素成功添加进queue中。

func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 如果队列已经关闭则直接返回
	if q.shuttingDown {
		return
	}
  // 如果dirty中已经有该元素,则直接返回,目的是实现去重
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)
  // 添加元素到dirty中
	q.dirty.insert(item)
  // 如果元素正在处理,则直接返回
	if q.processing.has(item) {
		return
	}
  // 添加元素到queue中
	q.queue = append(q.queue, item)
  // 通知其他协程接触阻塞
	q.cond.Signal()
}
  • Get

Get方法,从队列头部弹出一个元素将放到proocessing集合中,从dirty集合中移除。

func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
  // 如果队列为关闭,且队列长度为0,则进行阻塞,待元素添加进队列发送解除阻塞通知执行
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
  // 如果队列长度为0,则关闭队列
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}
  // 从队列头部填出第一个元素
	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)
  // 将元素插入到processing中,标记为正在处理
	q.processing.insert(item)
  // 从dirty中删除队列
	q.dirty.delete(item)

	return item, false
}
  • Done

Done方法标示元素以处理完成,该方法为了保证同一时刻不能有两个相同的元素被处理,如果正在处理一个元素,由添加进来同样的一个,先将其添加进dirty中,在Done方法中,再检查dirty将新元素重新入队列。

func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)
	// 将processing中的元素移除
	q.processing.delete(item)
  // 判断元素是否还在dirty中,如果在则将其重新添加至queue中
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

比较重要的就这三个方法,其他的Len/ShutDown比较简单,可以自己去参考源码。

可以看到,在正常处理1元素时候,新添加进来的1不会立即被存入到queue中,而是待processing中1标记Done的时候,再将其插入到queue中,进行后续处理,注意dirty和processing都是Hash Map类型,其不用考虑无序,只保证去重即可。

3.2 延迟队列

3.2.1 延迟队列接口

该接口涉及的目的就是可以避免某些失败的items陷入热循环. 因为某些item在出队列进行处理有可能失败, 失败了用户就有可能将该失败的item重新进队列, 如果短时间内又出队列有可能还是会失败(因为短时间内整个环境可能没有改变等等), 所以可能又重新进队列等等, 因此陷入了一种hot-loop.
所以就为用户提供了AddAfter方法, 可以允许用户告诉DelayingInterface过多长时间把该item加入队列中.

type DelayingInterface interface {
	Interface
  // 在此刻过duration时间后再加入到queue中 
	AddAfter(item interface{}, duration time.Duration)
}

3.2.2 延迟队列实现

延迟队列的实现为delayingType,其继承了通用接口,在其基础上主要新增了waitingForAddCh字段,来实现延迟的功能

waitingForAddCh:其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过 goroutine 运行的 waitingLoop 函数持久运行。

waitFor: 保存了包括数据data和该item什么时间起(readyAt)就可以进入队列了.
waitForPriorityQueue:waitForPriorityQueue 为 waitFor 项目实现优先级队列,是用于保存waitFor的优先队列, 按readyAt时间从早到晚排序,形成一个优先级队列, 先readyitem先出队列.

type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
  // 保证仅仅发送一次关闭信号
	stopOnce sync.Once

  // 在触发之前确保等待的时间不超过maxWait
	heartbeat clock.Ticker

  // waitingForAddCh是一个缓冲channel
	waitingForAddCh chan *waitFor

	// 记录重试次数
	metrics retryMetrics
}

// 保存了包括数据`data`和该`item`什么时间起(`readyAt`)就可以进入队列了.
type waitFor struct {
	data    t
	readyAt time.Time
	// index in the priority queue (heap)
	index int
}
// waitForPriorityQueue 为 waitFor 项目实现优先级队列
// 是用于保存`waitFor`的优先队列, 按`readyAt`时间从早到晚排序. 先`ready`的`item`先出队列.
type waitForPriorityQueue []*waitFor

3.2.3 延迟队列方法

  • AddAfter

AddAfter方法为延迟队列添加元素,除了具体元素外,还需要传入延迟多长时间。

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// 如果传入的时间小于等于0则立即添加
	if duration <= 0 {
		q.Add(item)
		return
	}


	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
    // 构造一个waitFor对象传入到waitingForAddCh通道中
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}
  • waitingLoop

waitingLoop 方法一直运行到工作队列关闭,并检查要添加的项目列表,利用channel(q.waitingForAddCh)一直接受从AddAfter过来的waitFor,将已经ready的item添加到队列中。

func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()
  // 创建一个占位channel,当list没有元素的时候,利用其实现等待
	never := make(<-chan time.Time)

	// 构造一个定时器,当等待队列头部的元素准备好后,该定时器失效
  var nextReadyAtTimer clock.Timer

  // 构造一个优先队列
	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)
	// 利用map实现去重,如果在此添加相同元素,仅实现更新时间
	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()
		// 如果优先队列有元素
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
      // 如果队列头部的元素时间已经大于现在时间,则表明整个优先级队列中的元素都大于当前时间,还没到插入元素的时间,则直接break
      // 第一个元素时间是最小的
			if entry.readyAt.After(now) {
				break
			}
			// 否则说明以及改到了插入数据时间了,执行插入到通用队列中
			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
      // 并从优先级队列删除
			delete(waitingEntryByData, entry.data)
		}
		// 当优先级队列中没有元素,则在后面进行等待
		nextReadyAt := never
    // 如果优先级队列中有元素
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
      
      // 获取第一个元素
			entry := waitingForQueue.Peek().(*waitFor)
      // 构造一个nextReady时间等待器
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return
		// 定时器,如果过一段时间在没有任何数据,则进行一次大循环
		case <-q.heartbeat.C():
			// continue the loop, which will add ready items

		case <-nextReadyAt:
			// continue the loop, which will add ready items

    // addAfter放入的元素从该处获取
		case waitEntry := <-q.waitingForAddCh:
      // 如果元素时间大于当前时间,则添加到优先级队列中
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
        // 时间到了则添加进通用队列
				q.Add(waitEntry.data)
			}
			// 将channel中的元素全部取出来
			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

// 添加元素至优先级队列,如果元素相同则进行更新时间
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// 判断队列中是否已经存在该元素
	existing, exists := knownEntries[entry.data]
  // 如果存则,则直接更新元素
	if exists {
		if existing.readyAt.After(entry.readyAt) {
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}

		return
	}
	// 不存在则添加进优先级队列
	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}

至此就完成了限速队列的源码分析,其在通用队列之上构造优先级队列,如果时间没到的添加到优先级队列中,待时间到了添加到通用队列中进行正常处理,其中优先级队列实现利用了golang底层库heap,又兴趣的可以进一步阅读源码。

3.3 限速队列

3.3.1 限速队列接口

限速队列,顾名思义就是在元素入对速度有一定限制,其利用延迟队列的特性实现对元素插入队列的速度控制,因此限速队列也是扩展的延迟队列,新增有AddRateLimited、Forget、NumRequeues 3个方法。

type RateLimitingInterface interface {
	DelayingInterface

	// 增加一个元素大限速队列
	AddRateLimited(item interface{})

  // 表示完成该元素重试,丢弃掉该元素
	Forget(item interface{})

	// 查询元素入队列的次数
	NumRequeues(item interface{}) int
}

3.3.2 限速队列实现

我们看其中主要还是调用rateLimiter 的接口方法,

// rateLimitingType 是限速队列的具体实现
type rateLimitingType struct {
  // 继承延迟队列
	DelayingInterface
	// 新增限速接口
	rateLimiter RateLimiter
}

// 获取到限速器延迟时间,然后加入到延迟队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

// 通过限速器获取元素加入队列的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}

// 通过限速器丢弃元素
func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}

3.3.3 限速器方法

// 限速器接口
type RateLimiter interface {
	// 获取元素等待时间
	When(item interface{}) time.Duration
	// 释放指定元素
	Forget(item interface{})
	// 返回某个对象重新入队列次数
	NumRequeues(item interface{}) int
}

  • BucketRateLimiter

令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的。令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。

type BucketRateLimiter struct {
	*rate.Limiter
}


func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}

在实例化 rate.NewLimiter 后,传入 r 和 b 两个参数,其中 r 参数表示每秒往“桶”里填充的 token 数量,b 参数表示令牌桶的大小(即令牌桶最多存放的 token 数量)。我们假定 r 为 10,b 为 100。假设在一个限速周期内插入了 1000 个元素,通过 r.Limiter.Reserve().Delay 函数返回指定元素应该等待的时间,那么前 b(即 100)个元素会被立刻处理,而后面元素的延迟时间分别为 item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。

func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    // 默认令牌桶限速器
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
  • ItemExponentialFailureRateLimiter

排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数。排队指数算法的核心实现代码示例如下:


func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;另外,baseDelay 字段是最初的限速单位(默认为 5ms),maxDelay 字段是最大限速单位(默认为 1000s)

限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。

我们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。

  • ItemFastSlowRateLimiter

计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。

计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。计数器算法核心实现的代码示例如下:

type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

  • MaxOfRateLimiter
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}

混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法,代码示例如下:

func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

四 小试牛刀

4.1 通用队列

4.2 延迟队列

4.3 限速队列

参考链接

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。