当前位置: 首页>后端>正文

Disruptor quickStart!

??说到队列,大家都很熟悉,像生活中不管是吃饭还是买东西基本上都会遇到排队,先排队的人先付款,不允许插队,否则可能会出现下面的情况:

Disruptor quickStart!,第1张

先进先出,这就是典型的“队列”。

简单回顾jdk里的队列

这里简单讲一下以下俩种队列

1、阻塞队列:

ArrayBlockingQueue: Object[] + count + lock.condition(notEmpty、notFull)

入队:

不阻塞:add、offer 满了直接报错

阻塞:put 满了:notFull.await();(当出队和删除元素时唤醒put操作)

出队

take():当空时,notEmpty.await();当有元素入队时唤醒.

poll():当空时直接返回null

LinkedBlockingQueue:Node实现、加锁(读锁、写锁分离)、可选的有界队列。需要考虑实际使用中的内存问题,防止溢出。

应用:Eexcutors默认是使用LinkedBlockingQueue,但是在实际应用中,更应该手动创建线程池使用有界队列,防止生产者生产过快,导致内存溢出。

2、延迟队列:

DelayQueue : PriorityQueue + Lock.condition + leader

PriorityQueue优先级队列

condition 延迟等待

leader 避免不必要的kong等待

方法:

getDelay()延迟时间

compareTo()通过该方法比较从PriorityQueue里取值

入队:

add、put、offer:入队时会将换唤醒等待中的线程,进行一次出队处理

出队:

take()阻塞:

1、如果队列里无数据,元素入队时会被唤醒

2、有数据,会阻塞至时间满足

poll():满足队列有数据并且delay时间不大于0会取出元素,否则立即返回null—可能会抢占成为leader

还有优先级队列等就不一一细说,有兴趣的同学可以去看一下。

应用:延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期,实现方式:每次getDealy()方法提供一个缓存创建时间与当前时间的差值,出队时compareTo()方法取差值最小的。每次入队时都会重新取出队列里差值最小的值进行处理。

??我们使用队列的,更多的是像生产者、消费者这种场景。这种场景大多数情况又对处理速度有着要求,所以我们会使用多线程技术。使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。例如ArrayBlockingQueue、LinkedBlockingQueue或者是ConcurrentLinkedQueue,前俩者是通过加锁取实现,后面一种是通过cas去实现线程安全。但是又要考虑到生产者过快可能造出的内存溢出的问题,所以看起来ArrayBlockingQueue是最符合要求的。但是恰恰加锁效率又是最慢的,所以就引出了我们今天需要讨论的主题:Disruptor!

比较:

ArrayBlockingQueue VS Disruptor

看代码。。。

介绍

??Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中(圈起来要考),使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。

接下来我们来看一下disruptor是如何做到无阻塞、多生产、多消费的。

Disruptor quickStart!,第2张

EventFactory:创建消息(任务)的工厂类

ringBufferSize:容器的长度

Executor:消费者线程池,执行任务的线程

ProductType:生产者类型:单生产者、多生产者

WaitStrategy:等待策略

下面简单看一下disruptor的代码。

看代码。。。

可以看出在调用了start()方法后,消费者线程就已经开启,其中涉及到一个重要的概念:EventProcessor

BatchEventProcessor主要事件循环,处理disruptor中的event,拥有消费者的Sequence

Disruptor quickStart!,第3张

另一个核心概念:RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。可以看到我们进行生产者时,先从ringbuffer里拿,再进行投递。

Disruptor quickStart!,第4张

这里使用next()获得的序号为数组中下一个可用的元素,再get(seq)获取到该位置的元素,再进行赋值处理。

这里的序号是如何产生的呢?

Sequence:顺序递增的序号来编号,管理交换的数据。生产者和消费者都会有维护自己的Sequence,通过进行比较,来平衡生产者和消费者的关系。消除伪共享(填充缓存行)。

Sequencer:在生产者和消费者之间快速、正确的传递数据的并发算法

Sequence Barrier:序号栅栏,用来平衡生产者和消费者之间的关系

Disruptor quickStart!,第5张

上面说到ringBuffer有定义长度,说明是一个有界的队列,那么可能会出现以下俩种情况:当消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速度大于消费者消费速度,此时怎么办呢?

常用的WaitStrategy等待策略(消费者等待)

BlockingWaitStrategy使用了锁,低效的策略。

SleepingWaitStrategy对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)

YieldingWaitStrategy性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于cpu逻辑核心数的场景中,推荐使用。(无锁策略。主要是使用了Thread.yield()多线程交替执行)

至此,disruptor的基本核心概念已经介绍完毕!

Disruptor多边形操作:

Disruptor quickStart!,第6张

如何实现第一张图里的多边形操作?


disruptor.handleEventsWith(E1, E2);

disruptor.after(E1).handleEventsWith(E3);

disruptor.after(E2).handleEventsWith(E4);

disruptor.after(E3, E4).handleEventsWith(E5);

有兴趣的同学可以试一下!

再了解了disruptor的核心概念和看了代码之后,就可以继续学习disruptor的多生产多消费模型了,disruptor的多线程才能发挥真正的力量!

多生产多消费模型

Disruptor quickStart!,第7张

简单看一下代码。。。

Disruptor quickStart!,第8张

简单分析,多个生产者同时向ringbuffer投递数据,假设此时俩个生产者将ringbuffer已经填满,因为sequence的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个sequence即可。当多消费者来消费的时候,因为消费速度不同,例如消费者1来消费0、1,消费者2消费2、4,消费者3消费3。当消费者消费完0后,消费者2消费完2后,消费者3消费完3后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第0个位置, 此时再想投递数据时,虽然消费2的第二个位置空缺、消费者3的第三个位置空缺,消费者还在消费1时,无法继续投递。因为是通过比较消费者自身维护的sequence的最小的序号,来进行比较。

应用:

Apache Storm、Camel、Log4j 2

Log4j2 example:

使用了实现EventTranslator的提交机制。

Disruptor quickStart!,第9张

可参考美团文章:https://tech.meituan.com/2016/11/18/disruptor.html中指出:美团在公司内部统一推行日志接入规范,要求必须使用Log4j 2,使普通单机QPS的上限不再只停留在几千,极高地提升了服务性能。

over。~!


https://www.xamrdz.com/backend/3b71930499.html

相关文章: