这一章说说基于Fegin的声明式调用请求是怎么个流程
首先我们从构建流程中知道,大体上来说是基于JDK的动态代理机制实现的,那么在JDK的动态代理中,对方法进行增强的类就是InvocationHandler,核心方法就是invoke(),在Fegin中就是FeignInvocationHandler
我们看看这个类
feign.ReflectiveFeign$FeignInvocationHandler
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//对Object原生方法做几个判断
if ("equals".equals(method.getName())) {
try {
Object
otherHandler =
args.length > 0 && args[0] != null Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
//从methodHandle中依据接口的Method获取SynchronousMethodHandler
return dispatch.get(method).invoke(args);
}
真正执行的Method是解析好的SynchronousMethodHandler,args是请求的方法参数,从这里看出,真正执行的是构建的
SynchronousMethodHandler
进入SynchronousMethodHandler#invoke方法中
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
//看名字就知道是执行并解码返回值
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
//真正执行方法的核心
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
//省略……………………
}
我们知道在targetToHandlersByName.apply(target)方法中将接口的注解参数使用SpringMvcContract解析生成MethodMetadata,其中就有template,在上面executeAndDecode方法,第一步是将实际的请求地址进行拼装,参数替换,最后形成的就像这样:
http://serverA/1?name=tom?age=12
最后形成一个包含服务名的请求路径,是不是很眼熟,和我们在ribbon中传入的URL很像
这里方法嵌套的很多,没办法,只有一个个分析
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
//获取请求URL 类似:http://serverA/1?name=tom?age=12
URI asUri = URI.create(request.url());
//获取服务名 类似:serverA
String clientName = asUri.getHost();
//替换掉服务名,类似:http:///1?name=tom?age=12
URI uriWithoutHost = cleanUrl(request.url(), clientName);
//这一步是将请求封装为RibbonRequest
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
//获取当前服务的请求参数
IClientConfig requestConfig = getClientConfig(options, clientName);
//核心方法 细讲
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
上面excute()方的核心lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse(),看第一个方法lbClient(clientName):
org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory#create
public FeignLoadBalancer create(String clientName) {
if (this.cache.containsKey(clientName)) {
return this.cache.get(clientName);
}
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
FeignLoadBalancer client = enableRetry new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
第一步是先去缓存中获取该服务对应的FeignLoadBalancer,如果没有进行创建,我们看看创建的流程
- 获取该服务对应的配置类
- 获取服务对应的ILoadBalancer,其实在Ribbon中提到的,这里默认的走的是ZoneAwareLoadBalancer,注意啊,在ZoneAwareLoadBalancer初始话的时候已经完成了和EurekaClient本地注册表的拉取,保存在allServerList(BaseLoadBalancer)中,并启动了定时调度任务,每30S进行一次全量更新。初始的话也是创建和服务对应的Ribbon上下文,从该上下文中获取该服务实例的
- 获取服务拦截器
- 是否配置重试机制,一般没有配置,走FeignLoadBalancer
好了,lbClient(clientName)返回的是一个FeignLoadBalancer,接着执行它的executeWithLoadBalancer方法
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//获取在每个服务实例重试的的次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
//最多尝试几个服务实例
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
//对于每个服务实例的调用逻辑
//默认field server是null,通过selectServer()方法获取一个Server
Observable<T> o =
(server == null selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
//对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
public Observable<T> call(Server server) {
//设置上下文
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
//每个Server包含重试逻辑的请求调用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
//增加Server正在处理的请求计数
loadBalancerContext.noteOpenConnection(stats);
//监听器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
//计时器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
//doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
//记录请求完成
recordStats(tracer, stats, entity, null);
}
@Override
public void onError(Throwable e) {
//记录请求结束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
//发生了错误,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
//因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
if (maxRetrysSame > 0)
//是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
if (maxRetrysNext > 0 && server == null)
//是否retry,如果retry回调用selectServer()返回下一个Server
o = o.retry(retryPolicy(maxRetrysNext, false));
//异常处理
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
//如果超过重试次数,则抛异常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
首先这个Observable使用的是Java的rx包下面的组件,服务的选取采用selectServer,这个就是
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
采用Ribbon的服务选取进行的,上面那一大坨主要是对服务调用包装一些重试策略
具体的重试讲解,后续开文再说
这submit()方法,请求最终还是会回调call(Server server)方法
@Override
public Observable<T> call(Server server) {
//将服务请求URL替换为真实URL,Ribbon中选取的服务
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//执行请求逻辑
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception e) {
return Observable.error(e);
}
}
@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
throws IOException {
Request.Options options;
if (configOverride != null) {
//配置请求参数
options = new Request.Options(
configOverride.get(CommonClientConfigKey.ConnectTimeout,
this.connectTimeout),
(configOverride.get(CommonClientConfigKey.ReadTimeout,
this.readTimeout)));
}
else {
options = new Request.Options(this.connectTimeout, this.readTimeout);
}
//真实请求
Response response = request.client().execute(request.toRequest(), options);
return new RibbonResponse(request.getUri(), response);
}
附上总流程图: