自研RPC框架之限流器设计 1.限流的必要性 限流是一种控制流量的技术,用于保护系统免受突发流量或恶意流量的影响。其基本原理是通过控制请求的速率或数量,确保系统在可承受的范围内运行。我们RPC系统中的限流器都实现了Limiter
接口:
1 2 3 public interface Limiter { boolean limit () ; }
2.计数器限流器设计 计数器限流器通常在较大的时间粒度上进行限流(如每秒、每分钟),难以应对短时间内的突发流量。当每个时间窗口刷新时,所有等待的请求可能同时被放行,导致瞬间流量突刺
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class CounterLimiter implements Limiter { private long startTimeStamp = System.currentTimeMillis(); private int requestCount = 0 ; private int maxLimitCount = 10 ; private long interval = 1000 ; public CounterLimiter (int maxLimitCount) { this .maxLimitCount = maxLimitCount; } @Override public synchronized boolean limit () { long now = System.currentTimeMillis(); if (now < startTimeStamp + interval) { if (requestCount + 1 > maxLimitCount) { return true ; } requestCount++; return false ; } else { startTimeStamp = now; requestCount = 1 ; return false ; } } }
3.滑动窗口限流器设计 简单的基于Java实现的滑动窗口限流器的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class RollingWindowLimiter implements Limiter { private LinkedList<Integer> slots = new LinkedList <>(); private int windowLength = 100 ; private int windowNum = 10 ; private int maxLimitCount = 10 ; private int requestCount = 0 ; @Override public synchronized boolean limit () { if ((requestCount + 1 ) > maxLimitCount) { return true ; } slots.set(slots.size() - 1 , slots.peekLast() + 1 ); requestCount++; return false ; } public RollingWindowLimiter (int windowLength, int windowNum, int maxLimitCount) { this .windowLength = windowLength; this .windowNum = windowNum; this .maxLimitCount = maxLimitCount; rolling(); } private void rolling () { slots.addLast(0 ); ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(()-> { synchronized (this ) { slots.addLast(0 ); if (slots.size() > windowNum) { requestCount -= slots.getFirst(); slots.removeFirst(); } } }, 0 , windowLength, TimeUnit.MILLISECONDS); } }
利用Redis,我们就可以实现一个简单的滑动窗口限流的功能。因为滑动窗口和时间有关,所以很容易能想到要基于时间进行统计。那么我们只需要在每一次有请求进来的时候,记录下请求的时间戳
和请求的数据
,然后在统计窗口内请求的数量时,只需要统计窗口内的被记录的数据量有多少条就行了。
在Redis中,我们可以基于ZSET
来实现这个功能。假如我们限定login接口一分钟只能调用100次:
那么,我们就可以把login接口这个需要做限流的资源名作为key在Redis中进行存储,然后value使用ZSET
这种数据结构,把他的score设置为当前请求的时间戳
,member的话建议用请求的详情的hash
进行存储(或者UUID、MD5什么的),避免在并发时,时间戳一致出现score和member一样导致被zadd幂等的问题。
所以,我们实现滑动窗口限流的主要思想是:==只保留在特定时间窗口内的请求记录,而丢弃窗口之外的记录==。
主要步骤如下:
定义滑动窗口的时间范围,例如,窗口大小为60秒。
每次收到一个请求时,我们就定义出一个zset的value然后存储到Redis中。
然后再通过ZREMRANGEBYSCORE
命令来删除分值小于窗口起始时间戳(当前时间戳-60s))的数据。
最后,再使用ZCARD
命令来获取有序集合中的成员数量,即在窗口内的请求量。
以上代码在高并发情况下,可能会存在原子性的问题,需要考虑加事务或者lua脚本:
4.令牌桶限流器设计 令牌桶限流算法是一种灵活且广泛应用的流量控制机制,它允许在保持平均传输速率的前提下,进行一定程度的突发传输
。这种算法的核心思想是通过一个固定容量的令牌桶来控制数据的传输速率。系统以恒定的速率往桶中添加令牌,每个传入的请求都需要从桶中获取一个令牌,如果桶中没有令牌可用,则请求被限流。令牌桶限流算法允许突发请求在短时间内通过,而不是简单地以恒定速率处理请求,从而平滑了网络流量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class TokenBucketLimiter implements Limiter { private long timeStamp = System.currentTimeMillis(); private int capacity = 100 ; private int rate = 20 ; private int tokens = 0 ; public TokenBucketLimiter (int capacity, int rate) { this .capacity = capacity; this .rate = rate; } @Override public synchronized boolean limit () { long now = System.currentTimeMillis(); tokens = Math.min(capacity, tokens + (int ) ((now - timeStamp) / 1000 ) * rate); timeStamp = now; if (tokens < 1 ) { return true ; } tokens--; return false ; } }
5.Guava限流器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class SmoothBurstyLimiter implements Limiter { private RateLimiter rateLimiter; public SmoothBurstyLimiter (double permitsPerSecond) { this .rateLimiter = RateLimiter.create(permitsPerSecond); } @Override public boolean limit () { return rateLimiter.tryAcquire(); } } public class SmoothWarmingUpLimiter implements Limiter { private RateLimiter rateLimiter; public SmoothWarmingUpLimiter (double permitsPerSecond, long warmupPeriod, TimeUnit unit) { this .rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); } @Override public boolean limit () { return rateLimiter.tryAcquire(); } }
参考文章:源码分析 - Guava RateLimiter源码解析 - 林中小舍 - SegmentFault 思否
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。
1 2 3 4 5 6 7 8 @Test fun rateLimiterTest () { val rateLimiter = RateLimiter.create(0.5 ) arrayOf(1 ,6 ,2 ).forEach { println("${System.currentTimeMillis()} acq $it:\twait ${rateLimiter.acquire(it)}s" ) } }
以上示例,创建一个RateLimiter,指定每秒放0.5个令牌(2秒放1个令牌),其输出见下
1 2 3 1516166482561 acq 1 : wait 0. 0s1516166482563 acq 6 : wait 1. 997664s1516166484569 acq 2 : wait 11. 991958s
从输出结果可以看出,RateLimiter具有预消费 的能力:acq 1
时并没有任何等待直接预消费了1个令牌acq 6
时,由于之前预消费了1个令牌,故而等待了2秒,之后又预消费了6个令牌acq 2
时同理,由于之前预消费了6个令牌,故而等待了12秒
从另一方面讲,RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。但是某些情况下并**不需要
这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能 预消费**,否则,则可能出现推送频率超出限制而失败。
Guava有两种限流模式,一种为稳定模式
(SmoothBursty:令牌生成速度恒定),一种为渐进模式
(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值),两种模式实现思路类似,主要区别在等待时间的计算上。
在调用create
接口时,实际实例化的为SmoothBursty
类。
1 2 3 4 5 6 7 8 9 public static RateLimiter create (double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create (double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty (stopwatch, 1.0 ); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 double storedPermits;double maxPermits;double stableIntervalMicros;private long nextFreeTicketMicros = 0L ;
接下来介绍几个关键函数。
1 2 3 4 5 6 7 8 9 10 11 void resync (long nowMicros) { if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?
一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
另一种解法则是延迟计算,如上resync
函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros
,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 final long reserveEarliestAvailable (int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this .storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this .storedPermits, storedPermitsToSpend) + (long ) (freshPermits * stableIntervalMicros); this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this .storedPermits -= storedPermitsToSpend; return returnValue; }
该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点。其中,storedPermitsToSpend
为桶中可以消费的令牌数,freshPermits
为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros
。
需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros
,而不是本次更新的nextFreeTicketMicros
,通俗来讲,本次请求需要为上次请求的预消费行为买单 ,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros
值。
回头来看SmoothBursty
的构造函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super (stopwatch); this .maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate (double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this .maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0 ) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } }
桶中可存放的最大令牌数由maxBurstSeconds
计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。
在了解以上概念后,就非常容易理解RateLimiter暴露出来的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @CanIgnoreReturnValue public double acquire () { return acquire(1 ); } @CanIgnoreReturnValue public double acquire (int permits ) { long microsToWait = reserve(permits ); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L ); } final long reserve (int permits ) { checkPermits(permits ); synchronized (mutex()) { return reserveAndGetWaitLength(permits , stopwatch.readMicros()); } }
acquire
函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public boolean tryAcquire (int permits ) { return tryAcquire(permits , 0 , MICROSECONDS); } public boolean tryAcquire () { return tryAcquire(1 , 0 , MICROSECONDS); } public boolean tryAcquire (int permits , long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0 ); checkPermits(permits ); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { return false ; } else { microsToWait = reserveAndGetWaitLength(permits , nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true ; } private boolean canAcquire (long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; } @Override final long queryEarliestAvailable (long nowMicros) { return nextFreeTicketMicros; }
tryAcquire
函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false,canAcquire
用于判断timeout时间内是否可以获取令牌。