一:概述
Netty为了向使用者屏蔽NIO通信的底层细节,在和用户交互的边界做了封装,目的就是为了减少开发者的开发工作量,降低开发难度。用户可以通过ServerBootstrap启动辅助类,方便的创建服务端。可以通过时序图了解下netty服务端运行流程:
一:流程
直接看源码走,查看运行流程:
服务端创建以ServerBootstrap开始
,往group方法里添加两个参数,跟踪源码发现NioEventLoopGroup其实是继承 Excutor框架的线程池
sb.group(bossGroup,workGroup)
其实就是一个处理客户端链接的线程池,一个处理IO操作的线程池
继续往下走
.channel(NioServerSocketChannel.class)
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
通过反射拿出构造参数NioServerSocketChannel.class的实例,
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
查看NioServerSocketChannel发现
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
其实继承jdk提供的io.netty.channel.socket.ServerSocketChannel 的channel
然后
.option(ChannelOption.SO_BACKLOG,1024)
是给TCP设置一些参数,作为服务端,主要是设置TCP的backlog参数
然后:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 通过是否以换行符结果进行判断,解决粘包问题,即每遇到一条换行符算作一条单独的消息
// pipeline.addLast(new LineBasedFrameDecoder(1024));
// ByteBuf delimeter = Unpooled.copiedBuffer("$_".getBytes());
// pipeline.addLast(new DelimiterBasedFrameDecoder(1024,delimeter));
// 固定长度解码器
pipeline.addLast(new FixedLengthFrameDecoder(53));
pipeline.addLast(new TimeServerHandler());
}
});
通过ChannelPipeline中添加handler来处理网络事件。
最后
ChannelFuture cf = sb.bind(port).sync();
cf.channel().closeFuture().sync();
绑定端口
}finally {
// 优雅关闭线程资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
优雅的释放资源.
以上是写一个Netty服务端的基本流程。
二 : 服务端运行原理跟踪
看到这里可能还很模糊,不知道服务端是怎么跑起来的,
先看服务端写的时候会写这么一句代码,这是入口:
ChannelFuture cf = sb.bind(port).sync();
查询看bind(port).方法,发现是ServerBootstrap的父类AbstractBootstrap中bind()方法:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
继续跟进:
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
跟进doBind():
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1、initAndRegister();方法初始化注册通道
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2、注册完doBind0() 把处理结果放到线程池中监听
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
单独看下方法initAndRegister():
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
// 实例化通道,子类实现
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
发现其最终调用 :
abstract void init(Channel channel) throws Exception;
这个抽象方法,这个需要子类实现,因为是服务端,子类实现,我们则看ServerBootstrap:
其是把所有的配置配置完之后,通过ChannelPipeline, 核心是ServerBootstrapAcceptor 这个类继承ChannelInboundHandlerAdapter是一个Handler
跟踪代码initAndRegister()方法中的:
ChannelFuture regFuture = config().group().register(channel);
发现调用EventLoopGroup线程池的regitser()注册通道,即一个开启一个线程执行注册任务
跟踪 找到核心代码部分:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 读操作
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
以上为netty服务启动源码跟踪