1. 前言
前面的文章分析了Consumer是如何发起RPC调用,以及请求对象Request是如何从客户端编码然后经过网络发送到服务端,服务端再解码的一个过程,接下来,开始分析服务端Provider是如何处理RPC调用请求的。
本文会从两个纬度去分析,首先是ChannelHandler,既然是处理网络请求,那么必然要处理网络IO事件,Provider从接收到字节序列的那一刻起,是如何将它们一步一步转化为Request对象并进行处理的。然后是Invoker,得到Request对象后Provider是如何执行本地调用,然后将方法结果响应给客户端的。
还记得在分析Consumer的时候,Dubbo生成的代理对象通过一层层Invoker包装,最终实现了一套复杂的功能。对于Provider而言,ChannelHandler和Invoker也是一样的,也是经过了层层包装,类的职责很清晰,就是源码看起来会有点晕。
2. ChannelHandler
ChannelHandler是Dubbo用来处理网络IO事件的接口,方法对应的功能如下:
方法 | 功能 |
connected | 连接事件 |
disconnected | 断开连接事件 |
sent | 消息发送事件 |
received | 消息接收事件 |
caught | 异常事件 |
注意,这是Dubbo定义的接口,而非Netty提供的接口,不要搞混了!
例如,当Provider接收到消息时,就会触发received()
方法。但是消息的处理是一项大工程,包括消息的解码、心跳处理、消息派发等等复杂逻辑,因此不会将所有的代码都写在一个ChannelHandler类里面,Dubbo采用装饰者模式,将一个个ChannelHandler一层包一层,最终来实现这套复杂的功能。
Dubbo默认使用Netty作为网络传输层框架,这里我们同样也以Netty为例分析。先来看看ChannelHandler的包装过程,之前的文章里就分析过,Provider在暴露服务时,会创建服务端ExchangeServer,代码如下:
ExchangeServer server = Exchangers.bind(url, requestHandler);
这个requestHandler是DubboProtocol的匿名内部类对象,它的职责是根据Invocation匹配到Invoker并调用,然后返回结果,它是最底层的ChannelHandler。bind()
方法代码如下,这里会经过HeaderExchangeHandler和DecodeHandler的包装。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporters#bind()
最终会创建NettyServer,它的构造函数会将ChannelHandler再做三层包装。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME),
ChannelHandlers.wrap(handler, url));
}
ChannelHandlers#wrap()
方法,将ChannelHandler经过Dispatcher、HeartbeatHandler、MultiMessageHandler包装,后面会说它们的作用。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
综上所述,最终ChannelHandler的包装结构如下,我们一个个分析。
AbstractPeer#received
>>MultiMessageHandler#received
>>>>HeartbeatHandler#received
>>>>>>AllChannelHandler#received
>>>>>>>>DecodeHandler#received
>>>>>>>>>>HeaderExchangeHandler#received
>>>>>>>>>>>>DubboProtocol.requestHandler#reply
2.1 AbstractPeer
AbstractPeer是NettyServer的父类,为啥会有它呢,是因为NettyServer在编排ChannelHandlerPipeline的时候,尾巴放的是NettyServerHandler,它依赖ChannelHandler,这个ChannelHandler正是NettyServer本身。
NettyServerHandler继承自ChannelDuplexHandler,这意味着它可以同时处理入站和出站事件,它仅仅是一个空壳,或者说它是一个适配器,它会在Channel有可读事件时,触发ChannelHandler#received()
。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
这个时候,就会触发AbstractPeer#received()
了,父类的逻辑非常简单,确保Channel在没有关闭的情况下,转交给下一个Handler处理。
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
2.2 MultiMessageHandler
ChannelHandler#received()
方法被设计用来接收和处理单条消息的,但是前面在分析服务端解码的时候有说过,可能由于网络的原因,服务端会一次接收到多条消息,此时Dubbo会创建MultiMessage来存储,底层只是用List来存储多条消息。
从名字也可以看出来,该Handler具备处理多消息的能力,代码也很简单,如果是多消息,就循环处理,否则直接处理。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
// 多消息,循环挨个处理
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
2.3 HeartbeatHandler
依然可以从名字就可以看出来,该Handler专门用来处理心跳的。如果是心跳请求/响应,该类会直接处理掉,不走后续流程了。如果是RPC调用请求,交给下一个Handler处理。
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {// 心跳请求
Request req = (Request) message;
if (req.isTwoWay()) {// 期望得到回复
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
channel.send(res);// 直接构建Response发送
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {// 心跳响应
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
handler.received(channel, message);
}
2.4 AllChannelHandler
该类的职责是将消息派发到具体的线程处理,这个线程可能是IO线程,也可能是业务线程。
Dubbo将底层通信框架中接收请求的线程称为IO线程,如果事件处理逻辑很简单,全是纯内存操作,那么可以考虑直接在IO线程直接执行,避免了线程切换的开销。但是如果事件处理逻辑复杂,涉及到数据库查询等操作,那么强烈建议派发到业务线程池执行,因为IO线程非常宝贵,一旦阻塞导致IO线程被占满,将不会接收新的请求了!
Dubbo支持多种线程派发策略,如下:
策略 | 说明 |
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,其它消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
默认的策略是all
,对应的类是AllChannelHandler,它会把消息派发到业务线程池执行。
public void received(Channel channel, Object message) throws RemotingException {
// 获取线程池
ExecutorService executor = getPreferredExecutorService(message);
try {
// 提交异步任务,处理消息
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
ChannelEventRunnable创建后会被提交到线程池等待调度执行,最终执行run()
方法,如果state是接收消息,则会转交给下一个Handler处理。
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
}
}
2.5 DecodeHandler
从名字可以看出来,它的职责是负责消息的解码。这里说解码可能不太准确,严格来讲应该是Body的反序列化。字节序列解码为Request对象是在IO线程上执行的,Dubbo允许配置,反序列化的过程是否也在IO线程上执行,默认是false,如果IO线程没有完成反序列化,那么该类会在业务线程上进行反序列化操作,然后再交给下一个Handler处理。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
// 接收到的是请求
if (message instanceof Request) {
decode(((Request) message).getData());
}
// 接收到的是响应
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
2.6 HeaderExchangeHandler
该类会对消息做部分前置处理,对于请求对象Request,它会判断是否是事件消息、对端是否期望得到回复等等,然后调用对应的方法进行处理。以RPC请求为例,最终会调用handleRequest()
方法。
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// 处理请求
Request request = (Request) message;
if (request.isEvent()) {
// 处理事件消息
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// 期望得到答复 需要响应结果
handleRequest(exchangeChannel, request);
} else {
// 无需响应结果
handler.received(exchangeChannel, request.getData());
}
}
}
}
handleRequest()
方法会创建Response对象,然后由最后一个ChannelHandler处理请求并返回结果,最终将Response响应给客户端。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
// 最后一个Handler处理请求,返回结果
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 响应结果
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
}
2.7 DubboProtocol内部类
DubboProtocol有一个ExchangeHandler匿名内部类对象,它专门用来处理dubbo协议的RPC调用请求,方法是reply()
,逻辑也很简单,参数Invocation已经告诉我们要调用哪个Service的哪个方法了,我们只需要匹配到对应的Invoker,执行invoke本地调用,将结果返回即可。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
// 从exporterMap获取对应的Invoker
Invoker<?> invoker = getInvoker(channel, inv);
// 远程address
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行本地调用
Result result = invoker.invoke(inv);
// 返回结果
return result.thenApply(Function.identity());
}
至此,所有的ChannelHandler流程全部结束。
3. Invoker
ChannelHandler最终根据请求参数Invocation匹配到Invoker,然后开始执行本地调用获取结果。Invoker也经过了一层层的封装,不过别担心,Dubbo大部分逻辑都是在客户端实现的,Provider侧的Invoker并不复杂。
Provider侧的Invoker包装结构总结如下:
ProtocolFilterWrapper 执行各种Filter...
>>DelegateProviderMetaDataInvoker
>>>>AbstractProxyInvoker
>>>>>>JavassistProxyFactory 代理对象
3.1 ProtocolFilterWrapper
该类是一个包装类,它会对Invoker对象包装一层过滤器链FilterChain,方法是buildInvokerChain()
,利用SPI加载激活的所有Filter,然后串成一个单向链表,Invoker被放在末位,只有经过前面所有的Filter,才能执行最终的invoke方法。
依次执行完所有的Filter之后,最终会交给DelegateProviderMetaDataInvoker执行。
3.2 DelegateProviderMetaDataInvoker
它是一个纯粹的包装类,没有任何逻辑,只是在引用Invoker的基础上,还持有元数据服务对象。
public class DelegateProviderMetaDataInvoker<T> implements Invoker {
protected final Invoker<T> invoker;
private ServiceConfig<?> metadata;
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
}
3.3 AbstractProxyInvoker
代理Invoker的父类,它会调用抽象方法doInvoke()
发起本地调用,拿到结果后封装成AppResponse并返回
public Result invoke(Invocation invocation) throws RpcException {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
result.setValue(obj);
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
}
doInvoke()
是子类实现的,本地调用的方式有两种,一种是理由Java的反射,另一种是字节码技术动态生成的类会进行直接的方法调用,性能上来讲,后者更好一些,Dubbo默认使用后者,代理类由JavassistProxyFactory类生成。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 提升反射效率
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 动态生成的Wrapper对象,在字节码层面进行直接的方法调用
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
4. 总结
Provider处理Consumer的RPC调用请求,需要经过两个重要的类,分别是ChannelHandler和Invoker。
ChannelHandler用来处理网络IO事件,received()
方法用于处理接收到的消息,接收消息是一个大工程,需要对消息做解码、多消息处理、消息派发等操作,不可能将所有代码都写在一个类里,所以Dubbo利用装饰者模式,将ChannelHandler经过一层层的包装,每个包装类各司其职,最终实现一整套复杂流程。针对RPC请求,ChannelHandler最终会根据Invocation参数匹配到Invoker,发起本地调用然后响应结果。
Invoker也经过了层层包装,但是逻辑并不复杂,核心的是ProtocolFilterWrapper用于执行Filter逻辑,最后就是根据代理对象执行本地调用了,方式有Java反射和字节码技术生成代理对象,字节码层面的直接方法调用。