当前位置: 首页>编程语言>正文

net api监控 netty监控

一:概述

Netty为了向使用者屏蔽NIO通信的底层细节,在和用户交互的边界做了封装,目的就是为了减少开发者的开发工作量,降低开发难度。用户可以通过ServerBootstrap启动辅助类,方便的创建服务端。可以通过时序图了解下netty服务端运行流程:

net api监控 netty监控,net api监控 netty监控_Netty,第1张

一:流程

直接看源码走,查看运行流程:

服务端创建以ServerBootstrap开始

net api监控 netty监控,net api监控 netty监控_服务端_02,第2张

,往group方法里添加两个参数,跟踪源码发现NioEventLoopGroup其实是继承 Excutor框架的线程池

net api监控 netty监控,net api监控 netty监控_net api监控_03,第3张

sb.group(bossGroup,workGroup)

其实就是一个处理客户端链接的线程池,一个处理IO操作的线程池

继续往下走

.channel(NioServerSocketChannel.class)
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

通过反射拿出构造参数NioServerSocketChannel.class的实例,

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

查看NioServerSocketChannel发现

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

其实继承jdk提供的io.netty.channel.socket.ServerSocketChannel 的channel

然后

.option(ChannelOption.SO_BACKLOG,1024)

是给TCP设置一些参数,作为服务端,主要是设置TCP的backlog参数

然后:

.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 通过是否以换行符结果进行判断,解决粘包问题,即每遇到一条换行符算作一条单独的消息
//                            pipeline.addLast(new LineBasedFrameDecoder(1024));
//                            ByteBuf delimeter = Unpooled.copiedBuffer("$_".getBytes());
//                            pipeline.addLast(new DelimiterBasedFrameDecoder(1024,delimeter));
                            // 固定长度解码器
                            pipeline.addLast(new FixedLengthFrameDecoder(53));
                            pipeline.addLast(new TimeServerHandler());
                        }
                    });

通过ChannelPipeline中添加handler来处理网络事件。
最后

ChannelFuture cf = sb.bind(port).sync();
            cf.channel().closeFuture().sync();

绑定端口

}finally {
            // 优雅关闭线程资源
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

优雅的释放资源.
以上是写一个Netty服务端的基本流程。

二 : 服务端运行原理跟踪

看到这里可能还很模糊,不知道服务端是怎么跑起来的,
先看服务端写的时候会写这么一句代码,这是入口:

ChannelFuture cf = sb.bind(port).sync();

查询看bind(port).方法,发现是ServerBootstrap的父类AbstractBootstrap中bind()方法:

public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

继续跟进:

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

跟进doBind():

private ChannelFuture doBind(final SocketAddress localAddress) {
		// 1、initAndRegister();方法初始化注册通道
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
		// 2、注册完doBind0() 把处理结果放到线程池中监听
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

单独看下方法initAndRegister():

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            // 实例化通道,子类实现
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
		// 注册
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

发现其最终调用 :

abstract void init(Channel channel) throws Exception;

这个抽象方法,这个需要子类实现,因为是服务端,子类实现,我们则看ServerBootstrap:
其是把所有的配置配置完之后,通过ChannelPipeline, 核心是ServerBootstrapAcceptor 这个类继承ChannelInboundHandlerAdapter是一个Handler

跟踪代码initAndRegister()方法中的:

ChannelFuture regFuture = config().group().register(channel);

发现调用EventLoopGroup线程池的regitser()注册通道,即一个开启一个线程执行注册任务
跟踪 找到核心代码部分:

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 读操作
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

以上为netty服务启动源码跟踪



https://www.xamrdz.com/lan/5r31944386.html

相关文章: