Ribbon简介
- 负载均衡框架,支持可插拔式的负载均衡规则
- 支持多种协议,HTTP,TCP等
- 提供负载均衡客户端
Ribbon为了提供基本的负载均衡功能,提供了三大子模块: - Rule:规则
- Ping:心跳
- ServerList:服务列表
内置Rule
RoundRobinRule - 轮询
RandomRule - 随机
AvailabilityFilteringRule - 会先过滤掉由于多次访问故障而处于断路跳闸状态的服务,还有并发超过阈值的服务,对剩下的服务按照轮询方式进行访问。
WeightedResponseTimeRule - 根据平均响应时间计算所有服务的权重,响应时间越快服务权重越大被选中的几率越高,前期统计信息不足时先按轮询策略访问,后期会切换回来
RetryRule - 先按照轮询方式获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用服务。
BestAvaliableRule - 会先过滤掉由于多次访问故障而处于断路跳闸状态的服务,然后选择一个并发量小的服务。
ZoneAovidanceRule - 默认规则,复合判断server所在区域的可能性和server的可用性选择服务器。
使用
Ribbon可以脱离Eureka使用,但是需要人为提供Service对应的url去访问。
server:
port: 8080
ribbon:
eureka:
enabled: false
user-service:
ribbon:
listOfServers: http://user-8001.com:8001,http://user-8002.com:8002,http://user-8003.com:8003
实现原理
通过LoadBalancerClient来实现,而LoadBalancerClient具体交给ILoadBalancer来处理,通过配置的IRule,IPing等,向Eureka Client获取注册列表信息,没10秒向Eureka Client发送一次Ping,进而检查是否需要更新服务的注册列表信息,最后得到服务注册列表信息之后,根据IRule的策略进行负载均衡。
RestTemplate增加@LoadBalance注解后,对远程调用就能够做到负载均衡,通过添加一个拦截去,使用LoadBalancerClient去处理。
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
源码解析
根据上面所提到的原理,代码如下,配置的RestTemplate只要加上@LoadBalanced注解就能提供负载均衡的能力。
@Configuration
public class RibbonConfig {
@Bean
//在LoadBalancerAutoConfiguration 会通过autowired注解获取有@LoadBalanced的restTemplate
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
}
LoadBalancerAutoConfiguration
根据SpingBoot的经验,Ribbon肯定会提供一个Auto-Configuration类来加载所需要的bean。
如下代码可以看出,LoadBalancerAutoConfiguration的条件是在Classpath下存在RestTemplate以及BeanFactory中存在LoadBalancerClient的时候,自动装配LoadBalancerRequestFactory以及LoadBalancerInterceptor相关。
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//重要的地方,有@LoadBalanced注解的restTemplate才会被注入到这里
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
@Bean
@ConditionalOnMissingBean
//创建LoadBalancerRequestFactory,主要对HttpRequest进行转化成LoadBalancerRequest
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
//创建LoadBalancerInterceptor
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//给RestTemplate添加LoadBalancerInterceptor
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
//省略retry相关的代码
LoadBalancerInterceptor
在RestTemplate发送请求过程中,会通过LoadBalancerInterceptor对请求进行处理。
@RequestMapping("list")
public User list(){
return restTemplate.getForObject("http://user-service/simple/1", User.class);
}
restTemplate.getForObject()内部调用execute()方法。
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
ClientHttpResponse response = null;
try {
//execute()方法会调用createRequest()方法来创建HttpRequest
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
//然后调用request的execute()方法获得response返回
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
//根据URL和HttpMethod创建Request
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
//因为存在LoadBalancerInterceptor,所以这里getRequestFactory() 返回的是InterceptingClientHttpRequestFactory
//同样创建的request是InterceptingClientHttpRequest
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
//request.execute()就是InterceptingClientHttpRequest提供的
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
//这里调用LoadBalancerInteceptor
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
//LoadBalancerInteceptor的intercept()方法
//会通过LoadBalancerRequestFactory对request进行改造,然后调用RibbonLoadBalancerClient来执行改造之后的request
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
LoadBalancerRequestFactory
//这个方法在LoadBalancerInteceptor里调用,将HttpRequest封装成LoadBalancerRequest
//LoadBalancerRequest的作用
//将HttpRequest封装成ServiceRequestWrapper,然后调用LoadBalancerRequestTransformer将HttpRequest进行Transform
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
return execution.execute(serviceRequest, body);
};
}
//RibbonLoadBalancerClient的execute()方法
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//LoadBalancerClient内部就是使用ILoadBalancer来获取server,
// ILoadBalancer里面封装了IRule,allServerList以及所有的upServerList
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//ILoadBalancer提供的chooseServer()方法根据IRule获取需要访问的Server
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
//这里会调用Request的apply()方法,即之前Lambda的实现
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
//调用LoadBalancerRequestFactory.createRequest()方法提供的lambda表达式
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
//这里的lambda表达式就是上面调用的,内部就是对request进行transform,最后调用InterceptingRequestExecution来执行request获取response
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
return execution.execute(serviceRequest, body);
};
}
LoadBalancerClient.getLoadBalancer()
这个方法很重要,如何根据serviceId来获取到对应的loadBalancer以及对应的instance。
//根据serviceId获得ILoadBalancer
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
//根据serviceId获得ILoadBalancer
public ILoadBalancer getLoadBalancer(String name) {
return getInstance(name, ILoadBalancer.class);
}
//name是serviceId,type是ILoadBalancer类
public <C> C getInstance(String name, Class<C> type) {
//调用父类NamedContextFactory提供的getInstance方法
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
//同理去获得IClientConfig实例
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
//重点,这里是从applicationContext里面获取的
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
type).length > 0) {
return context.getBean(type);
}
return null;
}
//这里通过concurrentHashMap保存了serviceId与对应的applicationContext的关系
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
//根据serviceId创建对应的ApplicationContext
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
//根据serviceId创建对应的ApplicationContext
protected AnnotationConfigApplicationContext createContext(String name) {
//创建AnnotationConfigApplicationContext的applicationContext
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}