文章目录
- 开启dubbo monitor
- 原理分析
开启dubbo monitor
监控中心配置。对应的配置类: org.apache.dubbo.config.MonitorConfig
开启有两种方法, 2. 通过注册中心发现服务
- 直连监控中心的服务
<dubbo:monitor address="dubbo//127.0.0.1:18109" />
- 通过注册中心
<dubbo:monitor protocol="registry"/>
原理分析
<dubbo:monitor …/>
如果项目中的dubbo配置有增加dubbo:monitor标签,在dubbo的MonitorFilter中会检测该标签,如果有的话,就会调用monitor服务进行统计
MonitorFilter.java
// intercepting invocation
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // 匹配monitor
RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
String remoteHost = context.getRemoteHost();
long start = System.currentTimeMillis(); // record start timestamp
getConcurrent(invoker, invocation).incrementAndGet(); // count up
try {
Result result = invoker.invoke(invocation); // proceed invocation chain
// 收集接口调用信息
collect(invoker, invocation, result, remoteHost, start, false);
return result;
} catch (RpcException e) {
collect(invoker, invocation, null, remoteHost, start, true);
throw e;
} finally {
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
} else {
return invoker.invoke(invocation);
}
}
获取dubboMonitor
MonitorFilter.java中有个方法collect,会通过monitorFactory创建moniter,并且收集信息
dubboMonitor创建过程
- 执行MonitorFilter中collect
- 执行DubboMonitoryFactory中createMonitor方法,创建dubboMoniter对象
- 执行dubboMonitor构造方法
注意这个dubboMonitor构造方法里面,会传入一个moniterService,这个就是通过protocal.refer从注册中心中获取到的monitorService实现,这个实现我们可以自定义,自定义监控。 (dubbo-monitor-simple监控工具就实现了这个接口)
接口信息收集
回到MonitorFilter.java中,拿到dubboMonitor之后,就执行dubboMonitor中的collect方法,收集接口调用信息,将接口调用信息放入一个Map集合中(statisticsMap),将接口调用次数进行累加统计,存放再本地集合Map中。
怎样的算是同一个Statistics 对象,其实equals 方法就是比对这堆属性值,hashCode 也是分别以这些属性值的hashcode算的。
索引位置指标:
0:监控周期总调用成功次数
1:监控周期总调用失败次数
2:监控周期总请求字节数
3:监控周期总响应字节数
4:监控周期总响应时间
5:监控周期均并发数
6:监控周期最大请求字节数
7:监控周期最大响应字节数
8:监控周期最大响应时间
9:监控周期最大并发数
这个监控周期就是上报数据定时任务的周期,也就是monitorInterval 变量的值。
statisticsMap的数据结构
statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
- key: key是Statistics,这个是有接口的名称,版本号,group,服务提供者、消费者的地址组成,并且重写了hashCode 和equals方法,保证上述信息相同时候,就算是同一个对象。
- value: value是一个原子类型的AtomicReference,防止并发调用导致的数据统计错误,
AtomicReference里面的value是由volatile(内存屏障),每次设置值都是通过cas操作,保证线程安全,同时避免加锁和释放锁引起的性能开销。
value的内容是一个long数组,数组是有接口调用总成功次数、总失败次数,总响应时间、最大响应时间、最大并发数等信息。
远程发送调用信息
在第2步中获取dubboMonitor对象时候,会创建一个定时任务线程池,定时将本地收集的服务调用信息发往远程服务。使用ScheduledExecutorService.scheduleWithFixedDelay();
任务周期:启动服务1分钟后发送,下一次在上一次发送完成后+1分钟发送(60000ms,可配置)。
public void send() {
logger.debug("Send statistics to monitor " + getUrl());
// 时间
String timestamp = String.valueOf(System.currentTimeMillis());
for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
// get statistics data
Statistics statistics = entry.getKey();
AtomicReference<long[]> reference = entry.getValue();
long[] numbers = reference.get();
long success = numbers[0];
long failure = numbers[1];
long input = numbers[2];
long output = numbers[3];
long elapsed = numbers[4];
long concurrent = numbers[5];
long maxInput = numbers[6];
long maxOutput = numbers[7];
long maxElapsed = numbers[8];
long maxConcurrent = numbers[9];
String version = getUrl().getParameter(Constants.DEFAULT_PROTOCOL);
// send statistics data
URL url = statistics.getUrl()
.addParameters(MonitorService.TIMESTAMP, timestamp,
MonitorService.SUCCESS, String.valueOf(success),
MonitorService.FAILURE, String.valueOf(failure),
MonitorService.INPUT, String.valueOf(input),
MonitorService.OUTPUT, String.valueOf(output),
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent),
MonitorService.MAX_INPUT, String.valueOf(maxInput),
MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent),
Constants.DEFAULT_PROTOCOL, version
);
//
// 通过构造方法传入的monitorService,远程发送接口信息,用于监控
// 这个是从注册中心加载出来的,详细看上面的“获取monitor”中的createMonitor方法
monitorService.collect(url);
// reset
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {// 这里是重置
update[0] = 0;/// 监控周期内总成功次数
update[1] = 0;/// 监控周期内总失败次数
update[2] = 0;/// 监控周期内接收请求字节数
update[3] = 0;/// 监控周期内接收响应字节数
update[4] = 0;/// 监控周期内总响应时间
update[5] = 0;/// 监控周期内平均并发数
} else {// 这个周期发送完了,减去这个周期的的统计数,剩下的就是下一个周期的
update[0] = current[0] - success;
update[1] = current[1] - failure;
update[2] = current[2] - input;
update[3] = current[3] - output;
update[4] = current[4] - elapsed;
update[5] = current[5] - concurrent;
}
// cas 替换监控指标值数组
} while (!reference.compareAndSet(current, update));
}
}