当前位置: 首页>后端>正文

Ribbon源码

在Spring Cloud中Ribbon具体做了哪些东西?

  1. 负载均衡 chooseServer
  2. 服务名更改为ip:port的形式

查探源码时 需要结合Spring Boot的特性,以启动使用两个阶段来看Ribbon具体做了什么!

启动时

不论是Spring Cloud Starter还是Spring Boot Starter 都有一个AutoConfiguration,因此 查探Ribbon时 顾名思义 必然有一个类叫做RibbonAutoConfiguration,启动时就以此类为切入点,开始源码的学习。

RibbonAutoConfiguration

@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(
        name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
        AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
        ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
    ....
    
    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }
    
    ....
}

这里面就是给Spring的IOC容器注入了一些Bean,这些Bean 在后面的使用过程中再来看。这里主要观察一下这个Configuration上标注的注解。其中:

  • @AutoConfigureAfter 这仅仅表示一个顺序 即使没有Eureka也没有关系
  • @RibbonClients好像是注入了一个RibbonClientSpecification
  • LoadBalancerAutoConfigurationAsyncLoadBalancerAutoConfiguration 另外两个AutoConfiguration 且这两个类在RibbonAutoConfiguration之后加载

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
// 这个在业务代码中会实例化这个 且会标注@LoadBalanced
@ConditionalOnClass(RestTemplate.class)
// 在 RibbonAutoConfiguration 默认会有 RibbonLoadBalancerClient
@ConditionalOnBean(LoadBalancerClient.class)
// 重试的配置 暂时没有用到过
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    // 这里只注入标注了@LoadBalanced的RestTemplate
    @@LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
  
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
      
        // 这里给RestTemplate添加了拦截器 LoadBalancerInterceptor
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }
  
}

总结一下 这个AutoConfiguration做了什么:

  1. 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
  2. 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadbalancerInterceptor
  3. 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。

这里着重记一下LoadBalancerInterceptor这个拦截器。

使用时

使用Ribbon时首先需要声明一个RestTemplate的bean,且标注上注解@LoadBalanced,如下:

@Bean
@LoadBalanced
public RestTemplate restTemplate() {

    return new RestTemplate();
}

在进行远程调用时,采用restTemplate.getForObject("http://producer-url", String.class)进行通信。RestTemplate的通信其实就是类似于HttpClient的一种调用方式,为什么可以解析服务名这种url呢?

关于@LoadBalanced

// Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
// 该注解用来给RestTemplate标记,以使用负载均衡的客户端(LoadBalancerClient)来配置它。
@Qualifier
public @interface LoadBalanced {

}

到这里 有两个点:

  1. @Qualifier这个注解表明了被标注了LoadBalanced的RestTemplate会被特殊处理 这里呼应上了org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration#restTemplates
  2. LoadBalancerClient会配置一些不同的内容

LoadBalancerClient

这是一个接口,实现类呢只有BlockingLoadBalancerClientRibbonLoadBalancerClient。由于是在看Ribbon相关的内容,先忽略BlockingLoadBalancerClient,记下RibbonLoadBalancerClient看后面什么地方用。

/**
 * 父接口ServiceInstanceChooser的方法
 * 
 * 根据传入的服务名serviceId,从负载均衡器中挑选一个对应服务的实例。
 */
ServiceInstance choose(String serviceId);

/**
 * 使用从负载均衡器中挑选出的服务实例来执行请求内容
 */
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

/**
 * 在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的host:port形式)进行请求,比如http://myservice/path/to/service。
 *
 * 在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host的URI
 * 而返回的URI内容则是通过ServiceInstance的服务实例详情拼接host:port形式的请求地址
 */
URI reconstructURI(ServiceInstance instance, URI original);

RestTemplate远程通信

不论是GET、POST、DELETE 最终方法都调用到了org.springframework.web.client.RestTemplate#doExecute

@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, 
        @Nullable RequestCallback requestCallback,
        @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

    Assert.notNull(url, "URI is required");
    Assert.notNull(method, "HttpMethod is required");
    ClientHttpResponse response = null;
    try {
        // 这里先不看
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        // 重点在这里 一层层点进去
        response = request.execute();
        handleResponse(url, method, response);
        return (responseExtractor != null responseExtractor.extractData(response) : null);
    } catch (IOException ex) {
        String resource = url.toString();
        String query = url.getRawQuery();
        resource = (query != null resource.substring(0, resource.indexOf('?')) : resource);
        throw new ResourceAccessException("I/O error on " + method.name() +
                " request for \"" + resource + "\": " + ex.getMessage(), ex);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

一直会到这里:org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // 如果是标注了LoadBalanced注解的RestTemplate时 必然会走这里
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        //nextInterceptor=org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor
        // 这里对应上了 LoadBalancerInterceptorConfig的配置
        return nextInterceptor.intercept(request, body, this);
    } else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            } else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

LoadBalancerInterceptor

当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。

这里就用到了LoadBalancerClient 这就可以理解@LoadBalanced的注释是啥意思了。简单说一下:由于标注了@LoadBalanced的RestTemplate会被添加一个拦截器LoadBalancerInterceptor,通信时才会被这个拦截器拦截,因此走到了intercept方法,所以就会到LoadBalancerClient的execute方法。

从上面过来 直接就该进入到org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor#intercept

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
    // 获取请求的地址 这里会是服务名
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // loadBalancer = org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient
    return this.loadBalancer.execute(serviceName,
                        this.requestFactory.createRequest(request, body, execution));
}

RibbonLoadBalancerClient

org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    /**
     * 创建loadBalancer的过程可以理解为组装选取服务的规则(IRule)、
     * 服务集群的列表(ServerList)、检验服务是否存活(IPing)等特性
     * 的过程(加载RibbonClientConfiguration这个配置类),需要注意
     * 的是这个过程并不是在启动时进行的,而是当有请求到来时才会处理。
     */
    // loadBalancer = com.netflix.loadbalancer.ZoneAwareLoadBalancer
    // 这里用到了 NamedContextFactory
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
  
    // 这个方法等于 loadBalancer.chooseServer(hint != null hint : "default") 这里是Spring Cloud到Netflix的源码跳转处。
    // 最终到 com.netflix.loadbalancer.ZoneAwareLoadBalancer#chooseServer
    // 这里可以看出并没有使用LoadBalancerClient接口中的choose函数,而是使用了ribbon自身的ILoadBalancer接口中定义的chooseServer函数
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
            isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

ZoneAwareLoadBalancer

public Server chooseServer(Object key) {
    // 先以Zone判断 如果仅仅有一个Zone可用 那就走这里
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        // com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

BaseLoadBalancer

由上面而来, !ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1

  1. private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory.getInstance().getBooleanProperty("ZoneAwareNIWSDiscoveryLoadBalancer.enabled", true);

ENABLED 默认情况下为true

  1. 可用Zone在一般情况下也是1个

因此这个条件总体为true 如果是true 则在这个类里面

看下面这段代码 不做特殊配置的情况下默认的负载均衡策略应该是RoundRobinRule,但是,但是,但是 如果开发人员没有特别指定IRule时 在RibbonClientConfiguration中却实实在在给注入了一个IRule 很遗憾 并不是RoundRobinRule。这里时额外需要注意的地方。

private final static IRule DEFAULT_RULE = new RoundRobinRule();

protected IRule rule = DEFAULT_RULE;

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

关于默认的负载均衡策略

直接说结论:默认的负载均衡策略是 ZoneAvoidanceRule。里面的实现是轮询的方式。

以下代码片段在org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration里面。

@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
  if (this.propertiesFactory.isSet(IRule.class, name)) {
    return this.propertiesFactory.get(IRule.class, config, name);
  }
  ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
  rule.initWithNiwsConfig(config);
  return rule;
}

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList,                                           ServerListFilter<Server> serverListFilter, IRule rule,                                         IPing ping, ServerListUpdater serverListUpdater) {
  
  if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
    return this.propertiesFactory.get(ILoadBalancer.class, config, name);
  }
  
  return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                     serverListFilter, serverListUpdater);
}

静态化配置

在不使用任何注册中心的情况下,Ribbon支持手动配置服务的URL,如下:

[server-name].ribbon.listOfServers=localhost:8080,localhost:8081,localhost:8082

其中 server-name是服务名 也就是远程调用时使用的服务名。

静态获取

ILoadBalancer

public interface ILoadBalancer {

    /**
     * 向负载均衡器中维护的实例列表增加服务实例
     */
    public void addServers(List<Server> newServers);
    
    /**
     * 通过某种策略,从负载均衡器中挑选出一个具体实例
     */
    public Server chooseServer(Object key);
    
    /**
     * 用来通知和标识负载均衡器中某个实例已经停止服务
     *
     * 不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的
     */
    public void markServerDown(Server server);
    
    /**
     * 获取当前正常服务的实例列表
     */
    public List<Server> getReachableServers();

    /**
     * 获取所有已知的服务实例列表,包括正常服务和停止服务的实例
     */
    public List<Server> getAllServers();
}

com.netflix.loadbalancer.DynamicServerListLoadBalancer#restOfInit

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    // 开启一个定时任务 更新 最终还是执行updateListOfServers()
    enableAndInitLearnNewServersFeature();

    // 更新服务列表
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    // 这个方法有调用 com.netflix.loadbalancer.BaseLoadBalancer#forceQuickPing 处理心跳
    updateAllServerList(servers);
}

结论

更新策略 ServerListUpdater实现
基于定时任务的拉取服务列表 com.netflix.loadbalancer.PollingServerListUpdater
基于Eureka服务事件通知的方式更新 com.netflix.loadbalancer.EurekaNotificationServerListUpdater

未完部分

getLoadBalancerStats

Zone stats: {unknown=[Zone:unknown; Instance count:3;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},
Server stats: [
[Server:localhost:8887; Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
, [Server:localhost:8886;   Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
, [Server:localhost:8889;   Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]

https://www.xamrdz.com/backend/3b41942764.html

相关文章: