当前位置: 首页>编程语言>正文

dubbo 请求指定ip节点 dubbo一次请求处理流程

1. 前言

前面的文章分析了Consumer是如何发起RPC调用,以及请求对象Request是如何从客户端编码然后经过网络发送到服务端,服务端再解码的一个过程,接下来,开始分析服务端Provider是如何处理RPC调用请求的。
本文会从两个纬度去分析,首先是ChannelHandler,既然是处理网络请求,那么必然要处理网络IO事件,Provider从接收到字节序列的那一刻起,是如何将它们一步一步转化为Request对象并进行处理的。然后是Invoker,得到Request对象后Provider是如何执行本地调用,然后将方法结果响应给客户端的。
还记得在分析Consumer的时候,Dubbo生成的代理对象通过一层层Invoker包装,最终实现了一套复杂的功能。对于Provider而言,ChannelHandler和Invoker也是一样的,也是经过了层层包装,类的职责很清晰,就是源码看起来会有点晕。

2. ChannelHandler

ChannelHandler是Dubbo用来处理网络IO事件的接口,方法对应的功能如下:

方法

功能

connected

连接事件

disconnected

断开连接事件

sent

消息发送事件

received

消息接收事件

caught

异常事件

注意,这是Dubbo定义的接口,而非Netty提供的接口,不要搞混了!

例如,当Provider接收到消息时,就会触发received()方法。但是消息的处理是一项大工程,包括消息的解码、心跳处理、消息派发等等复杂逻辑,因此不会将所有的代码都写在一个ChannelHandler类里面,Dubbo采用装饰者模式,将一个个ChannelHandler一层包一层,最终来实现这套复杂的功能。
Dubbo默认使用Netty作为网络传输层框架,这里我们同样也以Netty为例分析。先来看看ChannelHandler的包装过程,之前的文章里就分析过,Provider在暴露服务时,会创建服务端ExchangeServer,代码如下:

ExchangeServer server = Exchangers.bind(url, requestHandler);

这个requestHandler是DubboProtocol的匿名内部类对象,它的职责是根据Invocation匹配到Invoker并调用,然后返回结果,它是最底层的ChannelHandler。
bind()方法代码如下,这里会经过HeaderExchangeHandler和DecodeHandler的包装。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters#bind()最终会创建NettyServer,它的构造函数会将ChannelHandler再做三层包装。

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), 
          ChannelHandlers.wrap(handler, url));
}

ChannelHandlers#wrap()方法,将ChannelHandler经过Dispatcher、HeartbeatHandler、MultiMessageHandler包装,后面会说它们的作用。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                        .getAdaptiveExtension().dispatch(handler, url)));
}

综上所述,最终ChannelHandler的包装结构如下,我们一个个分析。

AbstractPeer#received
>>MultiMessageHandler#received
>>>>HeartbeatHandler#received
>>>>>>AllChannelHandler#received
>>>>>>>>DecodeHandler#received
>>>>>>>>>>HeaderExchangeHandler#received
>>>>>>>>>>>>DubboProtocol.requestHandler#reply

2.1 AbstractPeer

AbstractPeer是NettyServer的父类,为啥会有它呢,是因为NettyServer在编排ChannelHandlerPipeline的时候,尾巴放的是NettyServerHandler,它依赖ChannelHandler,这个ChannelHandler正是NettyServer本身。

NettyServerHandler继承自ChannelDuplexHandler,这意味着它可以同时处理入站和出站事件,它仅仅是一个空壳,或者说它是一个适配器,它会在Channel有可读事件时,触发ChannelHandler#received()

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.received(channel, msg);
}

这个时候,就会触发AbstractPeer#received()了,父类的逻辑非常简单,确保Channel在没有关闭的情况下,转交给下一个Handler处理。

public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

2.2 MultiMessageHandler

ChannelHandler#received()方法被设计用来接收和处理单条消息的,但是前面在分析服务端解码的时候有说过,可能由于网络的原因,服务端会一次接收到多条消息,此时Dubbo会创建MultiMessage来存储,底层只是用List来存储多条消息。
从名字也可以看出来,该Handler具备处理多消息的能力,代码也很简单,如果是多消息,就循环处理,否则直接处理。

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            // 多消息,循环挨个处理
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message);
    }
}

2.3 HeartbeatHandler

依然可以从名字就可以看出来,该Handler专门用来处理心跳的。如果是心跳请求/响应,该类会直接处理掉,不走后续流程了。如果是RPC调用请求,交给下一个Handler处理。

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {// 心跳请求
        Request req = (Request) message;
        if (req.isTwoWay()) {// 期望得到回复
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(HEARTBEAT_EVENT);
            channel.send(res);// 直接构建Response发送
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                 + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                 + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    if (isHeartbeatResponse(message)) {// 心跳响应
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    handler.received(channel, message);
}

2.4 AllChannelHandler

该类的职责是将消息派发到具体的线程处理,这个线程可能是IO线程,也可能是业务线程。
Dubbo将底层通信框架中接收请求的线程称为IO线程,如果事件处理逻辑很简单,全是纯内存操作,那么可以考虑直接在IO线程直接执行,避免了线程切换的开销。但是如果事件处理逻辑复杂,涉及到数据库查询等操作,那么强烈建议派发到业务线程池执行,因为IO线程非常宝贵,一旦阻塞导致IO线程被占满,将不会接收新的请求了!

Dubbo支持多种线程派发策略,如下:

策略

说明

all

所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等

direct

所有消息都不派发到线程池,全部在 IO 线程上直接执行

message

只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行

execution

只有请求消息派发到线程池,其它消息均在 IO 线程上执行

connection

在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池

默认的策略是all,对应的类是AllChannelHandler,它会把消息派发到业务线程池执行。

public void received(Channel channel, Object message) throws RemotingException {
    // 获取线程池
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // 提交异步任务,处理消息
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {}
}

ChannelEventRunnable创建后会被提交到线程池等待调度执行,最终执行run()方法,如果state是接收消息,则会转交给下一个Handler处理。

public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
        }
    }
}

2.5 DecodeHandler

从名字可以看出来,它的职责是负责消息的解码。这里说解码可能不太准确,严格来讲应该是Body的反序列化。字节序列解码为Request对象是在IO线程上执行的,Dubbo允许配置,反序列化的过程是否也在IO线程上执行,默认是false,如果IO线程没有完成反序列化,那么该类会在业务线程上进行反序列化操作,然后再交给下一个Handler处理。

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    // 接收到的是请求
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    // 接收到的是响应
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}

2.6 HeaderExchangeHandler

该类会对消息做部分前置处理,对于请求对象Request,它会判断是否是事件消息、对端是否期望得到回复等等,然后调用对应的方法进行处理。以RPC请求为例,最终会调用handleRequest()方法。

public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    if (message instanceof Request) {
        // 处理请求
        Request request = (Request) message;
        if (request.isEvent()) {
            // 处理事件消息
            handlerEvent(channel, request);
        } else {
            if (request.isTwoWay()) {
                // 期望得到答复 需要响应结果
                handleRequest(exchangeChannel, request);
            } else {
                // 无需响应结果
                handler.received(exchangeChannel, request.getData());
            }
        }
    }
}

handleRequest()方法会创建Response对象,然后由最后一个ChannelHandler处理请求并返回结果,最终将Response响应给客户端。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    Object msg = req.getData();
    // 最后一个Handler处理请求,返回结果
    CompletionStage<Object> future = handler.reply(channel, msg);
    future.whenComplete((appResult, t) -> {
        try {
            if (t == null) {
                res.setStatus(Response.OK);
                res.setResult(appResult);
            } else {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            // 响应结果
            channel.send(res);
        } catch (RemotingException e) {
            logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
        }
    });
}

