guava的RateLimiter限流器使用
背景
之前项目中有个门店会员卡导入需求,大致意思就是有一个Excel文件,文件中包含一家门店的会员及其会员卡相关信息,需要将这些信息导入到系统中。
当时开发这个需求的时候采用的是web页面操作后保存导入记录内容,由Job服务来定时扫描执行真正导入的逻辑,导入逻辑就包括解析内容、验证内容、格式转换、保存信息几个步骤。
在这个导入过程当中运用了线程池的技术,来提高导入的速度。因为导入的场景实时性还是要有的,不能够太慢,太慢了会对商家影响不好。综合下游各服务的压测瓶颈,如会员服务的QPS限制、会员卡的QPS限制,以及结合导入的平均耗时,创建了20个核心线程的线程池,如果过大容易导致下游服务限流,且对系统的压力会过大,影响正常流量请求。
上面的描述看似正常,但是这种Job触发非交互式的请求,也就是非页面客户端操作的请求有一个比较极端的问题,当操作人员填写的Excel内容信息过于简单或者数据有问题就还是会请求过快,导致被下游服务限流,比如说当一个Excel中有一万条数据时,这一万条数据内容信息都是非常的简单或者说数据都是填写有误的,那在调用验证服务的时候就会立马被返回结果就是请求过快限流报错(我们这个前提是按会员纬度一条条验证的,先不考虑设计的问题)。
对于上诉问题,我们不能去限制操作人员(其实也无法强限制),只能通过技术手段来保证服务的稳定,为此我们采用在线程池之上增加一个限速器,来平滑这异常的突增流量,使用的是guava的RateLimiter限流器,其实也可以成为限速器。
使用配置
pom文件文件中添加如下依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
使用方式
每秒生产令牌的个数,60表示每秒生产60个。
RateLimiter importRateLimiter = RateLimiter.create(60);
guava的RateLimiter令牌算法提供阻塞方法和非阻塞方法acquire()
表示阻塞方法,当没有获取到时就会等待;tryAcquire()
是非阻塞方法,如果没有获取到就会立马返回false,这当个方法的参数都表示每秒获取多少个令牌,默认为一秒钟一个,当然tryAcquire()
有提供其它几个API使用。
测试代码
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Data;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Created by zhangfeibiao on 2020/9/24.
*
* 模拟会员卡导入增加速率限制器(限速)
*/
public class ImportRateLimiterTest {
private static ThreadPoolExecutor importExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(20);
final RateLimiter importRateLimiter = RateLimiter.create(60);
@Data
public class memberCardRunnable implements Runnable{
int flag ;
@Override
public void run() {
System.out.println(DateUtil.formatDateTime(new Date()) + "flag=" + flag);
}
public memberCardRunnable(int flag) {
this.flag = flag;
}
}
/**
* tryAcquire尝试获取permit,默认超时时间是0,意思是拿不到就立即返回false
*/
public void importMemberCard(){
long start = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
if (importRateLimiter.tryAcquire(1)) {
System.out.println(DateUtil.formatDateTime(new Date()) + "获得permits成功");
importExecutor.execute(new memberCardRunnable(i));
} else {
System.out.println(DateUtil.formatDateTime(new Date()) + "获得permits失败");
}
}
long end = System.currentTimeMillis();
System.out.println("限速不等待耗时:" + (end - start) + "ms");
}
/**
* 不限制速度
*/
public void importMemberCardNoLimitRate(){
long start = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
importExecutor.execute(new memberCardRunnable(i));
}
long end = System.currentTimeMillis();
System.out.println("不限速耗时:" + (end - start) + "ms");
}
/**
* acquire需要等待 等价于300个请求限制在5秒左右时间投递到线程池中,每秒允许60个请求放行到线程池去抢占
*/
public void importMemberCardWait(){
long start = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
importRateLimiter.acquire(1);
System.out.println(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN) + "获得permits成功");
importExecutor.execute(new memberCardRunnable(i));
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "ms");
}
/**
* 模拟会员卡导入
*/
public static void main(String[] args) {
// 不限速耗时 5ms
new ImportRateLimiterTest().importMemberCardNoLimitRate();
// 限速耗时耗时 4986ms
new ImportRateLimiterTest().importMemberCardWait();
// 限速不等待耗时 1ms
new ImportRateLimiterTest().importMemberCard();
}
}
结果分析
运行main方法后,我们可以得到一个结果
如果有300个非简单请求,当只有线程池不加限速器时,所有的请求在5ms就会被执行完(这里不考虑队列的大小,不是主要影响点),可想如果有一万个请求,在1s中就能发完,那这样下游服务肯定会因为QPS过大,导出被限流拒绝。
我们再来看如果这300个请求,在线程池之上增加一个限速器,整个请求会在将近5s的时间内完成,这是因为我们限制了每秒中只生产60个令牌,每个请求需要获得一个令牌,没有获得就需要阻塞等待,这就控制了我们请求投放的速度。
总结
我们从结果中可以看出,令牌桶算法可以从我们客户端出发控制请求的频率,平滑我们的流量,保证服务链路的健康。
思考
源码片段解读
public abstract class RateLimiter {
/**
* 用给定的吞吐量(“permits per second”)创建一个RateLimiter。
* 通常是QPS
*/
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
/**
* 用给定的吞吐量(QPS)和一个预热期创建一个RateLimiter
*/
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
private final SleepingStopwatch stopwatch;
// 锁
private volatile Object mutexDoNotUseDirectly;
private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}
/**
* 从RateLimiter中获取一个permit,阻塞直到请求可以获得为止
* @return 休眠的时间,单位是秒,如果没有被限制则是0.0
*/
public double acquire() {
return acquire(1);
}
/**
* 从RateLimiter中获取指定数量的permits,阻塞直到请求可以获得为止
*/
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
/**
* 预定给定数量的permits以备将来使用
* 直到这些预定数量的permits可以被消费则返回逝去的微秒数
*/
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
private static void checkPermits(int permits) {
checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
}
abstract class SmoothRateLimiter extends RateLimiter {
/** The currently stored permits. */
double storedPermits;
/** The maximum number of stored permits. */
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可以获取到的permit数量
double freshPermits = requiredPermits - storedPermitsToSpend; // 差值,如果存储的permit大于本次需要的permit数量则此处是0,否则是一个正数
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 计算需要等待的时间(微秒)
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend; // 减去本次消费的permit数
return returnValue;
}
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) { // 表示当前可以获得permit
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 计算这段时间可以生成多少个permit
storedPermits = min(maxPermits, storedPermits + newPermits); // 如果超过maxPermit,则取maxPermit,否则取存储的permit+新生成的permit
nextFreeTicketMicros = nowMicros; // 设置下一次可以获得permit的时间点为当前时间
}
}
}