1 Disruptor
1.1 简介
1.1.1 定义
Disruptor
是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。
Disruptor
提供的功能类似于 Kafka
、RocketMQ
这类分布式队列,不过,其作为范围是 JVM
(内存),Disruptor
解决了 JDK
内置线程安全队列的性能和内存安全问题,Disruptor
有个最大的优点就是快
Disruptor
被设计用于在生产者
—消费者
(producer-consumer problem
,简称PCP
)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟
Disruptor
是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS
,除金融领域之外,其他一般的应用中都可以用到 Disruptor
,它可以带来显著的性能提升。其实 Disruptor
与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理
这些元素的程序来说,Disruptor
提出了一种大幅提升性能(TPS)的方案。
github 地址
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
1.1.2 Java中线程安全队列
JDK 中常见的线程安全的队列如下:
队列名字 | 锁 | 是否有界 |
---|---|---|
ArrayBlockingQueue | 加锁(ReentrantLock) | 有界 |
LinkedBlockingQueue | 加锁(ReentrantLock) | 有界 |
LinkedTransferQueue | 无锁(CAS) | 无界 |
ConcurrentLinkedQueue | 无锁(CAS) | 无界 |
从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。
Disruptor
就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。
1.1.3 Disruptor 核心概念
Disruptor
核心概念:
-
Event
:可以把Event
理解为存放在队列中等待消费的消息对象。
在Disruptor
的语义中,生产者和消费者之间进行交换的数据被称为事件(Event
)。它不是一个被Disruptor
定义的特定类型,而是由Disruptor
的使用者定义并指定。 -
EventFactory
:事件工厂用于生产事件,我们在初始化Disruptor
类的时候需要用到。 -
EventHandler
:Event
在对应的Handler
中被处理,你可以将其理解为生产消费者模型中的消费者。
Disruptor
定义的事件处理接口,由用户实现,用于处理事件,是Consumer
的真正实现 -
EventProcessor
:EventProcessor
持有特定消费者(Consumer
)的Sequence
,并提供用于调用事件处理实现的事件循环(Event Loop
) -
Disruptor
:事件的生产和消费需要用到Disruptor
对象。 -
RingBuffer
:RingBuffer
(环形数组)用于保存事件
。
如其名,环形的缓冲区。曾经RingBuffer
是Disruptor
中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过Disruptor
进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer
可以由用户的自定义实现来完全替代。 -
WaitStrategy
:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义Consumer
如何进行等待下一个事件的策略。(注:Disruptor
定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现) -
Producer
:生产者,只是泛指调用Disruptor
发布事件的用户代码,Disruptor
没有定义特定接口或类型 -
ProducerType
:指定是单个事件发布者模式
还是多个事件发布者模式
(发布者和生产者的意思类似)。 -
Sequencer
:Sequencer
是Disruptor
的真正核心。此接口有两个实现类 -SingleProducerSequencer
、MultiProducerSequencer
,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。 -
Sequence Disruptor
:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence
用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer
)的处理进度。
虽然一个AtomicLong
也可以用于标识进度,但定义Sequence
来负责该问题还有另一个目的,那就是防止不同的Sequence
之间的CPU
缓存伪共享(Flase Sharing
)问题。(注:这是Disruptor
实现高性能的关键点之一) -
Sequence Barrier
:用于保持对RingBuffer
的main published Sequence
和Consumer
依赖的其它Consumer
的Sequence
的引用。Sequence Barrier
还定义了决定Consumer
是否还有可处理的事件的逻辑。
1.2 操作
1.2.1 坐标依赖
pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
Gradle:
implementation 'com.lmax:disruptor:3.4.4'
1.2.2 创建事件
我们先来定义一个代表日志事件的类:LogEvent 。
事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。
@Data
public class LogEvent {
private String message;
}
我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message
1.2.3 创建事件工厂
创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory
接口并实现了 newInstance()
方法 。
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}
1.2.4 创建处理事件Handler--消费者
创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler
接口并实现了 onEvent()
方法 。
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.println(logEvent.getMessage());
}
}
EventHandler
接口的 onEvent()
方法共有 3 个参数:
-
event
:待消费/处理
的事件 -
sequence
:正在处理的事件在环形数组(RingBuffer)中的位置 -
endOfBatch
:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)
1.2.5 初始化 Disruptor
1.2.5.1 静态类
我们这里定义一个方法用于获取 Disruptor 对象
private static Disruptor<LogEvent> getLogEventDisruptor() {
// 创建 LogEvent 的工厂
LogEventFactory logEventFactory = new LogEventFactory();
// Disruptor 的 RingBuffer 缓存大小
int bufferSize = 1024 * 1024;
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//实例化 Disruptor
return new Disruptor<>(
logEventFactory,
bufferSize,
threadFactory,
// 单生产者
ProducerType.SINGLE,
// 阻塞等待策略
new BlockingWaitStrategy());
}
1.2.5.2 配置类
使用配置类的方式
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<LogEvent> messageModelRingBuffer() {
//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//指定事件工厂
LogEventFactory factory = new LogEventFactory();
//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
//单线程模式,获取额外的性能
Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
bufferSize,
threadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//设置事件业务处理器---消费者
//Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。
disruptor.handleEventsWith(new LogEventHandler ());
// Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
//就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。
//disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
// 启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
1.2.5.3 Disruptor 构造函数讲解
Disruptor 的推荐使用的构造函数如下:
public class Disruptor<T> {
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
......
}
我们需要传递 5 个参数:
-
eventFactory
:我们自定义的事件工厂。 -
ringBufferSize
:指定RingBuffer
的容量大小。 -
threadFactory
:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。 -
producerType
:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。 -
waitStrategy
:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
ProducerType
的源码如下,它是一个包含两个变量的枚举类型
-
SINGLE
:单个事件发布者模式,不需要保证线程安全。 -
MULTI
:多个事件发布者模式,基于 CAS 来保证线程安全。
WaitStrategy
(等待策略)接口的实现类中只有两个方法:
-
waitFor()
:等待新事件的到来。 -
signalAllWhenBlocking()
:唤醒所有等待的消费者。
public interface WaitStrategy
{
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
void signalAllWhenBlocking();
}
WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。
使用这个构造函数创建的 Disruptor
对象会默认使用 ProducerType.MULTI
(多个事件发布者模式)和 BlockingWaitStrategy
(阻塞等待策略) 。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
1.2.6 发布事件
1.2.6.1 main方法测试
//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
// 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
long sequence = ringBuffer.next();
try {
LogEvent logEvent = ringBuffer.get(sequence);
// 初始化 Event,对其赋值
logEvent.setMessage("这是第%d条日志消息".formatted(i));
} finally {
// 发布事件
ringBuffer.publish(sequence);
}
}
// 关闭 Disruptor
disruptor.shutdown();
1.2.6.2 使用配置方式
public interface DisruptorMqService {
/**
* 消息
* @param message
*/
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<LogEvent> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//获取下一个Event槽的下标
long sequence = messageModelRingBuffer.next();
try {
//给Event填充数据
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//发布Event,激活观察者去消费,将sequence传递给改消费者
//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}