dubbo如何集成路由
1.默认路由是怎么设置的
消费者服务启动时,会监听注册中心的变更。所以我调用下面的方法
//RegistryDirectory
public synchronized void notify(List<URL> urls) {
// 根据 URL 的分类或协议,分组成三个集合 。
//.......省略代码
// 2.routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { //
setRouters(routers);
}
}
//....
}
protected void setRouters(List<Router> routers) {
//...
// append mock invoker selector
routers.add(new MockInvokersSelector());//这里设置MockInvokersSelector
Collections.sort(routers);
this.routers = routers;
}
//经过上面的代码,我们已经拿到的默认路由
复制代码
2.下面看下我们是怎么使用默认路由的
当消费者引用服务时会走到这里
//AbstractDirectory
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
//抽象方法,子类实现
List<Invoker<T>> invokers = doList(invocation);//5 RegistryDirectory.doList-->6 7
// 根据路由规则,找出 Invoker 集合
List<Router> localRouters = this.routers; //拿到路由MockInvokersSelector
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);//6 开始使用路由
}
}
}
return invokers;
}
复制代码
3.MockInvokersSelector
整个流程简单,看下代码注释就ok了
public class MockInvokersSelector implements Router {
@Override
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, URL url, final Invocation invocation) throws RpcException {
if (invocation.getAttachments() == null) {
// 获得普通 Invoker 集合
return getNormalInvokers(invokers);
} else {
// 获得 "invocation.need.mock" 配置项
String value = invocation.getAttachments().get("invocation.need.mock");
if (value == null)
return getNormalInvokers(invokers);//7
else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
return getMockedInvokers(invokers);
}
}// 其它,不匹配,直接返回 `invokers` 集合
return invokers;
}
private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
if (!hasMockProviders(invokers)) {
return null;
}// 过滤掉普通 kInvoker ,创建 MockInvoker 集合
List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getProtocol().equals("mock")) {
sInvokers.add(invoker);
}
}
return sInvokers;
}
private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
if (!hasMockProviders(invokers)) {
return invokers;
} else {// 若包含 MockInvoker,过滤掉 MockInvoker ,创建普通 Invoker 集合
List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
for (Invoker<T> invoker : invokers) {
if (!invoker.getUrl().getProtocol().equals("mock")) {
sInvokers.add(invoker);
}
}
return sInvokers;
}
}
private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
boolean hasMockProvider = false;
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getProtocol().equals("mock")) {
hasMockProvider = true;
break;
}
}
return hasMockProvider;
}
}
复制代码
新加路由是怎么设置进去的
注册中心服务变更的时候-->RegistryDirectory.notify()设置新的路由
怎么筛选出Invoker
//RegistryDirectory
private void refreshInvoker(List<URL> invokerUrls) {
//...
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);//1
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); //2
//..
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
}
复制代码
- 1处:url转成调用者Invoker的列表,这里会有服务禁用、启用功能的实现
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//如果reference端配置了protocol,则只选择匹配的protocol
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger);
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL参数是排序的
if (keys.contains(key)) { // 重复URL
continue;
}
keys.add(key);
// 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 缓存中没有,重新refer
try {
boolean enabled = true;
if (url.hasParameter("disabled")) {
enabled = !url.getParameter("disabled", false);
} else {
enabled = url.getParameter("enabled", true);
}
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger;
}
if (invoker != null) { // 将新的引用放入缓存
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
复制代码
- 2处:
// 按提供者URL所声明的methods分类,兼容注册中心执行路由过滤掉的methods
//即消费者调用了某个方法,会根据方法映射到提供者
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
String parameter = invoker.getUrl().getParameter("methods");
if (parameter != null && parameter.length() > 0) {
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
for (String method : methods) {
if (method != null && method.length() > 0
&& !Constants.ANY_VALUE.equals(method)) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
}
}
List<Invoker<T>> newInvokersList = route(invokersList, null);
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
newMethodInvokerMap.put(method, route(methodInvokers, method));//路由功能实现
}
}
// sort and unmodifiable
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
复制代码
我们进入路由功能实现的源码
private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {
Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
List<Router> routers = getRouters();
if (routers != null) {// MockInvokersSelector和ConditionRouter
for (Router router : routers) {
if (router.getUrl() != null && !router.getUrl().getParameter("runtime", false)) {
//从invocation参数可看出:是用方法一个一个到路由表里找的
invokers = router.route(invokers, getConsumerUrl(), invocation);//--进ConditionRouter
}
}
}
return invokers;
}
从上面的注释,进入到进ConditionRouter
// 1处,判断消费者是否匹配路由规则的左边
// 比如,路由规则method = find*,list*,get*,is* => host = 11.22.3.91,12.221.3.95,12.232.3.16,
// 规则左边,即判断消费者调用的方法是读方法
// 2处,判断提供者是否匹配路由规则的右边
// 判断提供者是否这几个host的服务,否则不会被映射到方法的提供列表
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
try {
if (!matchWhen(url, invocation)) {//1
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {//2
result.add(invoker);
}
}
return invokers;
}
好了。路由结束了。貌似也没那么高大上。呵呵
下一篇会进入负载均衡。
复制代码