当前位置: 首页>后端>正文

springboot集成CCFlow springboot集成zuul

一、背景

这几天在做服务的高可用。

为了确保提供服务的某一台机器出现故障导致客户的请求不可用,我们需要对这台服务器做故障重试或者智能路由到下一个可用服务器。

为此,特地上网查了些资料,最后选用了ribbon+spring retry的重试策略。

 

从参考的技术文章中可以看出,故障重试的核心

1是引入spring retry的依赖



<dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
            <version>1.2.2.RELEASE</version>
        </dependency>



 

2是开启zuul和ribbon重试配置



zuul:
  retryable: true    #重试必配
ribbon:
  MaxAutoRetriesNextServer: 2     #更换服务实例次数
  MaxAutoRetries: 0        #当前服务重试次数
  OkToRetryOnAllOperations: true   #设置成false,只处理get请求故障



 

当然本文的目的并不止于此。

添加完了这些配置后,我们发现依然存在一些局限性。

1、当提供服务的集群机器实例小于MaxAutoRetriesNextServer时,只有采用轮询策略的负载可以正常使用。

2、当提供服务的集群机器实例大于MaxAutoRetriesNextServer时,采取轮询或者随机策略的负载偶尔可以正常使用。

而采用最小并发策略,或者单一负载(一般是为了解决session丢失问题,即同一个客户端发的请求固定访问某个服务器)则

完全不能正常工作。

      为什么这么说呢?比如我们有5台机器提供服务,第一台机器可以正常提供服务,第二台并发量最小。

      当第二到第五台服务器挂掉以后,采用轮询方式且MaxAutoRetriesNextServer=2。那么,ribbon会尝试访问第三台、第四台服务器。

      结果不言而喻。当然如果运气好,第三台或第四台服务器是可以用的,那就能正常提供服务。

      采用随机策略,同样要依靠运气。

      最小并发或单一策略的,则是不论重试几次则因为总是选择挂掉的第二个节点而完全失效。
那么,有什么解决办法呢?

 

二、动态设置MaxAutoRetriesNextServer

出现这些问题,一个关键是MaxAutoRetriesNextServer被写死了,而我们的提供server的数量又可能随着集群的负载情况增加(减少并不影响)。

总不能因为每次增加服务器数量就改一次MaxAutoRetriesNextServer配置吧?既然不想改配置,那当然就是动态设置MaxAutoRetriesNextServer的值啊。

 

翻看重试的源码 RibbonLoadBalancedRetryPolicy.java



@Override
    public boolean canRetryNextServer(LoadBalancedRetryContext context) {
        //this will be called after a failure occurs and we increment the counter
        //so we check that the count is less than or equals to too make sure
        //we try the next server the right number of times
        return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
    }



 

可以看出MaxAutoRetriesNextServer的值是从DefaultLoadBalancerRetryHandler里面获取的。但是DefaultLoadBalancerRetryHandler又不提供设置MaxAutoRetriesNextServer的接口。

往上追溯DefaultLoadBalancerRetryHandler实例化的源码



@Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }

    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }



发现DefaultLoadBalancerRetryHandler对象可以从RibbonLoadBalancerContext实例中获取, 而RibbonLoadBalancerContext却可以从SpringClientFactory获取,那么我们只要新建retryHandler并重新赋值给RibbonLoadBalancerContext就可以了。

 

代码:

1、将IClientConfig托管到spring上



@Bean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        return config;
    }



 

2、新建retryHandler并更新到RibbonLoadBalancerContext



private void setMaxAutoRetiresNextServer(int size) {  //size: 提供服务的集群数量
        SpringClientFactory factory = SpringContext.getBean(SpringClientFactory.class); //获取spring托管的单例对象
        IClientConfig clientConfig = SpringContext.getBean(IClientConfig.class);
        int retrySameServer = clientConfig.get(CommonClientConfigKey.MaxAutoRetries, 0);//获取配置文件中的值, 默认0
        boolean retryEnable = clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations, false);//默认false。
        RetryHandler retryHandler = new DefaultLoadBalancerRetryHandler(retrySameServer, size, retryEnable);//新建retryHandler
        factory.getLoadBalancerContext(name).setRetryHandler(retryHandler);
    }



MaxAutoRetriesNextServer动态设置的问题就解决了。

 

三、剔除不可用的服务。

Eureka好像有提供服务的剔除和恢复功能,所以如果有用Eureka注册中心,就不用往下看了。具体配置我也不太清楚。

因为我们没用到eureka,所以在故障重试的时候,获取到的服务列表里依然包含了挂掉的服务器。

这样会导致最小并发策略和单一策略的负载出现问题。

跟踪源码,我们发现服务器故障后会调用canRetryNextServer方法,那么不如就在这个方法里面做文章吧。

 

自定义RetryPolicy 继承RibbonLoadBalancedRetryPolicy并且重写canRetryNextServer



public class ServerRibbonLoadBalancedRetryPolicy extends RibbonLoadBalancedRetryPolicy {

    private RetryTrigger trigger;
    public ServerRibbonLoadBalancedRetryPolicy(String serviceId, RibbonLoadBalancerContext context, ServiceInstanceChooser loadBalanceChooser, IClientConfig clientConfig) {
        super(serviceId, context, loadBalanceChooser, clientConfig);
    }

    public void setTrigger(RetryTrigger trigger) {
        this.trigger = trigger;
    }

    @Override
    public boolean canRetryNextServer(LoadBalancedRetryContext context) {
        boolean retryEnable = super.canRetryNextServer(context);
        if (retryEnable && trigger != null) {
            //回调触发
            trigger.exec(context);
        }
        return retryEnable;
    }

    @FunctionalInterface
    public interface RetryTrigger {
        void exec(LoadBalancedRetryContext context);
    }
}



 

自定义RetryPolicyFactory继承RibbonLoadBalancedRetryPolicyFactory并重写create方法



public class ServerRibbonLoadBalancedRetryPolicyFactory extends RibbonLoadBalancedRetryPolicyFactory {
    private SpringClientFactory clientFactory;
    private ServerRibbonLoadBalancedRetryPolicy policy;
    private ServerRibbonLoadBalancedRetryPolicy.RetryTrigger trigger;

    public ServerRibbonLoadBalancedRetryPolicyFactory(SpringClientFactory clientFactory) {
        super(clientFactory);
        this.clientFactory = clientFactory;
    }

    @Override
    public LoadBalancedRetryPolicy create(String serviceId, ServiceInstanceChooser loadBalanceChooser) {
        RibbonLoadBalancerContext lbContext = this.clientFactory
                .getLoadBalancerContext(serviceId);
        policy = new ServerRibbonLoadBalancedRetryPolicy(serviceId, lbContext, loadBalanceChooser, clientFactory.getClientConfig(serviceId));
        policy.setTrigger(trigger);
        return policy;
    }

    public void setTrigger(ServerRibbonLoadBalancedRetryPolicy.RetryTrigger trigger) {
        policy.setTrigger(trigger);//跟上面是setTrigger不知道谁会先触发,所以两边都设置了。
        this.trigger = trigger;
    }
}



 

把LoadBalancedRetryPolicyFactory托管到spring



@Bean
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory(SpringClientFactory clientFactory) {
        return new ServerRibbonLoadBalancedRetryPolicyFactory(clientFactory);
    }



 

然后我们就可以在我们rule类上面实现RetryTrigger方法。



public class ServerLoadBalancerRule extends AbstractLoadBalancerRule implements ServerRibbonLoadBalancedRetryPolicy.RetryTrigger {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerLoadBalancerRule.class);
    /**
     * 不可用的服务器
     */
    private Map<String, List<String>> unreachableServer = new HashMap<>(256);
    /**
     * 上一次请求标记
     */
    private String lastRequest;

    @Autowired
    LoadBalancedRetryPolicyFactory policyFactory;

    @Override
    public Server choose(Object key) {
        //初始化重试触发器
        retryTrigger();
        return getServer(getLoadBalancer(), key);
    }

    private Server getServer(ILoadBalancer loadBalancer, Object key) {
    //从数据库获取服务列表
    List<ServerAddress> addressList = getServerAddress();
    setMaxAutoRetriesNextServer(addressList.size());

       //过滤不可用服务
    }

    private void retryTrigger() {
        RequestContext ctx = RequestContext.getCurrentContext();
        String batchNo = (String) ctx.get(Constant.REQUEST_BATCH_NO);
        if (!isLastRequest(batchNo)) {
            //不是同一次请求,清理所有缓存的不可用服务
            unreachableServer.clear();
        }

        if (policyFactory instanceof ServerRibbonLoadBalancedRetryPolicyFactory) {
            ((ServerRibbonLoadBalancedRetryPolicyFactory) policyFactory).setTrigger(this);
        }
    }

    private boolean isLastRequest(String batchNo) {
        return batchNo != null && batchNo.equals(lastRequest);
    }

    @Override
    public void exec(LoadBalancedRetryContext context) {
        RequestContext ctx = RequestContext.getCurrentContext();
     //UUID,故障重试不会发生变化。客户每次请求时会产生新的batchNo,可以在preFilter中生成。 
        String batchNo = (String) ctx.get(Constant.REQUEST_BATCH_NO);
        lastRequest = batchNo;

        List<String> hostAndPorts = unreachableServer.get((String) ctx.get(Constant.REQUEST_BATCH_NO));
        if (hostAndPorts == null) {
            hostAndPorts = new ArrayList<>();
        }
        if (context != null && context.getServiceInstance() != null) {
            String host = context.getServiceInstance().getHost();
            int port = context.getServiceInstance().getPort();
            if (!hostAndPorts.contains(host + Constant.COLON + port))
                hostAndPorts.add(host + Constant.COLON + port);
            unreachableServer.put((String) ctx.get(Constant.REQUEST_BATCH_NO), hostAndPorts);
        }
    }
}



 

这样,我们就拿到了不可用的服务了,然后在重试的时候过滤掉unreachableServer中的服务就可以了。

这里有一点要注意的是,MaxAutoRetriesNextServer的值必须是没有过滤的服务列表的大小。

 

当然,有人会有疑问,如果服务器数量过多,重试时间超过ReadTimeout怎么办?我这里也没关于超时的设置,因为本身让客户等待过久就不是很合理的需求

所以配置文件里面设置一个合理的ReadTimeout就好了,在这个时间段里面如果重试没取到可用的服务就直接抛超时的信息给客户。

源码地址: https://github.com/rxiu/study-on-road/tree/master/trickle-gateway


https://www.xamrdz.com/backend/3ka1934836.html

相关文章: