当前位置: 首页>后端>正文

分布式大型环境如何用AOP限流?

限流是保护高并发系统的利器,处理高并发还包括:

1.缓存 缓存的目的是提升系统访问速度和增大系统处理容量
2.降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
3.限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。

漏桶算法:

漏桶一个固定容量的漏桶,按照固定常量速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝。漏桶可以看做是一个具有固定容量、固定流出速率的队,漏桶限制的是请求的流出速率。
漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。

令牌桶算法

分布式大型环境如何用AOP限流?,第1张
令牌桶算法.png

1.令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
2.令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
3.令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流

下面是RateLimiter令牌桶实现

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LxRateLimit {

    double limitNum() default 20;  //默认每秒放入桶中的token

    //获取令牌的等待时间
    int timeOut() default 0;

    //等待时间单位
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class LxRateLimitAspect {

    //用来存放不同接口的RateLimiter(key为接口名称,value为RateLimiter)
    private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();
    private RateLimiter rateLimiter;

    /**
     * 定义切点
     * 1、通过扫包切入
     * 2、带有指定注解切入
     */
    @Pointcut("@annotation(com.yang.aspect.LxRateLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("拦截到了{}方法", joinPoint.getSignature().getName());
        Object obj = null;
        //获取拦截的方法名
        Signature sig = joinPoint.getSignature();
        //获取拦截的方法名
        MethodSignature msig = (MethodSignature) sig;
        //返回被织入增加处理目标对象
        Object target = joinPoint.getTarget();
        //为了获取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //获取注解信息
        LxRateLimit annotation = currentMethod.getAnnotation(LxRateLimit.class);
        double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token
        TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
        String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略
        int timeOut = annotation.timeOut();

        //获取rateLimiter
        if(map.containsKey(functionName)){
            rateLimiter = map.get(functionName);
        }else {
            map.put(functionName, RateLimiter.create(limitNum));
            rateLimiter = map.get(functionName);
        }

       try {
            if (rateLimiter.tryAcquire(timeOut,timeUnit)) {
                //执行方法
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超载", joinPoint.getSignature().getName());
                return "方法限流超载";
            }
        } catch (Throwable throwable) {
            log.error("{}方法限流异常", joinPoint.getSignature().getName());
            throwable.printStackTrace();
            return "方法限流超载";
        }
        log.info("{}方法限流正常", joinPoint.getSignature().getName());
        return obj;
    }
}

但是RateLimiter只作用于一个jvm,而往往实际项目中会部署在多台机器,运行于多个jvm,这样的话我们的LxRateLimit 又要修改。
比如我们有20服务器,我们的需求是所有服务器总限流每秒在100个左右,这样的话,单个应用中LxRateLimit.limitNum就要改为100/20=5。这样就能保证。所有的服务器流量在100个左右(前提是nginx分发请求是平均的)。

当然分布式环境也可以用redis限流,且不用修改limitNum数量,不过这样增加了redis压力(当然完全不要用担心redis,单机redis最高可以每秒处理几十万请求,何况我们可以做集群)

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {

    //存储key的前缀
    String prefix() default "limit";

    //获取等待时间,redis最高每秒处理几十万请求
    int expire() default 1;

    //等待时间单位,毫秒
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

注意这里的策略有修改,不在是发放令牌,而是redis的过期时间,单位最好是毫秒。比如我们需求是1s保持500个pv,那么RedisLimit.expire=1000/500=2
以下是redisson实现


import com.yang.annotation.RedisLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {

    @Autowired
    private RedissonClient redisson;

    /**
     * 定义切点
     * 1、带有指定注解切入
     */
    @Pointcut("@annotation(com.yang.annotation.RedisLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("拦截到了{}方法", joinPoint.getSignature().getName());
        Object obj = null;
        // 得到类名
        String clazzName = joinPoint.getTarget().getClass().getName();
        //获取拦截的方法
        MethodSignature msig = (MethodSignature) joinPoint.getSignature();
        //返回被织入增加处理目标对象
        Object target = joinPoint.getTarget();
        //为了获取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //获取参数列表
        Object[] args = joinPoint.getArgs();
        //获取注解信息
        RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
        String prefix = annotation.prefix();
        int expire = annotation.expire();//获取时间单位
        TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
        String key = getKey(prefix,clazzName,msig.getName()); //注解限制的前缀+所在类+方法名区分不同的限流策略
        RLock lock = redisson.getLock(key);
        try {
            boolean res = lock.tryLock(0,expire,timeUnit);
            if (res) {
                //执行方法
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超载", msig.getName());
                return "方法限流超载";
            }
        } catch (Exception throwable) {
            log.error("{}方法限流异常", msig.getName());
            throwable.printStackTrace();
            return "方法限流超载";
        }
        log.info("{}方法限流正常", msig.getName());
        return obj;
    }

    /**
     *      * 根据类名、方法名和参数生成Key
     *      * @param clazzName
     *      * @param methodName
     *      * @return key格式:全类名|方法名|参数类型
     *
     */
    private String getKey( String prefix,String clazzName, String methodName) {
        StringBuilder key = new StringBuilder();
        key.append(prefix);
        key.append(clazzName);
        key.append(methodName);
        return key.toString();
    }

}

注意这里不要手动释放锁,等时间到了自动释放。
也可以用原生的redisTemplate 实现

import com.google.common.util.concurrent.RateLimiter;
import com.yang.annotation.RedisLimit;
import com.yang.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ResponseBody;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Aspect
@Component
@Slf4j
@Order(1)
public class RedisLimitAspect {

    @Autowired
    private RedisTemplate redisTemplate ;

    /**
     * 定义切点
     * 1、带有指定注解切入
     */
    @Pointcut("@annotation(com.yang.annotation.RedisLimit)")
    public void checkPointcut() { }

    @ResponseBody
    @Around(value = "checkPointcut()")
    public Object aroundNotice(ProceedingJoinPoint joinPoint) throws Throwable {
        log.info("拦截到了{}方法", joinPoint.getSignature().getName());
        Object obj = null;
        // 得到类名
        String clazzName = joinPoint.getTarget().getClass().getName();
        //获取拦截的方法
        MethodSignature msig = (MethodSignature) joinPoint.getSignature();
        //返回被织入增加处理目标对象
        Object target = joinPoint.getTarget();
        //为了获取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //获取参数列表
        Object[] args = joinPoint.getArgs();
        //获取注解信息
        RedisLimit annotation = currentMethod.getAnnotation(RedisLimit.class);
        String prefix = annotation.prefix();
        int expire = annotation.expire();//获取时间单位
        TimeUnit timeUnit = annotation.timeUnit();//获取时间单位
        String key = getKey(prefix,clazzName,msig.getName()); // 注解限制的前缀+所在类+方法名区分不同的限流策略
        String value = getValue(key,args);
        try {
            if (!redisTemplate.hasKey(key)) {
                //执行方法
                redisTemplate.opsForValue().set(key,value,expire,timeUnit);
                obj = joinPoint.proceed();
            } else {
                log.info("{}方法限流超载", msig.getName());
                return "方法限流超载";
            }
        } catch (Throwable throwable) {
            log.error("{}方法限流异常", msig.getName());
            throwable.printStackTrace();
        }
        log.info("{}方法限流正常", msig.getName());
        return obj;
    }

    /**
     *      * 根据类名、方法名和参数生成Key
     *      * @param clazzName
     *      * @param methodName
     *      * @return key格式:全类名|方法名|参数类型
     *
     */
    private String getKey( String prefix,String clazzName, String methodName) {
        StringBuilder key = new StringBuilder();
        key.append(prefix);
        key.append(clazzName);
        key.append(methodName);
        return key.toString();
    }

    /**
     *      * 根据类名、方法名和参数生成value
     *      * @param clazzName
     *      * @param methodName
     *      * @param args
     *      * @return key格式:全类名|方法名|参数类型
     *
     */
    private String getValue(String key, Object[] args) {
        StringBuilder value = new StringBuilder();
        value.append(key);
        value.append(Arrays.toString(args));
        return value.toString();
    }
}

https://www.xamrdz.com/backend/34y1932434.html

相关文章: