一、Reactor线程
??源码基于4.1.6.Final版本。
1.1 Reactor线程启动
??NioEventLoop的run方法是reactor线程的主体,在第一次添加任务的时候被启动。
- 入口:NioEventLoop父类SingleThreadEventExecutor的execute方法
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
- netty的reactor线程在添加一个任务的时候被创建,该线程实体为FastThreadLocalThread。
- 最后线程执行主体为NioEventLoop的run方法。
1.2 Reactor线程执行
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
processSelectedKeys();
runAllTasks(...);
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
- 轮询注册到reactor线程对应的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
定时任务截止事时间快到了,中断本次轮询
轮询过程中发现有任务加入,中断本次轮询
阻塞式select操作
??netty会在每次进行selector.select(timeoutMillis)之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒,如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector。
- 处理产生网络IO事件的channel
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
取出IO事件以及对应的netty channel类
处理该channel
判断是否该再来次轮询
对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理SelectedSelectionKeySet
??netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法。
- 处理任务队列
用户自定义普通任务
非当前reactor线程调用channel的各种方法
用户自定义定时任务:1)若干时间后执行一次 2)每隔一段时间执行一次 3)每次执行结束,隔一定时间再执行一次
??taskQueue在NioEventLoop中默认是mpsc队列,mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行。
??reactor线程task调度:
- 从scheduledTaskQueue转移到期的定时任务到taskQueue(mpsc queue)
- 计算本次任务循环的截止时间
- 执行任务
- 收尾
当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行
netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue
netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue
netty每隔64个任务检查一下是否该退出任务循环
二、服务端启动
b.bind(8888).sync();
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//...
final ChannelFuture regFuture = initAndRegister();
//...
final Channel channel = regFuture.channel();
//...
doBind0(regFuture, channel, localAddress, promise);
//...
return promise;
}
- new一个channel
用户调用方法Bootstrap.bind(port)第一步就是通过反射的方式new一个NioServerSocketChannel对象,并且在new的过程中创建了一系列的核心组件。
- init这个channel
设置option和attr
设置新接入channel的option和attr
加入新连接处理器
- 将这个channel register到某个对象
1)设置启动类参数,最重要的就是设置channel创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并触发addHandler,register等事件
3)调用jdk底层做端口绑定,并触发active事件,active触发的时候,真正做服务端口绑定
三、新连接接入
所有的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操作都由unsafe来实现
??流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告。
- 检测到有新连接进入
- 将新的连接注册到worker线程组
- 注册新连接的读事件
//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
- boos reactor线程轮询到有新的连接进入
- 通过封装jdk底层的channel创建NioSocketChannel以及一系列的netty核心组件
- 将该条连接通过chooser,选择一条worker reactor线程绑定上去
- 注册读事件,开始新连接的读写
四、pipeline
4.1 pipeline 初始化
??pipeline是channel其中的一员,在AbstractChannel中被创建。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
??pipeline中保存了channel的引用,默认情况下,一条pipeline会有两个节点,head和tail。
4.2 pipeline添加节点
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.检查是否有重复handler
checkMultiplicity(handler);
// 2.创建节点
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加节点
addLast0(newCtx);
}
// 4.回调用户方法
callHandlerAdded0(handler);
return this;
}
netty中用两个字段来表示这个channelHandlerContext属于inBound还是outBound,或者两者都是。
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
- 检查是否有重复handler
- 创建节点
- 添加节点
- 回调用户方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}
4.3 pipeline删除节点
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
- 找到待删除的节点
- 调整双向链表指针删除
- 回调用户函数
4.4 pipeline其他
- 一个Channel对应一个Unsafe,Unsafe处理底层操作,NioServerSocketChannel对应NioMessageUnsafe, NioSocketChannel对应NioByteUnsafe
- inBound事件从head节点传播到tail节点,outBound事件从tail节点传播到head节点
- 异常传播只会往后传播,而且不分inbound还是outbound节点,不像outBound事件一样会往前传播
五、writeAndFlush
- pipeline中的标准链表结构
- Encoder节点分配一个ByteBuf,调用encode方法,将java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点。
- pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递
- 调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
- writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
- netty中的缓冲区中的ByteBuf为DirectByteBuf
六、拆包器
- 不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包
- 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
- 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
netty中的拆包内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做ByteToMessageDecoder
netty将具体如何拆包抽象出一个decode方法,不同的拆包器实现不同的decode方法,就能实现不同协议的拆包