限流是保护高并发系统的利器,处理高并发还包括:
1.缓存 缓存的目的是提升系统访问速度和增大系统处理容量
2.降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
3.限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。
漏桶算法:
漏桶一个固定容量的漏桶,按照固定常量速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝。漏桶可以看做是一个具有固定容量、固定流出速率的队列,漏桶限制的是请求的流出速率。
漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。
令牌桶算法
1.令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
2.令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
3.令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流
下面是RateLimiter令牌桶实现
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LxRateLimit {
double limitNum() default 20; //默认每秒放入桶中的token
//获取令牌的等待时间
int timeOut() default 0;
//等待时间单位
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class LxRateLimitAspect {
//用来存放不同接口的RateLimiter(key为接口名称,value为RateLimiter)
private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();
private RateLimiter rateLimiter;
/**
* 定义切点
* 1、通过扫包切入
* 2、带有指定注解切入
*/
@Pointcut("@annotation(com.yang.aspect.LxRateLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("拦截到了{}方法", joinPoint.getSignature().getName());
Object obj = null;
//获取拦截的方法名
Signature sig = joinPoint.getSignature();
//获取拦截的方法名
MethodSignature msig = (MethodSignature) sig;
//返回被织入增加处理目标对象
Object target = joinPoint.getTarget();
//为了获取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//获取注解信息
LxRateLimit annotation = currentMethod.getAnnotation(LxRateLimit.class);
double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token
TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略
int timeOut = annotation.timeOut();
//获取rateLimiter
if(map.containsKey(functionName)){
rateLimiter = map.get(functionName);
}else {
map.put(functionName, RateLimiter.create(limitNum));
rateLimiter = map.get(functionName);
}
try {
if (rateLimiter.tryAcquire(timeOut,timeUnit)) {
//执行方法
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超载", joinPoint.getSignature().getName());
return "方法限流超载";
}
} catch (Throwable throwable) {
log.error("{}方法限流异常", joinPoint.getSignature().getName());
throwable.printStackTrace();
return "方法限流超载";
}
log.info("{}方法限流正常", joinPoint.getSignature().getName());
return obj;
}
}
但是RateLimiter只作用于一个jvm,而往往实际项目中会部署在多台机器,运行于多个jvm,这样的话我们的LxRateLimit 又要修改。
比如我们有20服务器,我们的需求是所有服务器总限流每秒在100个左右,这样的话,单个应用中LxRateLimit.limitNum就要改为100/20=5。这样就能保证。所有的服务器流量在100个左右(前提是nginx分发请求是平均的)。
当然分布式环境也可以用redis限流,且不用修改limitNum数量,不过这样增加了redis压力(当然完全不要用担心redis,单机redis最高可以每秒处理几十万请求,何况我们可以做集群)
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {
//存储key的前缀
String prefix() default "limit";
//获取等待时间,redis最高每秒处理几十万请求
int expire() default 1;
//等待时间单位,毫秒
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
注意这里的策略有修改,不在是发放令牌,而是redis的过期时间,单位最好是毫秒。比如我们需求是1s保持500个pv,那么RedisLimit.expire=1000/500=2
以下是redisson实现
import com.yang.annotation.RedisLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {
@Autowired
private RedissonClient redisson;
/**
* 定义切点
* 1、带有指定注解切入
*/
@Pointcut("@annotation(com.yang.annotation.RedisLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("拦截到了{}方法", joinPoint.getSignature().getName());
Object obj = null;
// 得到类名
String clazzName = joinPoint.getTarget().getClass().getName();
//获取拦截的方法
MethodSignature msig = (MethodSignature) joinPoint.getSignature();
//返回被织入增加处理目标对象
Object target = joinPoint.getTarget();
//为了获取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//获取参数列表
Object[] args = joinPoint.getArgs();
//获取注解信息
RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
String prefix = annotation.prefix();
int expire = annotation.expire();//获取时间单位
TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
String key = getKey(prefix,clazzName,msig.getName()); //注解限制的前缀+所在类+方法名区分不同的限流策略
RLock lock = redisson.getLock(key);
try {
boolean res = lock.tryLock(0,expire,timeUnit);
if (res) {
//执行方法
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超载", msig.getName());
return "方法限流超载";
}
} catch (Exception throwable) {
log.error("{}方法限流异常", msig.getName());
throwable.printStackTrace();
return "方法限流超载";
}
log.info("{}方法限流正常", msig.getName());
return obj;
}
/**
* * 根据类名、方法名和参数生成Key
* * @param clazzName
* * @param methodName
* * @return key格式:全类名|方法名|参数类型
*
*/
private String getKey( String prefix,String clazzName, String methodName) {
StringBuilder key = new StringBuilder();
key.append(prefix);
key.append(clazzName);
key.append(methodName);
return key.toString();
}
}
注意这里不要手动释放锁,等时间到了自动释放。
也可以用原生的redisTemplate 实现
import com.google.common.util.concurrent.RateLimiter;
import com.yang.annotation.RedisLimit;
import com.yang.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {
@Autowired
private RedisTemplate redisTemplate ;
/**
* 定义切点
* 1、带有指定注解切入
*/
@Pointcut("@annotation(com.yang.annotation.RedisLimit)")
public void checkPointcut() { }
@ResponseBody
@Around(value = "checkPointcut()")
public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("拦截到了{}方法", joinPoint.getSignature().getName());
Object obj = null;
// 得到类名
String clazzName = joinPoint.getTarget().getClass().getName();
//获取拦截的方法
MethodSignature msig = (MethodSignature) joinPoint.getSignature();
//返回被织入增加处理目标对象
Object target = joinPoint.getTarget();
//为了获取注解信息
Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
//获取参数列表
Object[] args = joinPoint.getArgs();
//获取注解信息
RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
String prefix = annotation.prefix();
int expire = annotation.expire();//获取时间单位
TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
String key = getKey(prefix,clazzName,msig.getName()); // 注解限制的前缀+所在类+方法名区分不同的限流策略
String value = getValue(key,args);
try {
if (!redisTemplate.hasKey(key)) {
//执行方法
redisTemplate.opsForValue().set(key,value,expire,timeUnit);
obj = joinPoint.proceed();
} else {
log.info("{}方法限流超载", msig.getName());
return "方法限流超载";
}
} catch (Throwable throwable) {
log.error("{}方法限流异常", msig.getName());
throwable.printStackTrace();
}
log.info("{}方法限流正常", msig.getName());
return obj;
}
/**
* * 根据类名、方法名和参数生成Key
* * @param clazzName
* * @param methodName
* * @return key格式:全类名|方法名|参数类型
*
*/
private String getKey( String prefix,String clazzName, String methodName) {
StringBuilder key = new StringBuilder();
key.append(prefix);
key.append(clazzName);
key.append(methodName);
return key.toString();
}
/**
* * 根据类名、方法名和参数生成value
* * @param clazzName
* * @param methodName
* * @param args
* * @return key格式:全类名|方法名|参数类型
*
*/
private String getValue(String key, Object[] args) {
StringBuilder value = new StringBuilder();
value.append(key);
value.append(Arrays.toString(args));
return value.toString();
}
}