2.7 DubboProtocol内部类

DubboProtocol有一个ExchangeHandler匿名内部类对象,它专门用来处理dubbo协议的RPC调用请求,方法是reply(),逻辑也很简单,参数Invocation已经告诉我们要调用哪个Service的哪个方法了,我们只需要匹配到对应的Invoker,执行invoke本地调用,将结果返回即可。

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    Invocation inv = (Invocation) message;
    // 从exporterMap获取对应的Invoker
    Invoker<?> invoker = getInvoker(channel, inv);
    // 远程address
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 执行本地调用
    Result result = invoker.invoke(inv);
    // 返回结果
    return result.thenApply(Function.identity());
}

至此,所有的ChannelHandler流程全部结束。

3. Invoker

ChannelHandler最终根据请求参数Invocation匹配到Invoker,然后开始执行本地调用获取结果。Invoker也经过了一层层的封装,不过别担心,Dubbo大部分逻辑都是在客户端实现的,Provider侧的Invoker并不复杂。

Provider侧的Invoker包装结构总结如下:

ProtocolFilterWrapper 执行各种Filter...
>>DelegateProviderMetaDataInvoker
>>>>AbstractProxyInvoker
>>>>>>JavassistProxyFactory 代理对象

3.1 ProtocolFilterWrapper

该类是一个包装类,它会对Invoker对象包装一层过滤器链FilterChain,方法是buildInvokerChain(),利用SPI加载激活的所有Filter,然后串成一个单向链表,Invoker被放在末位,只有经过前面所有的Filter,才能执行最终的invoke方法。

依次执行完所有的Filter之后,最终会交给DelegateProviderMetaDataInvoker执行。

3.2 DelegateProviderMetaDataInvoker

它是一个纯粹的包装类,没有任何逻辑,只是在引用Invoker的基础上,还持有元数据服务对象。

public class DelegateProviderMetaDataInvoker<T> implements Invoker {
    protected final Invoker<T> invoker;
    private ServiceConfig<?> metadata;
    
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }
}

3.3 AbstractProxyInvoker

代理Invoker的父类,它会调用抽象方法doInvoke()发起本地调用,拿到结果后封装成AppResponse并返回

public Result invoke(Invocation invocation) throws RpcException {
    Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
    CompletableFuture<Object> future = wrapWithFuture(value);
    CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
        AppResponse result = new AppResponse();
        result.setValue(obj);
        return result;
    });
    return new AsyncRpcResult(appResponseFuture, invocation);
}

doInvoke()是子类实现的,本地调用的方式有两种,一种是理由Java的反射,另一种是字节码技术动态生成的类会进行直接的方法调用,性能上来讲,后者更好一些,Dubbo默认使用后者,代理类由JavassistProxyFactory类生成。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // 提升反射效率
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 动态生成的Wrapper对象,在字节码层面进行直接的方法调用
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

4. 总结

Provider处理Consumer的RPC调用请求,需要经过两个重要的类,分别是ChannelHandler和Invoker。
ChannelHandler用来处理网络IO事件,received()方法用于处理接收到的消息,接收消息是一个大工程,需要对消息做解码、多消息处理、消息派发等操作,不可能将所有代码都写在一个类里,所以Dubbo利用装饰者模式,将ChannelHandler经过一层层的包装,每个包装类各司其职,最终实现一整套复杂流程。针对RPC请求,ChannelHandler最终会根据Invocation参数匹配到Invoker,发起本地调用然后响应结果。
Invoker也经过了层层包装,但是逻辑并不复杂,核心的是ProtocolFilterWrapper用于执行Filter逻辑,最后就是根据代理对象执行本地调用了,方式有Java反射和字节码技术生成代理对象,字节码层面的直接方法调用。



https://www.xamrdz.com/lan/5sw1932610.html

相关文章: