文章目录
- 前言
- 重要类解释:
- 代码调用逻辑图
- 具体代码分析
- doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)
- proxyFactory.getInvoker(T proxy, Class type, URL url)
- protocol.export(Invoker invoker)
- RegistryProtocol.export(final Invoker originInvoker)
- doLocalExport(final Invoker originInvoker)
- DubboProtocol.export(Invoker invoker)
- DubboProtocol.openServer(URL url)
- DubboProtocol.createServer(URL url)
- Exchangers.bind(URL url, ExchangeHandler handler)
- getExchanger(URL url)
- HeaderExchanger.bind(URL url, ExchangeHandler handler)
- Transporters.bind(URL url, ChannelHandler... handlers)
- NettyTransporter.bind(URL url, ChannelHandler listener)
- NettyServer 构造器
- doOpen()
前言
dubbo服务发布共6个步骤:
1)暴露本地服务
2)暴露远程服务
3)启动netty服务
4)打开连接zk
5)注册到zk
6)监听zk
上次我们学习了《dubbo服务暴露之本地服务暴露》,今天我们学习下暴露远程服务的过程
重要类解释:
重要类解释:
ProxyFactory:就是为了获取一个接口的代理类,例如:获取一个远程接口的代理,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory
Wrapper:它包装了一个类或接口,类似于spring的BeanWrapper,可以通过Wrapper对实例对象进行赋值、取值以及对方法的调用
Invoker:它是一个可执行对象,能够根据方法的名称、参数得到相应的结果
Exporter:维护Invoker的生命周期。
Transporter:网络传输层,用来抽象netty和mina的统一接口
Exchanger:信息交换层,封装请求响应模式,同步转异步
Protocol:它有两个方法:
1.export: 暴露远程服务;将proxyFactory.getInvoker创建的代理类invoker对象,通过协议暴露给外部。
2.refer:引用远程服务(用于客户端)
代码调用逻辑图
具体代码分析
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs)
代码逻辑:
1)将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map
2)构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息
3)最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递
4)暴露本地服务
5)暴露远程服务
我们在《dubbo服务暴露之本地服务暴露》中讲了前4个步骤,今天从第5步暴露远程服务开始,源码如下:
注:源码中只放了暴露远程服务相关代码
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// -----------------------构建URL的代码已省略,----------------------------------
// scope 为none表示不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 配置不是remote的情况下做本地暴露(配置remote则只暴露远程服务)
if (!"remote"..equalsIgnoreCase(scope)) {
exportLocal(url); //暴露本地服务
}
// 配置不是local的情况下做远程暴露(配置local则只暴露本地服务)
if (!"local".equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 如果有注册中心,就循环发布到所有注册中心里
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 不存在注册中心,仅导出服务
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
在本方法中暴露远程服务的过程是:
1)如果有注册中心,就循环发布到所有注册中心里
2)通过proxyFactory.getInvoker(T proxy, Class<T> type, URL url)
方法创建一个可执行的Invoker对象
3)调用protocol.export(Invoker<T>)
方法执行暴露逻辑
proxyFactory.getInvoker(T proxy, Class type, URL url)
ProxyFactory
类的作用在上面已经介绍了,proxyFactory
对象是ServiceConfig
类的成员变量:
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
学习完《dubbo SPI 的实现》这一节后我们知道proxyFactory
对象实际是ProxyFactory$Adaptive
类型的对象ProxyFactory$Adaptive
代码如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements ProxyFactory {
public Object getProxy(Invoker arg0) throws RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
ProxyFactory extension = (ProxyFactory) ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws RpcException {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
ProxyFactory extension = (ProxyFactory) ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
通过getInvoker(Object arg0, Class arg1, URL arg2)
的源码可以看到,最终返回了extension.getInvoker(arg0, arg1, arg2)
对象,而extension
是ProxyFactory
通过dubbo spi加载出来的对象,本例中spi加载的对象实例为JavassistProxyFactory
源码如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 为目标类创建 Wrapper
// 在本例中为proxy.getClass().getName()==com.alibaba.dubbo.demo.provider.DemoServiceImpl
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
最终返回的Invoker对象就是DemoServiceImpl的可执行对象(关于Invoker类的介绍,请看上面)
protocol.export(Invoker invoker)
Protocol类的作用在上面已经介绍了,protocol
对象也是ServiceConfig
类的成员变量:
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
学习完《dubbo SPI 的实现》这一节后我们知道protocol
对象实际是Protocol$Adaptive
类型的对象Protocol$Adaptive
代码如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public Invoker refer(Class arg0, URL arg1) throws RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public Exporter export(Invoker arg0) throws RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
通过export(Invoker arg0)
的源码可以看到,最终返回了extension.export(arg0)
对象,而extension
是Protocol
接口通过dubbo spi加载出来的对象,本例中spi对象实例为RegistryProxyFactory
,不过由于ExtensionLoader.createExtension(String name)
方法中对RegistryProxyFactory
进行了AOP的包装,所以:
1)extension.export(arg0)
调用的是ProtocolFilterWrapper
的export(invoker)
方法
2)ProtocolFilterWrapper
的export
方法又调用了ProtocolListenerWrapper
的export(invoker)
方法
3)ProtocolListenerWrapper
的export
方法,调用了RegistryProtocol
对象的export
方法
注:ProtocolFilterWrapper和ProtocolListenerWrapper的调用逻辑很简单,就不做展示了
RegistryProtocol.export(final Invoker originInvoker)
逻辑分析:
1)调用 doLocalExport 导出服务
2)向注册中心注册服务
3)向注册中心进行订阅 override 数据
4)创建并返回 DestroyableExporter
源码如下:
注:只有第一步与暴露远程服务有关,故其他代码省略
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//netty服务暴露
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// --------------------------连接zk的代码已省略-------------------------
}
服务暴露的逻辑都在doLocalExport
方法中了
doLocalExport(final Invoker originInvoker)
代码逻辑:
1)通过originInvoker
对象的URL获取缓存key
2)通过缓存key,从缓存对象bounds
中取出缓存对象
3)如果缓存不存在就调用 protocol
的 export
方法导出服务,得到缓存对象
4)将缓存对象存入缓存并返回
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 获取invoker在bounds中缓存的key
String key = getCacheKey(originInvoker);
// 访问缓存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
// 双重检查锁
if (exporter == null) {
// 创建 Invoker 为委托类对象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 调用 protocol 的 export 方法导出服务
exporter = new ExporterChangeableWrapper<T>(
(Exporter<T>) protocol.export(invokerDelegete),
originInvoker);
// 入写缓存
bounds.put(key, exporter);
}
}
}
return exporter;
}
第一次访问时,缓存中肯定没有对象,所以一定会调用protocol.export(invokerDelegete)
,而protocol
对象是在RegistryProtocol
类被dubbo spi加载的时候,就通过调用IoC原理的ExtensionLoader.injectExtension(T instance)
方法注入的Protocol$Adaptive
对象的实例。所以这里调用的是Protocol$Adaptive
类的export()方法,Protocol$Adaptive
类源码在上面。Protocol$Adaptive.export(Invoker arg0)
方法执行了下面的代码:
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
其中extName
的值为“dubbo”
,所以SPI加载的是DubboProtocol
类,但是根据dubbo SPI的AOP,dubbo在DubboProtocol的基础上包装了ProtocolFilterWrapper
类和ProtocolListenerWrapper
类,所以:
1)extension.export(arg0)
调用的是ProtocolFilterWrapper
的export(invoker)
方法
2)ProtocolFilterWrapper
的export
方法又调用了ProtocolListenerWrapper
的export(invoker)
方法
3)ProtocolListenerWrapper
的export
方法,调用了DubboProtocol
对象的export
方法
注:ProtocolFilterWrapper和ProtocolListenerWrapper的调用逻辑很简单,就不做展示了,我们直接查看DubboProtocol
的export
方法
DubboProtocol.export(Invoker invoker)
代码逻辑:
1)创建DubboExporter维护invoker的生命周期
2)维护本地存根代码
3)启动服务器
4)优化序列化
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 获取服务标识,由服务组名,服务名,服务版本号以及端口组成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 创建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
// 本地存根相关代码
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
我们重点看下 DubboExporter
的创建以及 openServer
方法,其他逻辑看不懂也不影响我们服务发布得到逻辑。其中DubboExporter
就是用来维护我们的invoker
对象生命周期的,代码很简单。
我们直接分析openServer
方法
DubboProtocol.openServer(URL url)
1)获取服务器实例的 key,格式为host:port
2)通过key,从缓存中获取对象
3)如果没有获取到,就调用createServer
新建一个
4)如果获取到了就调用reset
重置服务
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例,如本例中key = 192.168.25.1:20880
String key = url.getAddress();
boolean isServer = url.getParameter("isserver", true);
if (isServer) {
// 访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 创建服务器实例
serverMap.put(key, createServer(url));
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
第一次调用时,缓存中没有ExchangeServer
对象,所以会调用createServer
方法创建
DubboProtocol.createServer(URL url)
/**
* 1)组装URL
* 2)从url中获取参数server的值,并检测是否存在参数 server 所代表的 Transporter 拓展,不存在则抛出异常
* 3)创建服务器实例ExchangeServer
* 4)从url中获取参数 client 的值,并检测是否支持参数 client 所表示的 Transporter 拓展,不存在也是抛出异常
*/
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
// 添加心跳检测配置到 url 中
url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000));
// 获取 server 参数,默认为 netty
String str = url.getParameter("server", "netty");
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加编码解码器参数
url = url.addParameter("codec", "dubbo");
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter("client");
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers.bind(URL url, ExchangeHandler handler)
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent("codec", "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
bind方法先执行了getExchanger(url)获取了Exchanger的实例对象
getExchanger(URL url)
public static Exchanger getExchanger(URL url) {
String type = url.getParameter("exchanger", "header");
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);;
}
上面的方法最终返回的是HeaderExchanger
类的实例对象,所以getExchanger(url).bind(url, handler)
实际上执行的是HeaderExchanger.bind(URL url, ExchangeHandler handler)
HeaderExchanger.bind(URL url, ExchangeHandler handler)
创建 HeaderExchangeServer
实例,该方法包含了多个逻辑,分别如下:
1)new HeaderExchangeHandler(handler)
2) new DecodeHandler(new HeaderExchangeHandler(handler))
3) Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(
Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
);
}
Transporters.bind(URL url, ChannelHandler… handlers)
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, handler);
}
上面的方法先调用getTransporter()
获取Transporter
的自适应扩展类,代码如下:
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
最终返回的对象是自适应扩展类Transporter$Adaptive
的实例对象,然后调用对象的bind(URL arg0, ChannelHandler arg1)
方法,代码如下:
public Server bind(URL arg0, ChannelHandler arg1) throws RemotingException {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString()
+ ") use keys([server, transporter])");
Transporter extension = (Transporter) ExtensionLoader
.getExtensionLoader(Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
通过这个bind
方法可以知道,最终调用了DUBBO SPI加载的Transporter
的实现类NettyTransporter
的bind(URL url, ChannelHandler listener)
方法
NettyTransporter.bind(URL url, ChannelHandler listener)
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyServer 构造器
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 调用父类构造方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler")));
}
}
/**
* 该类是NettyServer 的父类,由于NettyServer构造器调用了父类构造器,故而展示下父类构造方法
*/
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取 ip 和端口
String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost());
int bindPort = getUrl().getParameter("bind.port", getUrl().getPort());
if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 设置 ip 为 0.0.0.0
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数
this.accepts = url.getParameter("accepts", 0);
this.idleTimeout = url.getParameter("idle.timeout", 600 * 1000);
try {
// 调用模板方法 doOpen 启动服务器
doOpen();
} catch (Throwable t) {
throw new RemotingException("Failed to bind ");
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
}
上面的代码大多都是从url中获取参数值的代码,我们直接看doOpen()
方法
doOpen()
这个方法就是调用netty服务的方法了,里面的代码大多数也都是netty的代码,代码逻辑为:
1)设置new NioServerSocketChannelFactory() boss worker线程池
2)设置编解码 decoder、encoder、handler3个管道
3)bootstrap.bind()
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 创建 boss 和 worker 线程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 创建 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
// 设置 PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 绑定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
至此,我们dubbo服务暴露之远程暴露的逻辑就捋完了。