当前位置: 首页>编程语言>正文

dubbo 支持mono flux dubbo monitorservice

文章目录

  • 开启dubbo monitor
  • 原理分析

开启dubbo monitor

监控中心配置。对应的配置类: org.apache.dubbo.config.MonitorConfig
开启有两种方法, 2. 通过注册中心发现服务

  1. 直连监控中心的服务
<dubbo:monitor address="dubbo//127.0.0.1:18109" />
  1. 通过注册中心
<dubbo:monitor protocol="registry"/>

dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_dubbo 支持mono flux,第1张

原理分析

<dubbo:monitor …/>

如果项目中的dubbo配置有增加dubbo:monitor标签,在dubbo的MonitorFilter中会检测该标签,如果有的话,就会调用monitor服务进行统计

MonitorFilter.java

// intercepting invocation
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // 匹配monitor
            RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
            String remoteHost = context.getRemoteHost();
            long start = System.currentTimeMillis(); // record start timestamp
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
            try {
                Result result = invoker.invoke(invocation); // proceed invocation chain
                 // 收集接口调用信息
                collect(invoker, invocation, result, remoteHost, start, false); 
               
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, remoteHost, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        } else {
            return invoker.invoke(invocation);
        }
    }

获取dubboMonitor

MonitorFilter.java中有个方法collect,会通过monitorFactory创建moniter,并且收集信息
dubboMonitor创建过程

  1. 执行MonitorFilter中collect
  2. dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_sed_02,第2张

  3. 执行DubboMonitoryFactory中createMonitor方法,创建dubboMoniter对象
  4. dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_dubbo_03,第3张

  5. 执行dubboMonitor构造方法
    注意这个dubboMonitor构造方法里面,会传入一个moniterService,这个就是通过protocal.refer从注册中心中获取到的monitorService实现,这个实现我们可以自定义,自定义监控。 (dubbo-monitor-simple监控工具就实现了这个接口)
  6. dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_dubbo_04,第4张

接口信息收集

回到MonitorFilter.java中,拿到dubboMonitor之后,就执行dubboMonitor中的collect方法,收集接口调用信息,将接口调用信息放入一个Map集合中(statisticsMap),将接口调用次数进行累加统计,存放再本地集合Map中。

dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_dubbo 支持mono flux_05,第5张

怎样的算是同一个Statistics 对象,其实equals 方法就是比对这堆属性值,hashCode 也是分别以这些属性值的hashcode算的。

索引位置指标:

0:监控周期总调用成功次数

1:监控周期总调用失败次数

2:监控周期总请求字节数

3:监控周期总响应字节数

4:监控周期总响应时间

5:监控周期均并发数

6:监控周期最大请求字节数

7:监控周期最大响应字节数

8:监控周期最大响应时间

9:监控周期最大并发数

这个监控周期就是上报数据定时任务的周期,也就是monitorInterval 变量的值。

statisticsMap的数据结构

dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_dubbo 支持mono flux_06,第6张

statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
  1. key: key是Statistics,这个是有接口的名称,版本号,group,服务提供者、消费者的地址组成,并且重写了hashCode 和equals方法,保证上述信息相同时候,就算是同一个对象。
  2. value: value是一个原子类型的AtomicReference,防止并发调用导致的数据统计错误,
    AtomicReference里面的value是由volatile(内存屏障),每次设置值都是通过cas操作,保证线程安全,同时避免加锁和释放锁引起的性能开销。
    value的内容是一个long数组,数组是有接口调用总成功次数、总失败次数,总响应时间、最大响应时间、最大并发数等信息。

远程发送调用信息

在第2步中获取dubboMonitor对象时候,会创建一个定时任务线程池,定时将本地收集的服务调用信息发往远程服务。使用ScheduledExecutorService.scheduleWithFixedDelay();

dubbo 支持mono flux dubbo monitorservice,dubbo 支持mono flux dubbo monitorservice_字节数_07,第7张

任务周期:启动服务1分钟后发送,下一次在上一次发送完成后+1分钟发送(60000ms,可配置)。

public void send() {
        logger.debug("Send statistics to monitor " + getUrl());
        // 时间
        String timestamp = String.valueOf(System.currentTimeMillis());
        for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
            // get statistics data
            Statistics statistics = entry.getKey();
            AtomicReference<long[]> reference = entry.getValue();
            long[] numbers = reference.get();
            long success = numbers[0];
            long failure = numbers[1];
            long input = numbers[2];
            long output = numbers[3];
            long elapsed = numbers[4];
            long concurrent = numbers[5];
            long maxInput = numbers[6];
            long maxOutput = numbers[7];
            long maxElapsed = numbers[8];
            long maxConcurrent = numbers[9];
            String version = getUrl().getParameter(Constants.DEFAULT_PROTOCOL);
            // send statistics data
            URL url = statistics.getUrl()
                    .addParameters(MonitorService.TIMESTAMP, timestamp,
                            MonitorService.SUCCESS, String.valueOf(success),
                            MonitorService.FAILURE, String.valueOf(failure),
                            MonitorService.INPUT, String.valueOf(input),
                            MonitorService.OUTPUT, String.valueOf(output),
                            MonitorService.ELAPSED, String.valueOf(elapsed),
                            MonitorService.CONCURRENT, String.valueOf(concurrent),
                            MonitorService.MAX_INPUT, String.valueOf(maxInput),
                            MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
                            MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
                            MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent),
                            Constants.DEFAULT_PROTOCOL, version
                    );
            // 
            // 通过构造方法传入的monitorService,远程发送接口信息,用于监控
            // 这个是从注册中心加载出来的,详细看上面的“获取monitor”中的createMonitor方法
            monitorService.collect(url);
            // reset
            long[] current;
            long[] update = new long[LENGTH];
            do {
                current = reference.get();
                if (current == null) {// 这里是重置
                    update[0] = 0;/// 监控周期内总成功次数
                    update[1] = 0;/// 监控周期内总失败次数
                    update[2] = 0;/// 监控周期内接收请求字节数
                    update[3] = 0;/// 监控周期内接收响应字节数
                    update[4] = 0;/// 监控周期内总响应时间
                    update[5] = 0;/// 监控周期内平均并发数
                } else {// 这个周期发送完了,减去这个周期的的统计数,剩下的就是下一个周期的
                    update[0] = current[0] - success;
                    update[1] = current[1] - failure;
                    update[2] = current[2] - input;
                    update[3] = current[3] - output;
                    update[4] = current[4] - elapsed;
                    update[5] = current[5] - concurrent;
                }
                // cas 替换监控指标值数组
            } while (!reference.compareAndSet(current, update));
        }
    }


https://www.xamrdz.com/lan/5o91937226.html

相关文章: