当前位置: 首页>后端>正文

Fegin-完整请求流程解析

这一章说说基于Fegin的声明式调用请求是怎么个流程

首先我们从构建流程中知道,大体上来说是基于JDK的动态代理机制实现的,那么在JDK的动态代理中,对方法进行增强的类就是InvocationHandler,核心方法就是invoke(),在Fegin中就是FeignInvocationHandler

我们看看这个类

feign.ReflectiveFeign$FeignInvocationHandler

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      //对Object原生方法做几个判断
      if ("equals".equals(method.getName())) {
        try {
          Object
              otherHandler =
              args.length > 0 && args[0] != null Proxy.getInvocationHandler(args[0]) : null;
          return equals(otherHandler);
        } catch (IllegalArgumentException e) {
          return false;
        }
      } else if ("hashCode".equals(method.getName())) {
        return hashCode();
      } else if ("toString".equals(method.getName())) {
        return toString();
      }
      //从methodHandle中依据接口的Method获取SynchronousMethodHandler
      return dispatch.get(method).invoke(args);
    }

真正执行的Method是解析好的SynchronousMethodHandler,args是请求的方法参数,从这里看出,真正执行的是构建的
SynchronousMethodHandler

进入SynchronousMethodHandler#invoke方法中

  @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        //看名字就知道是执行并解码返回值
        return executeAndDecode(template);
      } catch (RetryableException e) {
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      //真正执行方法的核心    
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    //省略……………………
}

我们知道在targetToHandlersByName.apply(target)方法中将接口的注解参数使用SpringMvcContract解析生成MethodMetadata,其中就有template,在上面executeAndDecode方法,第一步是将实际的请求地址进行拼装,参数替换,最后形成的就像这样:

http://serverA/1?name=tom?age=12

最后形成一个包含服务名的请求路径,是不是很眼熟,和我们在ribbon中传入的URL很像

这里方法嵌套的很多,没办法,只有一个个分析

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            //获取请求URL 类似:http://serverA/1?name=tom?age=12
            URI asUri = URI.create(request.url());
            //获取服务名 类似:serverA
            String clientName = asUri.getHost();
            //替换掉服务名,类似:http:///1?name=tom?age=12
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            //这一步是将请求封装为RibbonRequest
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);
            //获取当前服务的请求参数
            IClientConfig requestConfig = getClientConfig(options, clientName);
            //核心方法 细讲
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

上面excute()方的核心lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse(),看第一个方法lbClient(clientName):

org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory#create

    public FeignLoadBalancer create(String clientName) {
        if (this.cache.containsKey(clientName)) {
            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }

第一步是先去缓存中获取该服务对应的FeignLoadBalancer,如果没有进行创建,我们看看创建的流程

  • 获取该服务对应的配置类
  • 获取服务对应的ILoadBalancer,其实在Ribbon中提到的,这里默认的走的是ZoneAwareLoadBalancer,注意啊,在ZoneAwareLoadBalancer初始话的时候已经完成了和EurekaClient本地注册表的拉取,保存在allServerList(BaseLoadBalancer)中,并启动了定时调度任务,每30S进行一次全量更新。初始的话也是创建和服务对应的Ribbon上下文,从该上下文中获取该服务实例的
  • 获取服务拦截器
  • 是否配置重试机制,一般没有配置,走FeignLoadBalancer

好了,lbClient(clientName)返回的是一个FeignLoadBalancer,接着执行它的executeWithLoadBalancer方法

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //获取在每个服务实例重试的的次数
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多尝试几个服务实例
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    //对于每个服务实例的调用逻辑
    //默认field server是null,通过selectServer()方法获取一个Server
    Observable<T> o = 
            (server == null selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
                public Observable<T> call(Server server) {
                    //设置上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //每个Server包含重试逻辑的请求调用
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在处理的请求计数
                                    loadBalancerContext.noteOpenConnection(stats);

                                    //监听器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    //计时器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
                                    //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //记录请求完成
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            //记录请求结束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //发生了错误,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回调用selectServer()返回下一个Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    //异常处理
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超过重试次数,则抛异常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

首先这个Observable使用的是Java的rx包下面的组件,服务的选取采用selectServer,这个就是

Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   

采用Ribbon的服务选取进行的,上面那一大坨主要是对服务调用包装一些重试策略

具体的重试讲解,后续开文再说

这submit()方法,请求最终还是会回调call(Server server)方法

@Override
public Observable<T> call(Server server) {
   //将服务请求URL替换为真实URL,Ribbon中选取的服务    
   URI finalUri = reconstructURIWithServer(server, request.getUri());
    S requestForServer = (S) request.replaceUri(finalUri);
    try {
        //执行请求逻辑
         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
        } catch (Exception e) {
          return Observable.error(e);
        }
  }
    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            //配置请求参数
            options = new Request.Options(
                    configOverride.get(CommonClientConfigKey.ConnectTimeout,
                            this.connectTimeout),
                    (configOverride.get(CommonClientConfigKey.ReadTimeout,
                            this.readTimeout)));
        }
        else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        //真实请求
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }

附上总流程图:


Fegin-完整请求流程解析,第1张
Fegin-完整Http请求流程.png

https://www.xamrdz.com/backend/3vm1938428.html

相关文章: