漏斗限流
1、需求
限定用户的某个行为在指定时间T内,只允许发生N次。假设T为1秒钟,N为1000次。
2、常见的错误设计
程序员设计了一个在每分钟内只允许访问1000次的限流方案,如下图01:00s-02:00s之间只允许访问1000次,这种设计最大的问题在于,请求可能在01:59s-02:00s之间被请求1000次,02:00s-02:01s之间被请求了1000次,这种情况下01:59s-02:01s间隔0.02s之间被请求2000次,很显然这种设计是错误的。
3、漏斗限流
3.1 解决方案
漏斗容量有限,当流水的的速度小于灌水的速度,漏斗就会水满溢出,利用这个原理我们可以设计限流代码!漏斗的剩余的空间就代表着当前行为(请求)可以持续进行的数量,漏斗的流水速率代表系统允许行为(请求)发生的最大频率,通常安装系统的处理能力权衡后进行设值。
3.2 Java代码实现
package com.lizba.redis.limit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
* 漏斗限流
* </p>
*
* @Author: Liziba
*/
public class FunnelRateLimiter {
/** map用于存储多个漏斗 */
private Map<String, Funnel> funnels = new ConcurrentHashMap<>();
/**
* 请求(行为)是否被允许
*
* @param userId 用户id
* @param actionKey 行为key
* @param capacity 漏斗容量
* @param leakingRate 剩余容量
* @param quota 请求次数
* @return
*/
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate, int quota) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.waterLeaking(quota);
}
/**
* 漏斗类
*/
class Funnel {
/** 漏斗容量 */
int capacity;
/** 漏斗流速,每毫秒允许的流速(请求) */
float leakingRate;
/** 漏斗剩余空间 */
int leftCapacity;
/** 上次漏水时间 */
long leakingTs;
public Funnel(int capacity, float leakingRate) {
this.capacity = this.leftCapacity = capacity;
this.leakingRate = leakingRate;
leakingTs = System.currentTimeMillis();
}
/**
* 计算剩余空间
*/
void makeSpace() {
long nowTs = System.currentTimeMillis();
long intervalTs = nowTs - leakingTs;
int intervalCapacity = (int) (intervalTs * leakingRate);
// int 溢出
if (intervalCapacity < 0) {
this.leftCapacity = this.capacity;
this.leakingTs = nowTs;
return;
}
// 腾出空间 >= 1
if (intervalCapacity < 1) {
return;
}
// 增加漏斗剩余容量
this.leftCapacity += intervalCapacity;
this.leakingTs = nowTs;
// 容量不允许超出漏斗容量
if (this.leftCapacity > this.capacity) {
this.leftCapacity = this.capacity;
}
}
/**
* 漏斗流水
*
* @param quota 流水量
* @return
*/
boolean waterLeaking(int quota) {
// 触发漏斗流水
this.makeSpace();
if (this.leftCapacity >= quota) {
leftCapacity -= quota;
return true;
}
return false;
}
}
}
测试代码:
计算机运行如下的代码速度会非常的块,我通过TimeUnit.SECONDS.sleep(2);模拟客户端过一段时间后再请求。
设置漏斗容量为10,每毫秒允许0.002次请求(2 次/秒),每次请求数量为1;
package com.lizba.redis.limit;
import java.util.concurrent.TimeUnit;
/**
* @Author: Liziba
*/
public class TestFunnelRateLimit {
public static void main(String[] args) throws InterruptedException {
FunnelRateLimiter limiter = new FunnelRateLimiter();
for (int i = 1; i <= 20; i++) {
if (i == 15) {
TimeUnit.SECONDS.sleep(2);
}
// 设置漏斗容量为10,每毫秒允许0.002次请求(2 次/秒),每次请求数量为1
boolean success = limiter.isActionAllowed("Liziba", "commit", 10, 0.002f, 1);
System.out.println("第" + i + "请求" + (success ? "成功" : "失败"));
}
}
}
测试结果:
- 01-10次请求成功,初始漏斗大小为10,因此前10次请求成功
- 11-14次请求失败,由于漏斗已满,并且漏斗的流速在这四次请求之间未能释放1
- 15-18次请求成功,因为i == 15时主线程睡眠2秒,2秒时间漏斗流出 0.002*1000*2 = 4,因此这四次请求成功
- 19-20次请求失败,与11-14次请求失败的原因一致
3.3 结合Redis实现
我们采用hash结构,将Funnel的属性字段,放入hash中,并且在代码中进行运算即可
package com.lizba.redis.limit;
import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;
/**
* <p>
* redis hash 漏斗限流
* </p>
*
* @Author: Liziba
* @Date: 2021/9/7 23:46
*/
public class FunnelRateLimiterByHash {
private Jedis client;
public FunnelRateLimiterByHash(Jedis client) {
this.client = client;
}
/**
* 请求是否成功
*
* @param userId
* @param actionKey
* @param capacity
* @param leakingRate
* @param quota
* @return
*/
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate, int quota) {
String key = this.key(userId, actionKey);
long nowTs = System.currentTimeMillis();
Map<String, String> funnelMap = client.hgetAll(key);
if (funnelMap == null || funnelMap.isEmpty()) {
return initFunnel(key, nowTs, capacity, quota);
}
long intervalTs = nowTs - Long.parseLong(funnelMap.get("leakingTs"));
int intervalCapacity = (int) (intervalTs * leakingRate);
// 时间过长, int可能溢出
if (intervalCapacity < 0) {
intervalCapacity = 0;
initFunnel(key, nowTs, capacity, quota);
}
// 腾出空间必须 >= 1
if (intervalCapacity < 1) {
intervalCapacity = 0;
}
int leftCapacity = Integer.parseInt(funnelMap.get("leftCapacity")) + intervalCapacity;
if (leftCapacity > capacity) {
leftCapacity = capacity;
}
return initFunnel(key, nowTs, leftCapacity, quota);
}
/**
* 存入redis,初始funnel
*
* @param key
* @param nowTs
* @param capacity
* @param quota
* @return
*/
private boolean initFunnel(String key,long nowTs, int capacity, int quota) {
Map<String, String> funnelMap = new HashMap<>();
funnelMap.put("leftCapacity", String.valueOf((capacity > quota) ? (capacity - quota) : 0));
funnelMap.put("leakingTs", String.valueOf(nowTs));
client.hset(key, funnelMap);
return capacity >= quota;
}
/**
* 限流key
*
* @param userId
* @param actionKey
* @return
*/
private String key(String userId, String actionKey) {
return String.format("limit:%s:%s", userId, actionKey);
}
}
测试代码:
package com.lizba.redis.limit;
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;
/**
* @Author: Liziba
*/
public class TestFunnelRateLimiterByHash {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("192.168.211.108", 6379);
FunnelRateLimiterByHash limiter = new FunnelRateLimiterByHash(jedis);
for (int i = 1; i <= 20; i++) {
if (i == 15) {
TimeUnit.SECONDS.sleep(2);
}
boolean success = limiter.isActionAllowed("liziba", "view", 10, 0.002f, 1);
System.out.println("第" + i + "请求" + (success ? "成功" : "失败"));
}
jedis.close();
}
}
测试结果:
与上面的java代码结构一致
4、总结
上述说了两种实现漏斗限流的方式,其实思想都是一样的,但是这两者都无法在分布式环境中使用,即便是在单机环境中也是不准确的,存在线程安全问题/原子性问题,因此我们一般使用Redis提供的限流模块Redis-Cell来限流,Redis-Cell提供了原子的限流指令cl.throttle,这个留到后续在详细说吧,我要睡觉去了!
- 点赞
- 收藏
- 关注作者
评论(0)