一、简介
前面一篇讲了服务调用方启动的大致流程
本章主要讲refer服务引用,分成两个重点,一个是创建invoker,一个是创建代理。
入口:ReferenceConfig类的createProxy方法
1.关于connection的问题
- 如果connections不配置,则共享连接,否则每服务每连接,
- 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
- 如果是非共享连接则每个服务+(ip+port)创建一个连接
二、服务引用
该方法里面有一句
invoker = refprotocol.refer(interfaceClass, urls.get(0));
这一句是本文的入口
这里的 invoker = refprotocol.refer(interfaceClass, urls.get(0));就是通过dubbo的SPI机制,跟进去的调用流程是:
ProtocolListenerWrapper.refer方法 --------> ProtocolFilterWrapper.refer --------> QosProtocolWrapper.refer --------> QosProtocolWrapper.startQosServer(url); --------> RegistryProtocol.refer
--------> MockClusterWrapper.join ------> FailoverCluster.join ------> ProviderConsumerRegTable.registerConsumer
1.QosProtocolWrapper.refer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
这个startQosServer方法跟进去
private void startQosServer(URL url) {
try {
boolean qosEnable = url.getParameter(QOS_ENABLE,true);
if (!qosEnable) {
logger.info("qos won't be started because it is disabled. " +
"Please check dubbo.application.qos.enable is configured either in system property, " +
"dubbo.properties or XML/spring boot configuration.");
return;
}
if (!hasStarted.compareAndSet(false, true)) {
return;
}
int port = url.getParameter(QOS_PORT, DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP,"false"));
Server server = com.alibaba.dubbo.qos.server.Server.getInstance();
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();
} catch (Throwable throwable) {
logger.warn("Fail to start qos server: ", throwable);
}
}
做了几件事:
开始QOS服务
2.RegistryProtocol.refer
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 参数值,并将其设置为协议头
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
//debug的时候没走这里
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 获取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
// debug的时候没走这里
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通过 SPI 加载 Cluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
这里的url=zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=9044&qos.port=33333&refer=application%3Ddubbo-annotation-consumer%26default.timeout%3D3000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.samples.api.client.HelloService%26methods%3DsayHello%26pid%3D9044%26qos.port%3D33333%26register.ip%3DXXX.XXX.XXX.XXX%26side%3Dconsumer%26timestamp%3D1603880645072×tamp=1603880668326
里面的registryFactory.getRegistry(url);用到了dubbo的SPI,这里得到的registry是ZookeeperRegistry
3.RegistryProtocol.doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
做了几件事
背景例子:
消费者服务所在的项目叫:dubbo-consumer
里面就一个类ProductServiceImpl一个方法addCostByItemId,需要去调用dubbo-provider项目中的itemService服务和costService服务
服务提供者所在的项目叫:dubbo-provider
里面有2个服务itemService和costService
现在debug是搞的消费者服务
当ProductServiceImpl实例化过后,依赖注入成员的时候会走到这里来,第一次来是注入costService
1.创建 RegistryDirectory 实例。
(这个是服务目录。服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。类似于注册中心在消费者端本地的缓存,不知道dubbo这个是不是从注册中心拉下来的。后面会专门开一篇来讲这个)
2.设置注册中心和协议
3.URL subscribeUrl
生成服务消费者链接
第一次进来是是注入costService出发的,此时subscribeUrl=consumer://XXX.XXX.XXX.XX/com.sid.api.service.CostService?application=dubbo-consumer&default.check=false&default.timeout=3000&dubbo=2.6.2&interface=com.sid.api.service.CostService&loadbalance=roundrobin&methods=add,subtract&pid=10116&revision=1.0.0&side=consumer×tamp=1620626010244&version=1.0.0
4.registry.register方法
注册服务消费者,在 consumers 目录下新节点
5.订阅服务directory.subscribe方法
订阅 providers、configurators、routers 等节点数据
下面三、服务订阅详细讲
6.由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker
三、服务订阅
RegistryDirectory.subscribe方法
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
这里面调用的是FailbackRegistry.subscribe
FailbackRegistry.subscribe方法
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// 如果开启了启动时检测,则直接抛出异常
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// 将失败的订阅请求记录到失败列表,定时重试
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
主要做了几件事:
1.ZookeeperRegistry.doSubscribe方法
向服务器端发送订阅请求
2.doSubscribe方法如果抛出异常,则
notify()或者 如果开启了启动时检测,则直接抛出异常
将失败的订阅请求记录到失败列表,定时重试addFailedSubscribed(url, listener);
ZookeeperRegistry.doSubscribe方法
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
//如果provider的service的接口配置的是“*”
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//获取服务分组根路径
String root = toRootPath();
//获取服务的NotifyListener
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
//如果没有则创建一个
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
//如果没有子监听器则创建一个
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
//向服务器订阅服务,注册中心会调用NotifyListener的notify函数返回服务列表
zkClient.create(root, false);
//获取服务地址列表
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
//如果存在服务
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
//如果serviceInterface是“*”则从分组根路径遍历service并订阅所有服务
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
//如果serviceInterface不是“*”则创建Zookeeper客户端索取服务列表,并通知(notify)消费者(consumer)这些服务可以用了
List<URL> urls = new ArrayList<URL>();
//获取类似于http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
for (String path : toCategoriesPath(url)) {
//获取例如com.service.xxxService对应的NotifyListener map
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
//获取ChildListener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//创建Zookeeper客户端
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//提醒消费者
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
主要做了几件事
1.在zk中添加节点和节点监听器
2.notify通知消费者去创建跟服务提供者之间的TCP连接
FailbackRegistry.notify方法
里面的调用链需关注的是
这个里面是其实通知registryDirectory刷新invoker,因为这里是第一次服务调用方第一期启动,所以实际上是在创建invoker、通过NETTY与服务提供方建立TCP长链接。
可以看到通过RegistryDirectory.toInvokers方法触发了Protocol@Adaptive.refer()-------->QosProtocolWrapper.refer()-------->ProtocolFilterWrapper.refer()-------->ProtocolListenerWrapper.refer()-------->DubboProtocol.refer()
DubboProtocol.refer()方法源码:
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
主要做了几件事:
1.创建RPC用的invoker并且返回
这里创建Invoker不复杂,就是一个new,但是重点关注一下getClients(url)方法,该方法会用netty去建立跟服务提供方的TCP链接。
在四、创建Invoker中详细讲
四、创建Invoker、用netty去建立跟服务提供方的TCP链接
创建invoker的时候用netty去建立跟服务提供方的TCP链接
入口
DubboProtocol.refer()方法中的
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
getClients(url)方法源码
private ExchangeClient[] getClients(URL url) {
// whether to share connection
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
// 如果connections不配置,则共享连接,否则每服务每连接,
// 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
// 如果是非共享连接则每个服务+(ip+port)创建一个连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
主要做了几件事
1.获取交换客户端ExchangeClient,默认使用共享链接
这里注意下
- 如果connections不配置,则共享连接,否则每服务每连接,
- 共享连接的意思是对于同一个ip+port的所有服务只创建一个连接,
- 如果是非共享连接则每个服务+(ip+port)创建一个连接
getSharedClient(url)方法
private ExchangeClient getSharedClient(URL url) {
// 以address(ip:port)为key进行缓存
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// 如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,
// 当某个client调用close()时,引用数减一,
// 如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client
// 如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) {
// 调用initClient来初始化Client
ExchangeClient exchangeClient = initClient(url);
// 使用ReferenceCountExchangeClient进行包装
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
主要做了几件事
1.以address(ip:port)为key进行缓存
如果连接存在了则引用数加1,引用数表示有多少个服务使用了此client,
当某个client调用close()时,引用数减一,
如果引用数大于0,表示还有服务在使用此连接, 不会真正关闭client
如果引用数为0,表示没有服务在用此连接,此时连接彻底关闭2.根据URL初始化exchangeClient
3.得到referenceCountExchangeClient
4.把这些client保存到MAP中
initClient方法
private ExchangeClient initClient(URL url) {
// 获取client参数的值,为空则获取server参数的值,默认为netty
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 加入codec参数,默认为dubbo,即DubboCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
//默认开启心跳,默认每60s发送一次心跳包
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在严重性能问题,暂时不允许使用
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
主要做了几件事
1.设置client的类型,默认是netty,设置codec,默认是dubboCodec,默认开启心跳60S发一次包
2.如果Connection不是懒加载,那就立马创建跟服务提供方的connection
Exchangers.connect(url, requestHandler)-------->getExchanger(url).connect(url, handler);-------->HeaderExchanger.connect()-------->Transporters.connect()-------->NettyTransporter.connect()-------->NettyClient构造函数-------->AbstractClient构造函数
AbstractClient构造函数
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
这部分看一下:
五、创建代理proxy
创建代理的入口在ReferenceConfig类的createProxy方法中有一句
return (T) proxyFactory.getProxy(invoker);
这里的proxyFactory通过dubbo的SPI得到
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
/**
* ProxyFactory. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
调用了StubProxyFactoryWrapper的getProxy方法
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) {
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
} else {
stub = serviceType.getName() + "Local";
}
}
try {
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
}
try {
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
proxy = (T) constructor.newInstance(new Object[]{proxy});
//export stub service
URL url = invoker.getUrl();
if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
try {
export(proxy, (Class) invoker.getInterface(), url);
} catch (Exception e) {
LOGGER.error("export a stub service error.", e);
}
}
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
}
} catch (Throwable t) {
LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
// ignore
}
}
}
return proxy;
}
AbstractProxyFactory.getProxy方法
package com.alibaba.dubbo.rpc.proxy;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.service.EchoService;
import com.alibaba.dubbo.rpc.service.GenericService;
/**
* AbstractProxyFactory
*/
public abstract class AbstractProxyFactory implements ProxyFactory {
@Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
}
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 获取接口列表
String config = invoker.getUrl().getParameter("interfaces");
// debug的时候这里没进来
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 设置服务接口类和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 加载接口类
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 为 http 和 hessian 协议提供泛化调用支持
// debug的时候这里没进来
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 创建新的 interfaces 数组
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 设置 GenericService.class 到数组中
interfaces[len] = GenericService.class;
}
// 调用重载方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}
做了几件事:
1.获取接口列表、切分接口列表、设置服务接口类和 EchoService.class 到 interfaces 中、加载接口类
2.为 http 和 hessian 协议提供泛化调用支持
3.调用子类重载方法
JavassistProxyFactory.getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
做了几件事
1.通过 Proxy 的 getProxy 方法获取 Proxy 子类
2.创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。
InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。
Proxy.getProxy方法
public static Proxy getProxy(Class<?>... ics) {
return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");
StringBuilder sb = new StringBuilder();
// 遍历接口列表
// 这里debug的时候接口列表有:HelloService和EchoService
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
// 检测类型是否为接口
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");
Class<?> tmp = null;
try {
// 重新加载接口类
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
// 检测接口是否相同,这里 tmp 有可能为空
if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
// 拼接接口全限定名,分隔符为 ;
sb.append(itf).append(';');
}
// 使用拼接后的接口名作为 key
//debug的时候key=org.apache.dubbo.samples.api.client.HelloService;com.alibaba.dubbo.rpc.service.EchoService;
// use interface class name list as key.
String key = sb.toString();
// get cache by class loader.
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}
Proxy proxy = null;
synchronized (cache) {
do {
// 从缓存中获取 Reference<Proxy> 实例
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null)
return proxy;
}
// 并发控制,保证只有一个线程可以进行后续操作
if (value == PendingGenerationMarker) {
try {
// 其他线程在此处进行等待
cache.wait();
} catch (InterruptedException e) {
}
} else {
// 放置标志位到缓存中,并跳出 while 循环进行后续操作
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
// 创建 ClassGenerator 对象
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
// 检测接口访问级别是否为 protected 或 privete
if (!Modifier.isPublic(ics[i].getModifiers())) {
// 获取接口包名
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
// 非 public 级别的接口必须在同一个包下,否者抛出异常
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
// 添加接口到 ClassGenerator 中
ccp.addInterface(ics[i]);
// 遍历接口方法
for (Method method : ics[i].getMethods()) {
// 获取方法描述,可理解为方法签名
String desc = ReflectUtils.getDesc(method);
// 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
// A 接口和 B 接口中包含一个完全相同的方法
if (worked.contains(desc))
continue;
worked.add(desc);
int ix = methods.size();
// 获取方法返回值类型
Class<?> rt = method.getReturnType();
// 获取参数列表
Class<?>[] pts = method.getParameterTypes();
// 生成 Object[] args = new Object[1...N]
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
// 生成 args[1...N] = ($w)...N;
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
// 生成 InvokerHandler 接口的 invoker 方法调用语句
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
// 返回值不为 void
if (!Void.TYPE.equals(rt))
// 生成返回语句,形如 return (java.lang.String) ret;
code.append(" return ").append(asArgument(rt, "ret")).append(";");
methods.add(method);
// 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null)
pkg = PACKAGE_NAME;
// create ProxyInstance class.
// 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
// 生成 private java.lang.reflect.InvocationHandler handler;
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
// porxy0(java.lang.reflect.InvocationHandler arg0) {
// handler=;
// }
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=;");
// 为接口代理类添加默认构造方法
ccp.addDefaultConstructor();
// 生成接口代理类
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
// 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
// public Object newInstance(java.lang.reflect.InvocationHandler h) {
// return new org.apache.dubbo.proxy0();
// }
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "(); }");
// 生成 Proxy 实现类
Class<?> pc = ccm.toClass();
// 通过反射创建 Proxy 实例
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null)
// 释放资源
ccp.release();
if (ccm != null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
// 写缓存
cache.put(key, new WeakReference<Proxy>(proxy));
// 唤醒其他等待线程
cache.notifyAll();
}
}
return proxy;
}
做了几件事:
1.遍历接口列表,重加载接口类
这里debug的时候接口列表有:HelloService和EchoService
2.ClassGenerator ccp 用于为服务接口生成代理类,比如测试代码中的 HelloService接口,这个接口代理类就是由 ccp 生成的。
3.ClassGenerator ccm
四、查看动态生成的.class文件
找到HelloService接口生成的代理类的.class文件,看一下里面有些什么
1.创建文件hsdb.bat 文件内容如下:
java -classpath "E:\Java\jdk1.8.0_221\lib\sa-jdi.jar" sun.jvm.hotspot.HSDB
2.双击这个文件
3.输入进程ID
4.点击Tools---->Class Browser
5.搜索代理的名字,这个名字是在代码里面打断点看到的。然后点击create .class for this class ,生成的文件会在前面启动HSDB的那个目录下
6.用idea打开看proxy0.class
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.samples.api.client.HelloService;
public class proxy0 implements DC, HelloService, EchoService {
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler var1) {
this.handler = var1;
}
public proxy0() {
}
public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (String)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (Object)var3;
}
}