自研RPC框架之限流器设计

自研RPC框架之限流器设计

1.限流的必要性

限流是一种控制流量的技术,用于保护系统免受突发流量或恶意流量的影响。其基本原理是通过控制请求的速率或数量,确保系统在可承受的范围内运行。我们RPC系统中的限流器都实现了Limiter接口:

1
2
3
public interface Limiter {
boolean limit();
}

2.计数器限流器设计

计数器限流器通常在较大的时间粒度上进行限流(如每秒、每分钟),难以应对短时间内的突发流量。当每个时间窗口刷新时,所有等待的请求可能同时被放行,导致瞬间流量突刺

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 计数器限流即固定窗口限流器
public class CounterLimiter implements Limiter{
// 开始计数时间
private long startTimeStamp = System.currentTimeMillis();
// 请求次数
private int requestCount = 0;
// 每秒限制的最大请求数
private int maxLimitCount = 10;
// 时间窗口长度(单位ms)
private long interval = 1000;

public CounterLimiter(int maxLimitCount) {
this.maxLimitCount = maxLimitCount;
}

/**
* 固定窗口算法存在请求突刺现象
* @return true表示限流,false表示放行
*/
@Override
public synchronized boolean limit() {
long now = System.currentTimeMillis();
// 处于当前时间窗口内
if (now < startTimeStamp + interval) {
// 判断当前时间窗口内的请求是否超过限流阈值
if (requestCount + 1 > maxLimitCount) {
return true;
}
requestCount++;
return false;
} else {
// 开启新的时间窗口
startTimeStamp = now;
// 重置请求计数器
requestCount = 1;
return false;
}
}
}

3.滑动窗口限流器设计

简单的基于Java实现的滑动窗口限流器的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class RollingWindowLimiter implements Limiter {
// 记录滑动窗口的队列
private LinkedList<Integer> slots = new LinkedList<>();
// 窗口的单位长度(单位ms)
private int windowLength = 100;
// 滑动窗口的个数
private int windowNum = 10;
// 每秒限制的最大请求数
private int maxLimitCount = 10;
// 请求计数
private int requestCount = 0;

/**
* 存在线程安全问题
* @return true表示限流,false表示放行
*/
@Override
public synchronized boolean limit() {
// 滑动窗口内的请求数超过限流阈值
if ((requestCount + 1) > maxLimitCount) {
return true;
}
// 当前请求处于的单元格(其实就是队尾的最新单元格)请求数递增
slots.set(slots.size() - 1, slots.peekLast() + 1);
requestCount++;
return false;
}

public RollingWindowLimiter(int windowLength, int windowNum, int maxLimitCount) {
this.windowLength = windowLength;
this.windowNum = windowNum;
this.maxLimitCount = maxLimitCount;
rolling();
}

private void rolling() {
slots.addLast(0);
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(()-> {
// 这里存在并发安全问题,如果负载过高,大量请求会阻塞等待加锁,那么时间窗口迟迟无法滑动更新
synchronized (this) {
// 滑动窗口
slots.addLast(0);
// 移除超出范围的单元格
if (slots.size() > windowNum) {
requestCount -= slots.getFirst();
slots.removeFirst();
}
}
}, 0, windowLength, TimeUnit.MILLISECONDS);
}
}

利用Redis,我们就可以实现一个简单的滑动窗口限流的功能。因为滑动窗口和时间有关,所以很容易能想到要基于时间进行统计。那么我们只需要在每一次有请求进来的时候,记录下请求的时间戳请求的数据,然后在统计窗口内请求的数量时,只需要统计窗口内的被记录的数据量有多少条就行了。

在Redis中,我们可以基于ZSET来实现这个功能。假如我们限定login接口一分钟只能调用100次:

那么,我们就可以把login接口这个需要做限流的资源名作为key在Redis中进行存储,然后value使用ZSET这种数据结构,把他的score设置为当前请求的时间戳,member的话建议用请求的详情的hash进行存储(或者UUID、MD5什么的),避免在并发时,时间戳一致出现score和member一样导致被zadd幂等的问题。

image-20240210210106473

所以,我们实现滑动窗口限流的主要思想是:==只保留在特定时间窗口内的请求记录,而丢弃窗口之外的记录==。

主要步骤如下:

  1. 定义滑动窗口的时间范围,例如,窗口大小为60秒。
  2. 每次收到一个请求时,我们就定义出一个zset的value然后存储到Redis中。
  3. 然后再通过ZREMRANGEBYSCORE命令来删除分值小于窗口起始时间戳(当前时间戳-60s))的数据。
  4. 最后,再使用ZCARD命令来获取有序集合中的成员数量,即在窗口内的请求量。

image-20240210210421254

以上代码在高并发情况下,可能会存在原子性的问题,需要考虑加事务或者lua脚本:

image-20240210210615457

4.令牌桶限流器设计

令牌桶限流算法是一种灵活且广泛应用的流量控制机制,它允许在保持平均传输速率的前提下,进行一定程度的突发传输。这种算法的核心思想是通过一个固定容量的令牌桶来控制数据的传输速率。系统以恒定的速率往桶中添加令牌,每个传入的请求都需要从桶中获取一个令牌,如果桶中没有令牌可用,则请求被限流。令牌桶限流算法允许突发请求在短时间内通过,而不是简单地以恒定速率处理请求,从而平滑了网络流量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TokenBucketLimiter implements Limiter {
// 上次计算的时间
private long timeStamp = System.currentTimeMillis();
// 桶的容量
private int capacity = 100;
// 令牌的生成速度(单位是s)
private int rate = 20;
// 当前剩余的令牌数量
private int tokens = 0;

public TokenBucketLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
}

@Override
public synchronized boolean limit() {
long now = System.currentTimeMillis();
// 先添加令牌
tokens = Math.min(capacity, tokens + (int) ((now - timeStamp) / 1000) * rate);
// 更新计算时间
timeStamp = now;
// 令牌消耗完毕
if (tokens < 1) {
return true;
}
// 领取令牌
tokens--;
return false;
}
}

5.Guava限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SmoothBurstyLimiter implements Limiter {
private RateLimiter rateLimiter;

public SmoothBurstyLimiter(double permitsPerSecond) {
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}
@Override
public boolean limit() {
return rateLimiter.tryAcquire();
}
}

public class SmoothWarmingUpLimiter implements Limiter {
// 可用于优雅开机
private RateLimiter rateLimiter;

public SmoothWarmingUpLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit);
}

@Override
public boolean limit() {
return rateLimiter.tryAcquire();
}
}

参考文章:源码分析 - Guava RateLimiter源码解析 - 林中小舍 - SegmentFault 思否

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。

1
2
3
4
5
6
7
8
@Test
fun rateLimiterTest() {
val rateLimiter = RateLimiter.create(0.5)

arrayOf(1,6,2).forEach {
println("${System.currentTimeMillis()} acq $it:\twait ${rateLimiter.acquire(it)}s")
}
}

以上示例,创建一个RateLimiter,指定每秒放0.5个令牌(2秒放1个令牌),其输出见下

1
2
3
1516166482561 acq 1: wait 0.0s
1516166482563 acq 6: wait 1.997664s
1516166484569 acq 2: wait 11.991958s

从输出结果可以看出,RateLimiter具有预消费的能力:
acq 1时并没有任何等待直接预消费了1个令牌
acq 6时,由于之前预消费了1个令牌,故而等待了2秒,之后又预消费了6个令牌
acq 2时同理,由于之前预消费了6个令牌,故而等待了12秒

从另一方面讲,RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。但是某些情况下并**不需要这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能预消费**,否则,则可能出现推送频率超出限制而失败。

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值),两种模式实现思路类似,主要区别在等待时间的计算上。

在调用create接口时,实际实例化的为SmoothBursty类。

1
2
3
4
5
6
7
8
9
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* The currently stored permits.
* 当前存储令牌数
*/
double storedPermits;

/**
* The maximum number of stored permits.
* 最大存储令牌数
*/
double maxPermits;

/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* 添加令牌时间间隔
*/
double stableIntervalMicros;

/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
* 下一次请求可以获取令牌的起始时间
* 由于RateLimiter允许预消费,上次请求预消费令牌后
* 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future

接下来介绍几个关键函数。

1
2
3
4
5
6
7
8
9
10
11
/**
* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
*/
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

另一种解法则是延迟计算,如上resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点。其中,storedPermitsToSpend为桶中可以消费的令牌数,freshPermits为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros

需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为买单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。

回头来看SmoothBursty的构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。

在了解以上概念后,就非常容易理解RateLimiter暴露出来的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean tryAcquire(int permits) {
return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}

tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false,canAcquire用于判断timeout时间内是否可以获取令牌。