当前位置: 首页>后端>正文

Java进阶-Netty-进阶

一、Reactor线程

??源码基于4.1.6.Final版本。

1.1 Reactor线程启动

??NioEventLoop的run方法是reactor线程的主体,在第一次添加任务的时候被启动

  • 入口:NioEventLoop父类SingleThreadEventExecutor的execute方法
@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}
  • netty的reactor线程在添加一个任务的时候被创建,该线程实体为FastThreadLocalThread。
  • 最后线程执行主体为NioEventLoop的run方法。

1.2 Reactor线程执行

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
Java进阶-Netty-进阶,第1张
  • 轮询注册到reactor线程对应的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}

定时任务截止事时间快到了,中断本次轮询
轮询过程中发现有任务加入,中断本次轮询
阻塞式select操作

??netty会在每次进行selector.select(timeoutMillis)之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒,如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector

  • 处理产生网络IO事件的channel
processSelectedKeys();

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

取出IO事件以及对应的netty channel类
处理该channel
判断是否该再来次轮询

对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理SelectedSelectionKeySet

??netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法。

  • 处理任务队列

用户自定义普通任务
非当前reactor线程调用channel的各种方法
用户自定义定时任务:1)若干时间后执行一次 2)每隔一段时间执行一次 3)每次执行结束,隔一定时间再执行一次

??taskQueue在NioEventLoop中默认是mpsc队列,mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行

??reactor线程task调度:

  • 从scheduledTaskQueue转移到期的定时任务到taskQueue(mpsc queue)
  • 计算本次任务循环的截止时间
  • 执行任务
  • 收尾

当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行
netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue
netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue
netty每隔64个任务检查一下是否该退出任务循环

二、服务端启动

b.bind(8888).sync();

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
  • new一个channel
Java进阶-Netty-进阶,第2张

用户调用方法Bootstrap.bind(port)第一步就是通过反射的方式new一个NioServerSocketChannel对象,并且在new的过程中创建了一系列的核心组件

  • init这个channel

设置option和attr
设置新接入channel的option和attr
加入新连接处理器

  • 将这个channel register到某个对象

1)设置启动类参数,最重要的就是设置channel创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并触发addHandler,register等事件
3)调用jdk底层做端口绑定,并触发active事件,active触发的时候,真正做服务端口绑定

三、新连接接入

所有的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操作都由unsafe来实现

Java进阶-Netty-进阶,第3张

??流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告

  • 检测到有新连接进入
  • 将新的连接注册到worker线程组
  • 注册新连接的读事件
//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
  • boos reactor线程轮询到有新的连接进入
  • 通过封装jdk底层的channel创建NioSocketChannel以及一系列的netty核心组件
  • 将该条连接通过chooser,选择一条worker reactor线程绑定上去
  • 注册读事件,开始新连接的读写

四、pipeline

4.1 pipeline 初始化

??pipeline是channel其中的一员,在AbstractChannel中被创建

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

??pipeline中保存了channel的引用,默认情况下,一条pipeline会有两个节点,head和tail

Java进阶-Netty-进阶,第4张

4.2 pipeline添加节点

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
Java进阶-Netty-进阶,第5张
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.创建节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}

netty中用两个字段来表示这个channelHandlerContext属于inBound还是outBound,或者两者都是

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
Java进阶-Netty-进阶,第6张
  • 检查是否有重复handler
  • 创建节点
  • 添加节点
  • 回调用户方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
Java进阶-Netty-进阶,第7张

4.3 pipeline删除节点

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}
  • 找到待删除的节点
  • 调整双向链表指针删除
  • 回调用户函数
Java进阶-Netty-进阶,第8张

4.4 pipeline其他

  • 一个Channel对应一个Unsafe,Unsafe处理底层操作,NioServerSocketChannel对应NioMessageUnsafe, NioSocketChannel对应NioByteUnsafe
  • inBound事件从head节点传播到tail节点,outBound事件从tail节点传播到head节点
  • 异常传播只会往后传播,而且不分inbound还是outbound节点,不像outBound事件一样会往前传播

五、writeAndFlush

  • pipeline中的标准链表结构
Java进阶-Netty-进阶,第9张
  • Encoder节点分配一个ByteBuf,调用encode方法,将java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点。
  • pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递
  • 调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
  • writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
  • netty中的缓冲区中的ByteBuf为DirectByteBuf

六、拆包器

  • 不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包
  • 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
  • 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

netty中的拆包内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做ByteToMessageDecoder

netty将具体如何拆包抽象出一个decode方法,不同的拆包器实现不同的decode方法,就能实现不同协议的拆包


https://www.xamrdz.com/backend/3pt1945772.html

相关文章: