当前位置: 首页>后端>正文

8) Ribbon客户端负载均衡

Ribbon简介

  • 负载均衡框架,支持可插拔式的负载均衡规则
  • 支持多种协议,HTTP,TCP等
  • 提供负载均衡客户端
    Ribbon为了提供基本的负载均衡功能,提供了三大子模块:
  • Rule:规则
  • Ping:心跳
  • ServerList:服务列表

内置Rule

RoundRobinRule - 轮询
RandomRule - 随机
AvailabilityFilteringRule - 会先过滤掉由于多次访问故障而处于断路跳闸状态的服务,还有并发超过阈值的服务,对剩下的服务按照轮询方式进行访问。
WeightedResponseTimeRule - 根据平均响应时间计算所有服务的权重,响应时间越快服务权重越大被选中的几率越高,前期统计信息不足时先按轮询策略访问,后期会切换回来
RetryRule - 先按照轮询方式获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用服务。
BestAvaliableRule - 会先过滤掉由于多次访问故障而处于断路跳闸状态的服务,然后选择一个并发量小的服务。
ZoneAovidanceRule - 默认规则,复合判断server所在区域的可能性和server的可用性选择服务器。

使用

Ribbon可以脱离Eureka使用,但是需要人为提供Service对应的url去访问。

server:
  port: 8080
ribbon:
  eureka:
   enabled: false
user-service:
  ribbon:
    listOfServers: http://user-8001.com:8001,http://user-8002.com:8002,http://user-8003.com:8003
实现原理

通过LoadBalancerClient来实现,而LoadBalancerClient具体交给ILoadBalancer来处理,通过配置的IRule,IPing等,向Eureka Client获取注册列表信息,没10秒向Eureka Client发送一次Ping,进而检查是否需要更新服务的注册列表信息,最后得到服务注册列表信息之后,根据IRule的策略进行负载均衡。
RestTemplate增加@LoadBalance注解后,对远程调用就能够做到负载均衡,通过添加一个拦截去,使用LoadBalancerClient去处理。

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
        final LoadBalancerInterceptor loadBalancerInterceptor) {
    return restTemplate -> {
        List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                restTemplate.getInterceptors());
        list.add(loadBalancerInterceptor);
        restTemplate.setInterceptors(list);
    };
}

源码解析

根据上面所提到的原理,代码如下,配置的RestTemplate只要加上@LoadBalanced注解就能提供负载均衡的能力。

@Configuration
public class RibbonConfig {
    @Bean
    //在LoadBalancerAutoConfiguration 会通过autowired注解获取有@LoadBalanced的restTemplate
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }
}
LoadBalancerAutoConfiguration

根据SpingBoot的经验,Ribbon肯定会提供一个Auto-Configuration类来加载所需要的bean。
如下代码可以看出,LoadBalancerAutoConfiguration的条件是在Classpath下存在RestTemplate以及BeanFactory中存在LoadBalancerClient的时候,自动装配LoadBalancerRequestFactory以及LoadBalancerInterceptor相关。

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
    //重要的地方,有@LoadBalanced注解的restTemplate才会被注入到这里
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();


    @Bean
    @ConditionalOnMissingBean
    //创建LoadBalancerRequestFactory,主要对HttpRequest进行转化成LoadBalancerRequest
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        //创建LoadBalancerInterceptor
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        //给RestTemplate添加LoadBalancerInterceptor
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }
}
//省略retry相关的代码
LoadBalancerInterceptor

在RestTemplate发送请求过程中,会通过LoadBalancerInterceptor对请求进行处理。

@RequestMapping("list")
public User list(){
    return restTemplate.getForObject("http://user-service/simple/1", User.class);
}

restTemplate.getForObject()内部调用execute()方法。

@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
        @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

    ClientHttpResponse response = null;
    try {
        //execute()方法会调用createRequest()方法来创建HttpRequest
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        //然后调用request的execute()方法获得response返回
        response = request.execute();
        handleResponse(url, method, response);
        return (responseExtractor != null responseExtractor.extractData(response) : null);
    }
    catch (IOException ex) {
        String resource = url.toString();
        String query = url.getRawQuery();
        resource = (query != null resource.substring(0, resource.indexOf('?')) : resource);
        throw new ResourceAccessException("I/O error on " + method.name() +
                " request for \"" + resource + "\": " + ex.getMessage(), ex);
    }
    finally {
        if (response != null) {
            response.close();
        }
    }
}
//根据URL和HttpMethod创建Request
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    //因为存在LoadBalancerInterceptor,所以这里getRequestFactory() 返回的是InterceptingClientHttpRequestFactory
    //同样创建的request是InterceptingClientHttpRequest
    ClientHttpRequest request = getRequestFactory().createRequest(url, method);
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP " + method.name() + " " + url);
    }
    return request;
}
//request.execute()就是InterceptingClientHttpRequest提供的
private class InterceptingRequestExecution implements ClientHttpRequestExecution {

    private final Iterator<ClientHttpRequestInterceptor> iterator;

    public InterceptingRequestExecution() {
        this.iterator = interceptors.iterator();
    }

    @Override
    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            //这里调用LoadBalancerInteceptor
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            return nextInterceptor.intercept(request, body, this);
        }
        else {
            HttpMethod method = request.getMethod();
            Assert.state(method != null, "No standard HTTP method");
            ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
            request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
            if (body.length > 0) {
                if (delegate instanceof StreamingHttpOutputMessage) {
                    StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                    streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                }
                else {
                    StreamUtils.copy(body, delegate.getBody());
                }
            }
            return delegate.execute();
        }
    }
}
//LoadBalancerInteceptor的intercept()方法
//会通过LoadBalancerRequestFactory对request进行改造,然后调用RibbonLoadBalancerClient来执行改造之后的request
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null,
            "Request URI does not contain a valid hostname: " + originalUri);
    return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
}
LoadBalancerRequestFactory
//这个方法在LoadBalancerInteceptor里调用,将HttpRequest封装成LoadBalancerRequest
//LoadBalancerRequest的作用
//将HttpRequest封装成ServiceRequestWrapper,然后调用LoadBalancerRequestTransformer将HttpRequest进行Transform
public LoadBalancerRequest<ClientHttpResponse> createRequest(
        final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) {
    return instance -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest,
                        instance);
            }
        }
        return execution.execute(serviceRequest, body);
    };
}
//RibbonLoadBalancerClient的execute()方法
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    //LoadBalancerClient内部就是使用ILoadBalancer来获取server,
    //    ILoadBalancer里面封装了IRule,allServerList以及所有的upServerList
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    //ILoadBalancer提供的chooseServer()方法根据IRule获取需要访问的Server
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
            isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}
//这里会调用Request的apply()方法,即之前Lambda的实现
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
        LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
            .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        //调用LoadBalancerRequestFactory.createRequest()方法提供的lambda表达式
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {
        statsRecorder.recordStats(ex);
        throw ex;
    }
    catch (Exception ex) {
        statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}
//这里的lambda表达式就是上面调用的,内部就是对request进行transform,最后调用InterceptingRequestExecution来执行request获取response
public LoadBalancerRequest<ClientHttpResponse> createRequest(
        final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) {
    return instance -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest,
                        instance);
            }
        }
        return execution.execute(serviceRequest, body);
    };
}

LoadBalancerClient.getLoadBalancer()

这个方法很重要,如何根据serviceId来获取到对应的loadBalancer以及对应的instance。

//根据serviceId获得ILoadBalancer
protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}
//根据serviceId获得ILoadBalancer
public ILoadBalancer getLoadBalancer(String name) {
    return getInstance(name, ILoadBalancer.class);
}   
//name是serviceId,type是ILoadBalancer类
public <C> C getInstance(String name, Class<C> type) {
    //调用父类NamedContextFactory提供的getInstance方法
    C instance = super.getInstance(name, type);
    if (instance != null) {
        return instance;
    }
    //同理去获得IClientConfig实例
    IClientConfig config = getInstance(name, IClientConfig.class);
    return instantiateWithConfig(getContext(name), type, config);
}
//重点,这里是从applicationContext里面获取的
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = getContext(name);
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
            type).length > 0) {
        return context.getBean(type);
    }
    return null;
}
//这里通过concurrentHashMap保存了serviceId与对应的applicationContext的关系
protected AnnotationConfigApplicationContext getContext(String name) {
    if (!this.contexts.containsKey(name)) {
        synchronized (this.contexts) {
            if (!this.contexts.containsKey(name)) {
                //根据serviceId创建对应的ApplicationContext
                this.contexts.put(name, createContext(name));
            }
        }
    }
    return this.contexts.get(name);
}
//根据serviceId创建对应的ApplicationContext
protected AnnotationConfigApplicationContext createContext(String name) {
    //创建AnnotationConfigApplicationContext的applicationContext
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    if (this.configurations.containsKey(name)) {
        for (Class<?> configuration : this.configurations.get(name)
                .getConfiguration()) {
            context.register(configuration);
        }
    }
    for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
        if (entry.getKey().startsWith("default.")) {
            for (Class<?> configuration : entry.getValue().getConfiguration()) {
                context.register(configuration);
            }
        }
    }
    context.register(PropertyPlaceholderAutoConfiguration.class,
            this.defaultConfigType);
    context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
            this.propertySourceName,
            Collections.<String, Object>singletonMap(this.propertyName, name)));
    if (this.parent != null) {
        // Uses Environment from parent as well as beans
        context.setParent(this.parent);
        // jdk11 issue
        // https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
        context.setClassLoader(this.parent.getClassLoader());
    }
    context.setDisplayName(generateDisplayName(name));
    context.refresh();
    return context;
}
8) Ribbon客户端负载均衡,第1张
beans.png

https://www.xamrdz.com/backend/3en1935313.html

相关文章: