Netty的编码和解码
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
- codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
Netty提供的编码器和解码器
- Netty 提供的编码器
- StringEncoder,对字符串数据进行编码
- ObjectEncoder,对 Java 对象进行编码
- Netty 提供的解码器
- StringDecoder, 对字符串数据进行解码
- ObjectDecoder,对 Java 对象进行解码
Netty本身的编解码的机制和问题分析
Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
无法跨语言
序列化后的体积太大,是二进制编码的 5 倍多。
序列化性能太低
Protobuf简介
Protobuf是用来将对象序列化的,相类似的技术还有Json序列化等等。它是一种高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)。
- Protobuf 是以 message 的方式来管理数据的
- 支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等
- 高性能,高可靠性
- 使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。
-
然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件
Protobuf.exe下载
https://github.com/protocolbuffers/protobuf/releases
protobuf中的变量类型和其他语言对照表
Protobuf的使用
定义proto文件
syntax = "proto3"; //版本
option optimize_for = SPEED; //加快解析
option java_outer_classname = "MyDataInfo"; //生成的外部类名,同时也是文件名
message MyMessage {
//定义一个枚举类型
enum DateType {
StudentType = 0; //在proto3中,要求enum的编号从0开始
WorkerType = 1;
}
//相当于message的属性,用data_type来标识传的是哪一个枚举类型,1表示属性序号1
DateType data_type = 1;
//标识每次枚举类型最多只能出现其中的一个类型,节省空间
oneof dataBody {
Student stuent = 2;
Worker worker = 3;
}
}
message Student { //会在StudentPojo 外部类生成一个内部类Student,他是真正发送的pojo对象
int32 id = 1; //Student类中有一个属性名字为ID,类型为int32(protobuf类型),1表示序号,不是值
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
生成java文件
protoc.exe --java_out=. xxx.proto
pom
<!--序列化工具protobuf-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.15.3</version>
</dependency>
注意protoc.exe下载的版本是3.15.3,这里protobuf的pom依赖也得是对应的版本,否则protoc.exe生成的java文件放入项目中报错。
客户端
Client
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName Client
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class Client {
//属性
private final String host;
private final int port;
public Client(String host, int port) {
this.port = port;
this.host = host;
}
public void run() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入Handler
pipeline.addLast("encoder",new ProtobufEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("--------" + channel.localAddress() + "---------");
//客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
MyDataInfo.MyMessage message = null;
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
if ("1".equals(msg)){
message = MyDataInfo.MyMessage.
newBuilder().
setDataType(MyDataInfo.MyMessage.DateType.StudentType)
.setStuent(MyDataInfo.Student
.newBuilder()
.setId(5)
.setName("王五")
.build())
.build();
}else {
message = MyDataInfo.MyMessage.
newBuilder().
setDataType(MyDataInfo.MyMessage.DateType.WorkerType).
setWorker(MyDataInfo.Worker.
newBuilder().
setName("李四").
setAge(11).
build()).build();
}
channel.writeAndFlush(message);
}
} finally {
eventExecutors.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Client("127.0.0.1", 7000).run();
}
}
服务端
Server
package com.pl.netty.protobuf;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName Server
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run(){
//bossGroup 用于接收连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
//workerGroup 用于具体的处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
//Netty 网络通信的组件,能够用于执行网络 I/O 操作。 选择哪种channel 异步的服务器端 TCP Socket 连接
.channel(NioServerSocketChannel.class)
//ChannelOption.SO_BACKLOG 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。
//服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
//Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
.option(ChannelOption.SO_BACKLOG,128)
//Netty中的option主要是设置的ServerChannel的一些选项,而childOption主要是设置的ServerChannel的子Channel的选项。
//SO_KEEPALIVE 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.childOption(ChannelOption.SO_KEEPALIVE, true)
//定义worker
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder",new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务端启动");
//Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
Server server = new Server(7000);
server.run();
}
}
GroupChatServerHandler
package com.pl.netty.protobuf;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName GroupchatServerHandler 继承 SimpleChannelInboundHandler 入栈,指定信息泛型为MyDataInfo.MyMessage
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//定义一个Channel组,管理所有的channel
//GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//此方法表示连接建立,一旦建立连接,就第一个被执行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//该方法会将 channelGroup 中所有 channel 遍历,并发送消息,而不需要我们自己去遍历
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + sdf.format(new Date()) + "加入聊天\n");
//将当前的Channel加入到 ChannelGroup
channelGroup.add(channel);
}
//表示 channel 处于活动状态,提示 xxx 上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "上线了~");
}
//表示 channel 处于不活动状态,提示 xxx 离线
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "离线了~");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DateType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DateType.StudentType){
MyDataInfo.Student student = msg.getStuent();
System.out.println("学生Id = " + student.getId() + student.getName());
}else if (dataType == MyDataInfo.MyMessage.DateType.WorkerType){
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人:name = " + worker.getName() + worker.getAge());
}else {
System.out.println("输入的类型不正确");
}
}
}