1、LoadingCache简介

Guava编程工具类库下的一个基于键值对(key-value)的本地缓存工具类,通过key获取缓存中的值,并且可以设置缓存的过期时间,基于ConcurrentMap接口的实现,所以是线程安全的。

1.1 可以使用CacheBuilder来快速创建

1
2
3
4
5
6
7
8
9
10
LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
.maximumSize(10000) //缓存大小
.expireAfterWrite(10, TimeUnit.MINUTES) //缓存过期时间
.removalListener(MY_LISTENER) //缓存移除提醒
.build(
new CacheLoader<Key, Graph>() {
public Graph load(Key key) throws AnyException {
return createExpensiveGraph(key);
}
});

1.2 主要方法

  • get(K key)方法定义了默认的基本缓存操作,如果Cache中不包含该Key对应的Value,将自动调用CacheLoader.load(K key)方法进行自动加载。如果没有定义CacheLoader,将抛出一个UncheckedExecutionException异常。
  • getAll(Iterable<? extends K> keys)则扩展了get(K key),可以同时获取多组Key-Value对的结果。
  • refresh(K key)方法可以对特定Key的Value进行刷新,即调用对应Key的CacheLoader.load(K key)重新加载并替换原值。
  • asMap()方法则返回对应的ConcurrentMap对象,可以使用常规的Map方法来对缓存中的数据进行操作。

2、RateLimiter简介

同样也是Guava下的一个限流工具类,基于令牌桶算法,初始时创建指定令牌数,然后供其他操作通过acquire请求获取令牌,如果桶里没有令牌了,操作就需要等待,知道桶里再次有令牌可以获取。

参考:RateLimiter使用介绍

2.1 简单使用

  • RateLimiter.create(double permitsPerSecond):permitsPerSecond以“每秒许可数”(通常称为QPS,即每秒查询数)给出。

  • acquire()会阻塞当前线程直到获取到令牌,同时可以提前透支RateLimiter未来的令牌,所以使用acquire有风险,比如你创建一个RateLimiter.create(5.0),然后limiter.acquire(6.0)是会获取成功的,因为提前透支了后面的令牌。

  • tryAcquire()不会阻塞当前线程,直接返回boolean结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
public static void main(String[] args) {
// 创建一个每秒放入5个令牌的RateLimiter
RateLimiter limiter = RateLimiter.create(5.0);

for (int i = 0; i < 10; i++) {
// 请求一个令牌
limiter.acquire();//如果令牌不够,acquire()会阻塞,直到获取到令牌
limiter.tryAcquire();//tryAcquire不阻塞当前线程,直接返回boolean结果
System.out.println("处理请求: " + i);
}
}
}

3、接口限流实现

3.1 首先创建一个自定义注解@AccessLimit

主要属性有:

  • 资源名称
  • 访问限制次数
  • 访问限制类型
    • SECONDS_TYPE, //秒级
    • MINUTES_TYPE, //分钟级
    • EXCLUSIVE_TYPE; //特殊级
  • 限制返回提醒信息
  • 限制规则
    • ALL //针对接口的总请求数
    • IPADDR //根据Ip地址来限制”,
    • IPADDRACCOUNT //根据Ip地址+账户来限制;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* @Description 访问控制自定义注解
*/
@Inherited //指定被修饰的Annotation将具有继承性,说明子类可以继承父类中的该注解
@Documented //说明该注解可以被生成在Javadoc中
/* @Target:描述注解的使用位置
1、@Target(ElementType.TYPE) //接口、类
2、@Target(ElementType.FIELD) //属性
3、@Target(ElementType.METHOD) //方法
4、@Target(ElementType.PARAMETER) //方法参数
5、@Target(ElementType.CONSTRUCTOR) //构造函数
6、@Target(ElementType.LOCAL_VARIABLE) //局部变量
7、@Target(ElementType.ANNOTATION_TYPE) //注解
8、@Target(ElementType.PACKAGE) //包
注:可以指定多个位置
*/
@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
/* @Retention:描述注解的生命周期,表示在什么级别保存该注解的信息
1、@Retention(RetentionPolicy.SOURCE) //注解仅存在于源码中,在class字节码文件中不包含
2、@Retention(RetentionPolicy.CLASS) //默认的保留策略,注解会在class字节码文件中存在,但运行时无法获得
3、@Retention(RetentionPolicy.RUNTIME) //注解会在class字节码文件中存在,在运行时可以通过反射获取到
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {

//资源名称
String name() default "default_resource_name";

/**
* 限制访问次数
*/
int limit() default 3;

/**
* 访问控制缓存类型
*/
CacheType cacheType() default CacheType.SECONDS_TYPE;

/**
* 提醒信息
*/
String tipMsg() default "当前资源无法访问,请稍后再试!";

String startTime() default "";

String endTime() default "";


/**
* 限制类型
*/
LimitKeyTypeEnum limitKeyType() default LimitKeyTypeEnum.IPADDRACCOUNT;

enum CacheType {
SECONDS_TYPE, //秒级
MINUTES_TYPE, //分钟级
EXCLUSIVE_TYPE; //特殊级
}

}

3.2 创建一个自定义拦截器RateLimitInterceptor

preHandle在处理请求前,对接口限流进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* @Description 请求拦截器
*/
@Component
public class RateLimitInterceptor implements HandlerInterceptor {

private static final Logger LOGGER = LoggerFactory.getLogger(RateLimitInterceptor.class);

/**
* 本地缓存
* maximumSize 设置缓存个数
* expireAfterWrite 写入后过期时间
* 请求过来时根据限制类型key创建一个RateLimiter,1s内有多个同样的请求时,进行访问限制拒绝处理
* acquire可以提前透支RateLimiter未来的令牌,使用acquire有风险
*/
private static final LoadingCache<String, RateLimiter> SECONDS_TYPE_CACHE = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(1, TimeUnit.SECONDS)
.build(new CacheLoader<String, RateLimiter>() {
@Override
public RateLimiter load(String key) {
double perSecondLimit = RateLimitUtil.getCacheKeyPerSecond(key);
return RateLimiter.create(perSecondLimit);
}
});

private static final ConcurrentHashMap<String, Object> EXCLUSIVE_TYPE_CACHE = new ConcurrentHashMap<>();

/**
* @Description 请求前置处理
* @Param
* @Return
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
HandlerMethod handlerMethod = (HandlerMethod) handler;
AccessLimit accessLimit = handlerMethod.getMethodAnnotation(AccessLimit.class);
if(accessLimit !=null){
AccessLimit.CacheType cacheType = accessLimit.cacheType();
String cacheKey=null;
RateLimiter limiter =null;
switch (cacheType){
case EXCLUSIVE_TYPE: //特殊限制,限制某个时间段不能访问
if(StringUtils.isNotEmpty(accessLimit.startTime()) &&
StringUtils.isNotEmpty(accessLimit.endTime()) && !(
LocalTime.now().isAfter(LocalTime.parse(accessLimit.startTime(), DateTimeFormatter.ofPattern("HH:mm"))) &&
LocalTime.now().isBefore( LocalTime.parse(accessLimit.endTime(), DateTimeFormatter.ofPattern("HH:mm"))))
) {
doResult(response, handlerMethod, accessLimit, cacheKey);
return false;
}
cacheKey = RateLimitUtil.generateSimpleCacheKey(handlerMethod, request);
if (EXCLUSIVE_TYPE_CACHE.containsKey(cacheKey)) {
doResult(response, handlerMethod, accessLimit, cacheKey);
return false;
}else{
EXCLUSIVE_TYPE_CACHE.put(cacheKey,cacheKey);
}
break;
case SECONDS_TYPE: //秒级限制
cacheKey = RateLimitUtil.generateCacheKey(handlerMethod, request);
limiter = SECONDS_TYPE_CACHE.get(cacheKey);//当cacheKey不存在时会触发load放入缓存中
if (!limiter.tryAcquire()) {//acquire 阻塞当前线程 tryAcquire不阻塞当前线程
//return ResponseEntity.builder().code(404).msg("访问速率过快").build();
doResult(response, handlerMethod, accessLimit, cacheKey);
return false;
}
break;
case MINUTES_TYPE:
break;
}


}
return HandlerInterceptor.super.preHandle(request, response, handler);
}

/**
* @Description 访问限制返回信息处理
* @Param
*/
private void doResult(HttpServletResponse response, HandlerMethod handlerMethod, AccessLimit accessLimit, String cacheKey) throws IOException {
StringBuilder msg = new StringBuilder();
if(accessLimit.limitKeyType()== LimitKeyTypeEnum.IPADDR
|| accessLimit.limitKeyType()== LimitKeyTypeEnum.IPADDRACCOUNT
){
msg.append(cacheKey);
}
if (StringUtils.isNotEmpty(accessLimit.tipMsg())) {
msg.append(" resource info: ").append(accessLimit.tipMsg());
}
Class<?> returnType = handlerMethod.getMethod().getReturnType();
Object result= msg.toString();
//if (returnType.isAssignableFrom(CommResponse.class)) {
// result=new CommResponse(false,msg.toString());
//}else if(returnType.isAssignableFrom(QueryResponse.class)){
// result=new QueryResponse();
//}else{
// result=new CommResponse(false,msg.toString());
//}
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write(JSONUtil.toJsonStr(result));
LOGGER.info(msg.toString());
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
//System.out.println("拦截器: 请求处理后执行");
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//System.out.println("拦截器: 请求完成后执行");
HandlerMethod handlerMethod = (HandlerMethod) handler;
AccessLimit accessLimit = handlerMethod.getMethodAnnotation(AccessLimit.class);
if(accessLimit !=null) {
AccessLimit.CacheType cacheType = accessLimit.cacheType();
if (cacheType==EXCLUSIVE_TYPE){
String cacheKey = RateLimitUtil.generateSimpleCacheKey(handlerMethod, request);
EXCLUSIVE_TYPE_CACHE.remove(cacheKey);
}
}
HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
}
}

3.2.1 RateLimitUtil限流工具类

主要用于根据注解上的属性配置获取对应数据进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* @Description 访问控制处理工具类
*/
public class RateLimitUtil {

/**
* 获取唯一key根据注解类型
* <p>
* 规则 资源名:业务key:perSecond
*
* @param method
* @param request
* @return
*/
public static String generateCacheKey(HandlerMethod method, HttpServletRequest request) {
//获取方法上的注解
AccessLimit accessLimit = method.getMethodAnnotation(AccessLimit.class);
StringBuffer cacheKey = new StringBuffer(accessLimit.name() + ":");
String ipAddr = getIpAddr(request);
switch (accessLimit.limitKeyType()) {
case IPADDR:
cacheKey.append(ipAddr + ":");
break;
case IPADDRACCOUNT:
cacheKey.append(ipAddr + ":");
cacheKey.append(request.getSession().getAttribute("account") + ":");
break;
case ALL:
break;
}
cacheKey.append(accessLimit.limit());
return cacheKey.toString();
}

public static String generateSimpleCacheKey(HandlerMethod method, HttpServletRequest request) {
//获取方法上的注解
AccessLimit accessLimit = method.getMethodAnnotation(AccessLimit.class);
return String.join(":",accessLimit.name() ,String.valueOf(accessLimit.limit()));
}

/**
* 获取客户端IP地址
*
* @param request 请求
* @return
*/
public static String getIpAddr(HttpServletRequest request) {
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
if ("127.0.0.1".equals(ip)) {
//根据网卡取本机配置的IP
InetAddress inet = null;
try {
inet = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
ip = inet.getHostAddress();
}
}
// 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
if (ip != null && ip.length() > 15) {
if (ip.indexOf(",") > 0) {
ip = ip.substring(0, ip.indexOf(","));
}
}
if ("0:0:0:0:0:0:0:1".equals(ip)) {
ip = "127.0.0.1";
}
return ip;
}

public static double getCacheKeyPerSecond(String key) {
String[] ruleStr = key.split(":");
String perSecond = ruleStr[ruleStr.length-1];
return Double.parseDouble(perSecond);
}

}

3.3 具体使用

使用apache-jmeter进行接口测试,当1s请求getUserInfo接口超过5次时,则会返回限流提示信息。

注:但是可能不会那么准确,可能没到5次就限流了,或者超过5次的请求没有限制成功,猜测可能跟LoadingCache的到期时间也是1s,时间重合导致的。

后续考虑使用Sentine来实现,未完待续。。。

1
2
3
4
5
6
7
8
9
10
@GetMapping(value = "/getUserInfo")
@AccessLimit(name = "getUserInfo", cacheType = SECONDS_TYPE, limit = 5, limitKeyType = LimitKeyTypeEnum.IPADDR, tipMsg = "访问过于频繁,请稍后再试!")
public String getUserInfo(@RequestParam(value = "userName") String userName) {
UserInfo userInfo = userInfoService.getUserInfoByUserName(userName);
if (Optional.ofNullable(userInfo).isPresent()) {
return userInfo.toString();
} else {
return "用户不存在!";
}
}