??说到队列,大家都很熟悉,像生活中不管是吃饭还是买东西基本上都会遇到排队,先排队的人先付款,不允许插队,否则可能会出现下面的情况:
先进先出,这就是典型的“队列”。
简单回顾jdk里的队列
这里简单讲一下以下俩种队列
1、阻塞队列:
ArrayBlockingQueue: Object[] + count + lock.condition(notEmpty、notFull)
入队:
不阻塞:add、offer 满了直接报错
阻塞:put 满了:notFull.await();(当出队和删除元素时唤醒put操作)
出队:
take():当空时,notEmpty.await();当有元素入队时唤醒.
poll():当空时直接返回null
LinkedBlockingQueue:Node实现、加锁(读锁、写锁分离)、可选的有界队列。需要考虑实际使用中的内存问题,防止溢出。
应用:Eexcutors默认是使用LinkedBlockingQueue,但是在实际应用中,更应该手动创建线程池使用有界队列,防止生产者生产过快,导致内存溢出。
2、延迟队列:
DelayQueue : PriorityQueue + Lock.condition + leader
PriorityQueue优先级队列
condition 延迟等待
leader 避免不必要的kong等待
方法:
getDelay()延迟时间
compareTo()通过该方法比较从PriorityQueue里取值
入队:
add、put、offer:入队时会将换唤醒等待中的线程,进行一次出队处理
出队:
take()阻塞:
1、如果队列里无数据,元素入队时会被唤醒
2、有数据,会阻塞至时间满足
poll():满足队列有数据并且delay时间不大于0会取出元素,否则立即返回null—可能会抢占成为leader
还有优先级队列等就不一一细说,有兴趣的同学可以去看一下。
应用:延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期,实现方式:每次getDealy()方法提供一个缓存创建时间与当前时间的差值,出队时compareTo()方法取差值最小的。每次入队时都会重新取出队列里差值最小的值进行处理。
??我们使用队列的,更多的是像生产者、消费者这种场景。这种场景大多数情况又对处理速度有着要求,所以我们会使用多线程技术。使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。例如ArrayBlockingQueue、LinkedBlockingQueue或者是ConcurrentLinkedQueue,前俩者是通过加锁取实现,后面一种是通过cas去实现线程安全。但是又要考虑到生产者过快可能造出的内存溢出的问题,所以看起来ArrayBlockingQueue是最符合要求的。但是恰恰加锁效率又是最慢的,所以就引出了我们今天需要讨论的主题:Disruptor!
比较:
ArrayBlockingQueue VS Disruptor
看代码。。。
介绍
??Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(圈起来要考),使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。
接下来我们来看一下disruptor是如何做到无阻塞、多生产、多消费的。
EventFactory:创建消息(任务)的工厂类
ringBufferSize:容器的长度
Executor:消费者线程池,执行任务的线程
ProductType:生产者类型:单生产者、多生产者
WaitStrategy:等待策略
下面简单看一下disruptor的代码。
看代码。。。
可以看出在调用了start()方法后,消费者线程就已经开启,其中涉及到一个重要的概念:EventProcessor:
BatchEventProcessor主要事件循环,处理disruptor中的event,拥有消费者的Sequence
另一个核心概念:RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。可以看到我们进行生产者时,先从ringbuffer里拿,再进行投递。
这里使用next()获得的序号为数组中下一个可用的元素,再get(seq)获取到该位置的元素,再进行赋值处理。
这里的序号是如何产生的呢?
Sequence:顺序递增的序号来编号,管理交换的数据。生产者和消费者都会有维护自己的Sequence,通过进行比较,来平衡生产者和消费者的关系。消除伪共享(填充缓存行)。
Sequencer:在生产者和消费者之间快速、正确的传递数据的并发算法
Sequence Barrier:序号栅栏,用来平衡生产者和消费者之间的关系
上面说到ringBuffer有定义长度,说明是一个有界的队列,那么可能会出现以下俩种情况:当消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速度大于消费者消费速度,此时怎么办呢?
常用的WaitStrategy等待策略(消费者等待)
BlockingWaitStrategy使用了锁,低效的策略。
SleepingWaitStrategy对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)
YieldingWaitStrategy性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于cpu逻辑核心数的场景中,推荐使用。(无锁策略。主要是使用了Thread.yield()多线程交替执行)
至此,disruptor的基本核心概念已经介绍完毕!
Disruptor多边形操作:
如何实现第一张图里的多边形操作?
disruptor.handleEventsWith(E1, E2);
disruptor.after(E1).handleEventsWith(E3);
disruptor.after(E2).handleEventsWith(E4);
disruptor.after(E3, E4).handleEventsWith(E5);
有兴趣的同学可以试一下!
再了解了disruptor的核心概念和看了代码之后,就可以继续学习disruptor的多生产多消费模型了,disruptor的多线程才能发挥真正的力量!
多生产多消费模型:
简单看一下代码。。。
简单分析,多个生产者同时向ringbuffer投递数据,假设此时俩个生产者将ringbuffer已经填满,因为sequence的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个sequence即可。当多消费者来消费的时候,因为消费速度不同,例如消费者1来消费0、1,消费者2消费2、4,消费者3消费3。当消费者消费完0后,消费者2消费完2后,消费者3消费完3后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第0个位置, 此时再想投递数据时,虽然消费2的第二个位置空缺、消费者3的第三个位置空缺,消费者还在消费1时,无法继续投递。因为是通过比较消费者自身维护的sequence的最小的序号,来进行比较。
应用:
Apache Storm、Camel、Log4j 2
Log4j2 example:
使用了实现EventTranslator的提交机制。
可参考美团文章:https://tech.meituan.com/2016/11/18/disruptor.html中指出:美团在公司内部统一推行日志接入规范,要求必须使用Log4j 2,使普通单机QPS的上限不再只停留在几千,极高地提升了服务性能。
over。~!