token bucket令牌桶限流算法原理及代码

举报
JavaEdge 发表于 2021/06/04 00:26:27 2021/06/04
【摘要】 1 概述 限流算法主要有如下几种: 基于信号量Semaphore 只有数量维度,没有时间维度基于fixed window 带上了时间维度,不过在两个窗口的临界点容易出现超出限流的情况,比如限制每分钟10个请求,在00:59请求了10次,在01:01又请求了10次,而从00:30-01:30这个时间窗口来看,这一分钟请求了20次,没有控制好基于rolling wind...

1 概述

限流算法主要有如下几种:

  • 基于信号量Semaphore
    只有数量维度,没有时间维度
  • 基于fixed window
    带上了时间维度,不过在两个窗口的临界点容易出现超出限流的情况,比如限制每分钟10个请求,在00:59请求了10次,在01:01又请求了10次,而从00:30-01:30这个时间窗口来看,这一分钟请求了20次,没有控制好
  • 基于rolling window
    就是要解决fixed window没解决的窗口临界问题,主要有基于token bucket的算法,以及基于leaky bucket的算法
  • token bucket算法
    token按指定速率添加到bucket中
    一个bucket有其容量限制,超过其容量则多余的token会被丢弃
    当请求到来时,先试图获取token,如果剩余token足够则放行,不够则不允许放行(可能等待token足够再继续)

2 简单实现

2.1 Java版

/**
 * The minimalistic token-bucket implementation
 */
public class MinimalisticTokenBucket { private final long capacity; private final double refillTokensPerOneMillis; private double availableTokens; private long lastRefillTimestamp; /** * Creates token-bucket with specified capacity and refill rate equals to refillTokens/refillPeriodMillis */ public MinimalisticTokenBucket(long capacity, long refillTokens, long refillPeriodMillis) { this.capacity = capacity; this.refillTokensPerOneMillis = (double) refillTokens / (double) refillPeriodMillis; this.availableTokens = capacity; this.lastRefillTimestamp = System.currentTimeMillis(); } synchronized public boolean tryConsume(int numberTokens) { refill(); if (availableTokens < numberTokens) { return false; } else { availableTokens -= numberTokens; return true; } } private void refill() { long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis > lastRefillTimestamp) { long millisSinceLastRefill = currentTimeMillis - lastRefillTimestamp; double refill = millisSinceLastRefill * refillTokensPerOneMillis; this.availableTokens = Math.min(capacity, availableTokens + refill); this.lastRefillTimestamp = currentTimeMillis; } } private static final class Selftest { public static void main(String[] args) { // 100 tokens per 1 second MinimalisticTokenBucket limiter = new MinimalisticTokenBucket(100, 100, 1000); long startMillis = System.currentTimeMillis(); long consumed = 0; while (System.currentTimeMillis() - startMillis < 10000) { if (limiter.tryConsume(1)) { consumed++; } } System.out.println(consumed); } }

}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

以上是bucket4j给出的一个简单实现,用于理解token bucket算法
这个算法没有采用线程去refill token,因为bucket太多的话,线程太多,耗cpu
这个算法没有存储每个period使用的token,设计了lastRefillTimestamp字段,用于计算需要填充的token
每次tryConsume的时候,方法内部首先调用refill,根据设定的速度以及时间差计算这个时间段需要补充的token,更新availableTokens以及lastRefillTimestamp
之后限流判断,就是判断availableTokens与请求的numberTokens

2.2 Golang

package main 

import (
	"log"
)

type ConnLimiter struct {
	concurrentConn int
	bucket chan int
}

// go 无原生构造函数,必须手动定义并实现
func NewConnLimiter(cc int) *ConnLimiter {
	return &ConnLimiter {
		concurrentConn: cc,
		bucket: make(chan int, cc),
	}
}

func (cl *ConnLimiter) GetConn() bool {
	if len(cl.bucket) >= cl.concurrentConn {
		log.Printf("Reached the rate limitation.")
		return false
	}

	cl.bucket <- 1
	return true
}

func (cl *ConnLimiter) ReleaseConn() {
	c :=<- cl.bucket // 释放写进去的token
	log.Printf("New connction coming: %d", c)
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

高性能限流器Guava RateLimiter

令牌桶算法,其核心是想通过限流器,必须拿到令牌。
只要我们能够限制发放令牌的速率,那么就能控制流速:

  • 令牌以固定速率添加到令牌桶中,假设限流速率是 r/秒,则令牌每 1/r 秒会添加一个
  • 假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃
  • 请求能够通过限流器的前提是令牌桶中有令牌

b 其实是burst的简写,意义是限流器允许的最大突发流量。比如b=10,而且令牌桶中的令牌已满,此时限流器允许10个请求同时通过限流器,这只是突发流量,这10个请求会带走10个令牌,所以后续流量只能按照速率 r 通过限流器。

如何实现呢?基于生产者-消费者模式?

  • 一个生产者线程定时向阻塞队列添加令牌
  • 试图通过限流器的线程则作为消费者线程
  • 只有从阻塞队列中获取到令牌,才允许通过限流器

设计看上去很完美,实现也简单,若并发量不大,这没有什么问题。可使用限流大部分都是高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题了。
问题出在定时器,高并发下,系统压力已临近极限,定时器精度误差会很大,定时器本身还会创建调度线程,对系统性能影响极大。

所以Guava没有使用定时器,它是如何实现的呢?

Guava的令牌桶算法

关键是记录并动态计算下一令牌的发放时间。
假设令牌桶的容量为 b=1,限流速率 r = 1个请求/s。如下所示,若当前令牌桶无令牌,下一个令牌的发放时间是在第3s,而在第2s时,有个线程T1请求令牌,此时该如何处理?

  • 线程T1请求令牌

    对于该请求令牌的线程,很显然需要等待1s,1s以后(第3s)它就能拿到令牌。下一个令牌发放的时间也要增加1秒,因为第3s发放的令牌已被线程T1预占。
  • 线程T1请求结束
    假设T1在预占第3s令牌后,马上又有一个线程T2请求令牌
  • 线程T2请求令牌

由于下一个令牌产生的时间是第4s,所以线程T2要等待2s,才能获取到令牌,同时由于T2预占第4s令牌,所以下一令牌产生时间还要增加1s

  • 线程T2请求结束

    线程T1、T2都是在下一令牌产生时间之前请求令牌,若线程在下一令牌产生时间之后请求令牌会咋样?
    假设在线程T1请求令牌之后的5秒,即第7秒,线程T3请求令牌,如下图所示。
  • 线程T3请求令牌

    由于第5s已产生一个令牌,所以此时线程T3可直接拿到令牌,无需等待。
    在第7s,实际上限流器能产生3个令牌,第5、6、7秒各产生一个令牌。由于我们假设令牌桶的容量是1,所以第6、7秒产生的令牌就丢弃了,其实等价地你也可以认为是保留的第7秒的令牌,丢弃的第5、6秒的令牌,也就是说第7秒的令牌被线程T3占有了,于是下一令牌的的产生时间应该是第8秒
  • 线程T3请求结束

所以我们只需要记录一个下一令牌产生的时间,并动态更新它。

依然假设令牌桶的容量是1。关键是reserve()方法,这个方法会为请求令牌的线程预分配令牌,同时返回该线程能够获取令牌的时间。其实现逻辑就是上面提到的:如果线程请求令牌的时间在下一令牌产生时间之后,那么该线程立刻就能够获取令牌;反之,如果请求时间在下一令牌产生时间之前,那么该线程是在下一令牌产生的时间获取令牌。由于此时下一令牌已经被该线程预占,所以下一令牌产生的时间需要加上1秒。

3 小结

token bucket算法,是基于QPS来限流,其简单的实现,就是计算单位时间补充token的速率,然后每次tryConsume的时候根据速率修正availableTokens。

参考

  • https://github.com/vladimir-bukhtoyarov/bucket4j/blob/master/doc-pages/token-bucket-brief-overview.md
  • https://en.wikipedia.org/wiki/Token_bucket

文章来源: javaedge.blog.csdn.net,作者:JavaEdge.,版权归原作者所有,如需转载,请联系作者。

原文链接:javaedge.blog.csdn.net/article/details/109913106

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。