当前位置: 首页>编程语言>正文

java netty客户端主动推送消息 netty推送系统


       作为高性能的NIO框架,利用Netty开发高效的推送服务技术上是可行的,但是由于推送服务自身的复杂性,想要开发出稳定、高性能的推送服务并非易事,需要在设计阶段针对推送服务的特点进行合理设计。

1. 最大句柄数修改


       百万长连接接入,首先需要优化的就是Linux内核参数,其中Linux最大文件句柄数是最重要的调优参数之一,默认单进程打开的最大句柄数是1024,通过ulimit -a可以查看相关参数。当单个推送服务接收到的链接超过上限后,就会报“too many open files”,所有新的客户端接入将失败。可以通过/etc/security/limits.conf修改这个限制。

       需要指出的是,尽管我们可以将单个进程打开的最大句柄数修改的非常大,但是当句柄数达到一定数量级之后,处理效率将出现明显下降,因此,需要根据服务器的硬件配置和处理能力进行合理设置。如果单个服务器性能不行也可以通过集群的方式实现。


2. 当心CLOSE_WAIT


       从事移动推送服务开发的同学可能都有体会,移动无线网络可靠性非常差,经常存在客户端重置连接,网络闪断等。在百万长连接的推送系统中,服务端需要能够正确处理这些网络异常,设计要点如下:

       1. 客户端的重连间隔需要合理设置,防止连接过于频繁导致的连接失败(例如端口还没有被释放);

       2. 客户端重复登陆拒绝机制;

       3. 服务端正确处理I/O异常和解码异常等,防止句柄泄露。

       最后特别需要注意的一点就是close_wait过多问题,由于网络不稳定经常会导致客户端断连,如果服务端没有能够及时关闭socket,就会导致处于close_wait状态的链路过多。close_wait状态的链路并不释放句柄和内存等资源,如果积压过多可能会导致系统句柄耗尽,发生“Too many open files”异常,新的客户端无法接入,涉及创建或者打开句柄的操作都将失败。

       close_wait是被动关闭连接是形成的,根据TCP状态机,服务器端收到客户端发送的FIN,TCP协议栈会自动发送ACK,链接进入close_wait状态。但如果服务器端不执行socket的close()操作,状态就不能由close_wait迁移到last_ack,则系统中会存在很多close_wait状态的连接。通常来说,一个close_wait会维持至少2个小时的时间(系统默认超时时间的是7200秒,也就是2小时)。如果服务端程序因某个原因导致系统造成一堆close_wait消耗资源,那么通常是等不到释放那一刻,系统就已崩溃。


       导致close_wait过多的可能原因如下:程序处理Bug,导致接收到对方的fin之后没有及时关闭socket;关闭socket不及时,例如I/O线程被意外阻塞,或者I/O线程执行的用户自定义Task比例过高,导致I/O操作处理不及时,链路不能被及时释放。


       下面结合Netty的原理,对潜在的故障点进行分析。

       设计要点1:不要在Netty的I/O线程上处理业务(心跳发送和检测除外)。在实际业务处理中,经常会有一些额外的复杂逻辑处理,例如性能统计、记录接口日志等,这些业务操作性能开销也比较大,如果在I/O线程上直接做业务逻辑处理,可能会阻塞I/O线程,影响对其它链路的读写操作,这就会导致被动关闭的链路不能及时关闭,造成close_wait堆积。

       设计要点2:在I/O线程上执行自定义Task要当心。Netty的I/O处理线程NioEventLoop支持两种自定义Task的执行:Runnable,通过调用NioEventLoop的execute(Runnable task)方法执行;定时任务ScheduledFutureTask,通过调用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)系列接口执行。在NioEventLoop中执行Runnable和ScheduledFutureTask,意味着允许用户在NioEventLoop中执行非I/O操作类的业务逻辑,这些业务逻辑通常用消息报文的处理和协议管理相关。它们的执行会抢占NioEventLoop I/O读写的CPU时间,如果用户自定义Task过多,或者单个Task执行周期过长,会导致I/O读写操作被阻塞,这样也间接导致close_wait堆积。

       所以,如果在代码中使用到了Runnable和ScheduledFutureTask,请合理设置ioRatio的比例,通过NioEventLoop的setIoRatio(int ioRatio)方法可以设置该值,默认值为50,即I/O操作和用户自定义任务的执行时间比为1:1。

       建议是当服务端处理海量客户端长连接的时候,不要在NioEventLoop中执行自定义Task,或者非心跳类的定时任务。

       设计要点3:IdleStateHandler使用要当心。很多情况下会使用IdleStateHandler做心跳发送和检测,在实际开发中需要注意的是,在心跳的业务逻辑处理中,无论是正常还是异常场景,处理时延要可控,防止时延不可控导致的NioEventLoop被意外阻塞。例如,心跳超时或者发生I/O异常时,业务调用Email发送接口告警,由于Email服务端处理超时,导致邮件发送客户端被阻塞,级联引起IdleStateHandler的AllIdleTimeoutTask任务被阻塞,最终NioEventLoop多路复用器上其它的链路读写被阻塞。


3. 合理的心跳周期


       百万级的推送服务,意味着会存在百万个长连接,每个长连接都需要靠和App之间的心跳来维持链路。合理设置心跳周期是非常重要的工作,推送服务的心跳周期设置需要考虑移动无线网络的特点。在Netty中,可以通过在ChannelPipeline中增加IdleStateHandler的方式实现心跳检测,在构造函数中指定链路空闲时间,然后实现空闲回调接口,实现心跳的发送和检测。


4. 合理设置接收和发送缓冲区容量


       对于长链接,每个链路都需要维护自己的消息接收和发送缓冲区,JDK原生的NIO类库使用的是java.nio.ByteBuffer,它实际是一个长度固定的Byte数组,数组是无法动态扩容,ByteBuffer也有这个限制。

       容量无法动态扩展会给用户带来一些麻烦,例如由于无法预测每条消息报文的长度,可能需要预分配一个比较大的ByteBuffer,这通常也没有问题。但是在海量推送服务系统中,这会给服务端带来沉重的内存负担。假设单条推送消息最大上限为10K,消息平均大小为5K,为了满足10K消息的处理,ByteBuffer的容量被设置为10K,这样每条链路实际上多消耗了5K内存,如果长链接链路数为100万,每个链路都独立持有ByteBuffer接收缓冲区,则额外损耗的总内存 Total(M) = 1000000 * 5K = 4882M。内存消耗过大,不仅仅增加了硬件成本,而且大内存容易导致长时间的Full GC,对系统稳定性会造成比较大的冲击。

       实际上,最灵活的处理方式就是能够动态调整内存,即接收缓冲区可以根据以往接收的消息进行计算,动态调整内存,利用CPU资源来换内存资源,具体的策略如下:ByteBuffer支持容量的扩展和收缩,可以按需灵活调整,以节约内存;接收消息的时候,可以按照指定的算法对之前接收的消息大小进行分析,并预测未来的消息大小,按照预测值灵活调整缓冲区容量,以做到最小的资源损耗满足程序正常功能。

       Netty提供的ByteBuf支持容量动态调整,对于接收缓冲区的内存分配器,Netty提供了两种:

       FixedRecvByteBufAllocator:固定长度的接收缓冲区分配器,由它分配的ByteBuf长度都是固定大小的,并不会根据实际数据报的大小动态收缩。但是,如果容量不足,支持动态扩展。动态扩展是Netty ByteBuf的一项基本功能,与ByteBuf分配器的实现没有关系;

       AdaptiveRecvByteBufAllocator:容量动态调整的接收缓冲区分配器,它会根据之前Channel接收到的数据报大小进行计算,如果连续填充满接收缓冲区的可写空间,则动态扩展容量。如果连续2次接收到的数据报都小于指定值,则收缩当前的容量,以节约内存。

       相对于FixedRecvByteBufAllocator,使用AdaptiveRecvByteBufAllocator更为合理,可以在创建客户端或者服务端的时候指定RecvByteBufAllocator,代码如下:


Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)

       如果没有设置,默认使用AdaptiveRecvByteBufAllocator。

       另外值得注意的是,无论是接收缓冲区还是发送缓冲区,缓冲区的大小建议设置为消息的平均大小,不要设置成最大消息的上限,这会导致额外的内存浪费。

5. 内存池


       推送服务器承载了海量的长链接,每个长链接实际就是一个会话。如果每个会话都持有心跳数据、接收缓冲区、指令集等数据结构,而且这些实例随着消息的处理朝生夕灭,这就会给服务器带来沉重的GC压力,同时消耗大量的内存。

       最有效的解决策略就是使用内存池,每个NioEventLoop线程处理N个链路,在线程内部,链路的处理时串行的。假如A链路首先被处理,它会创建接收缓冲区等对象,待解码完成之后,构造的POJO对象被封装成Task后投递到后台的线程池中执行,然后接收缓冲区会被释放,每条消息的接收和处理都会重复接收缓冲区的创建和释放。如果使用内存池,则当A链路接收到新的数据报之后,从NioEventLoop的内存池中申请空闲的ByteBuf,解码完成之后,调用release将ByteBuf释放到内存池中,供后续B链路继续使用。

       使用内存池优化之后,单个NioEventLoop的ByteBuf申请和GC次数从原来的N次减少为最少0次(假设每次申请都有可用的内存)。下面以推特使用Netty4的PooledByteBufAllocator进行GC优化作为案例,对内存池的效果进行评估,结果如下:垃圾生成速度是原来的1/5,而垃圾清理速度快了5倍。使用新的内存池机制,几乎可以把网络带宽压满。

       Netty4之前的版本问题如下:每当收到新信息或者用户发送信息到远程端,Netty 3均会创建一个新的堆缓冲区。这意味着,对应每一个新的缓冲区,都会有一个new byte[capacity]。这些缓冲区会导致GC压力,并消耗内存带宽。为了安全起见,新的字节数组分配时会用零填充,这会消耗内存带宽。然而,用零填充的数组很可能会再次用实际的数据填充,这又会消耗同样的内存带宽。如果Java虚拟机(JVM)提供了创建新字节数组而又无需用零填充的方式,那么就可以将内存带宽消耗减少50%,但是目前没有那样一种方式。

       在Netty 4中实现了一个新的ByteBuf内存池,它是一个纯Java版本的 jemalloc (Facebook也在用)。现在,Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过,由于它不依赖于GC,开发人员需要小心内存泄漏。如果忘记在处理程序中释放缓冲区,那么内存使用率会无限地增长。

       Netty默认不使用内存池,需要在创建客户端或者服务端的时候进行指定,代码如下:


Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

       使用内存池之后,内存的申请和释放必须成对出现,即retain()和release()要成对出现,否则会导致内存泄露。如果使用内存池,完成ByteBuf的解码工作之后必须显式的调用ReferenceCountUtil.release(msg)对接收缓冲区ByteBuf进行内存释放,否则它会被认为仍然在使用中,这样会导致内存泄露。

6. TCP参数优化


       常用的TCP参数,例如TCP层面的接收和发送缓冲区大小设置,在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF,需要根据推送消息的大小,合理设置,对于海量长连接,通常32K是个不错的选择。

       另外一个比较常用的优化手段就是软中断。


Zero Copy实现


       《Netty权威指南(第二版)》中专门有一节介绍Netty的Zero Copy,但针对的是Netty内部的零拷贝功能。如何在应用代码中实现Zero Copy,最典型的应用场景就是消息透传。因为透传不需要完整解析消息,只需要知道消息要转发给下游哪个系统就足够了。所以透传时,我们可以只解析出部分消息,消息整体还原封不动地放在Direct Buffer里,最后直接将它写入到连接下游系统的Channel中。所以应用层的Zero Copy实现就分为两部分:Direct Buffer配置和Buffer的零拷贝传递。

       1. 内存池

       使用Netty带来的又一个好处就是内存管理。只需一行简单的配置,就能获得到内存池带来的好处。在底层,Netty实现了一个Java版的Jemalloc内存管理库,为我们做完了所有“脏活累活”!


ServerBootstrap b = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .localAddress(port)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(...);
                }
            });

       2 应用层的Zero Copy

       默认情况下,Netty会自动释放ByteBuf。也就是说当覆写的channelRead0()返回时,ByteBuf就结束了它的使命,被Netty自动释放掉(如果是池化的就可会被放回到内存池中)。

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}

       因为Netty是用引用计数的方式来判断是否回收的,所以要想继续使用ByteBuf而不让Netty释放的话,就要增加它的引用计数。只要在ChannelPipeline中的任意一个Handler中调用ByteBuf.retain()将引用计数加1,Netty就不会释放掉它了。在连接下游的客户端的Encoder中发送消息成功后再释放掉,这样就达到了零拷贝透传的效果:

public class RespEncoder extends MessageToByteEncoder<Resp> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
        // Raw in Msg is retained ByteBuf
        out.writeBytes(msg.getRaw(), 0, msg.getRaw().readerIndex());
        msg.getRaw().release();
    }
}



https://www.xamrdz.com/lan/5sp1962512.html

相关文章: