当前位置: 首页>数据库>正文

序列化工具Protobuf

Netty的编码和解码

  • 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
  • codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
序列化工具Protobuf,第1张

Netty提供的编码器和解码器

  • Netty 提供的编码器
  • StringEncoder,对字符串数据进行编码
  • ObjectEncoder,对 Java 对象进行编码
  • Netty 提供的解码器
    • StringDecoder, 对字符串数据进行解码
    • ObjectDecoder,对 Java 对象进行解码

Netty本身的编解码的机制和问题分析

  • Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题

  • 无法跨语言

  • 序列化后的体积太大,是二进制编码的 5 倍多。

  • 序列化性能太低

Protobuf简介

Protobuf是用来将对象序列化的,相类似的技术还有Json序列化等等。它是一种高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)。

  • Protobuf 是以 message 的方式来管理数据的
  • 支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等
  • 高性能,高可靠性
  • 使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。
  • 然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件


    序列化工具Protobuf,第2张

Protobuf.exe下载

https://github.com/protocolbuffers/protobuf/releases

序列化工具Protobuf,第3张
序列化工具Protobuf,第4张

protobuf中的变量类型和其他语言对照表

序列化工具Protobuf,第5张

Protobuf的使用

定义proto文件

syntax = "proto3"; //版本
option optimize_for = SPEED; //加快解析
option java_outer_classname = "MyDataInfo"; //生成的外部类名,同时也是文件名

message MyMessage {
    //定义一个枚举类型
    enum DateType {
        StudentType = 0; //在proto3中,要求enum的编号从0开始
        WorkerType = 1;
    }

    //相当于message的属性,用data_type来标识传的是哪一个枚举类型,1表示属性序号1
    DateType data_type = 1;

    //标识每次枚举类型最多只能出现其中的一个类型,节省空间
    oneof dataBody {
        Student stuent = 2;
        Worker worker = 3;
    }
}

message Student { //会在StudentPojo 外部类生成一个内部类Student,他是真正发送的pojo对象
    int32 id = 1; //Student类中有一个属性名字为ID,类型为int32(protobuf类型),1表示序号,不是值
    string name = 2;
}
message Worker {
    string name = 1;
    int32 age = 2;
}

生成java文件

protoc.exe --java_out=. xxx.proto


序列化工具Protobuf,第6张

pom

<!--序列化工具protobuf-->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.3</version>
</dependency>

注意protoc.exe下载的版本是3.15.3,这里protobuf的pom依赖也得是对应的版本,否则protoc.exe生成的java文件放入项目中报错。

客户端

Client

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName Client
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class Client {

    //属性
    private final String host;
    private final int port;

    public Client(String host, int port) {
        this.port = port;
        this.host = host;
    }

    public void run() throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入Handler
                            pipeline.addLast("encoder",new ProtobufEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("--------" + channel.localAddress() + "---------");
            //客户端需要输入信息,创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            MyDataInfo.MyMessage message = null;
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                if ("1".equals(msg)){
                    message = MyDataInfo.MyMessage.
                            newBuilder().
                            setDataType(MyDataInfo.MyMessage.DateType.StudentType)
                            .setStuent(MyDataInfo.Student
                                    .newBuilder()
                                    .setId(5)
                                    .setName("王五")
                                    .build())
                            .build();
                }else {
                    message = MyDataInfo.MyMessage.
                            newBuilder().
                            setDataType(MyDataInfo.MyMessage.DateType.WorkerType).
                            setWorker(MyDataInfo.Worker.
                                    newBuilder().
                                    setName("李四").
                                    setAge(11).
                                    build()).build();
                }
                channel.writeAndFlush(message);
            }
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Client("127.0.0.1", 7000).run();
    }
}

服务端

Server

package com.pl.netty.protobuf;
/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName Server
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class Server {
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run(){
        //bossGroup 用于接收连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //workerGroup 用于具体的处理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    //Netty 网络通信的组件,能够用于执行网络 I/O 操作。 选择哪种channel 异步的服务器端 TCP Socket 连接
                    .channel(NioServerSocketChannel.class)
                    //ChannelOption.SO_BACKLOG 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。
                    //服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
                    //Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
                    .option(ChannelOption.SO_BACKLOG,128)
                    //Netty中的option主要是设置的ServerChannel的一些选项,而childOption主要是设置的ServerChannel的子Channel的选项。
                    //SO_KEEPALIVE 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //定义worker
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("netty 服务端启动");
            //Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //监听关闭事件
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        Server server = new Server(7000);
        server.run();
    }
}

GroupChatServerHandler

package com.pl.netty.protobuf;
/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName GroupchatServerHandler  继承 SimpleChannelInboundHandler 入栈,指定信息泛型为MyDataInfo.MyMessage
 * @Author pl
 * @Date 2021/2/16
 * @Version V1.0.0
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
    //定义一个Channel组,管理所有的channel
    //GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //此方法表示连接建立,一旦建立连接,就第一个被执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //该方法会将 channelGroup 中所有 channel 遍历,并发送消息,而不需要我们自己去遍历
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + sdf.format(new Date()) + "加入聊天\n");
        //将当前的Channel加入到 ChannelGroup
        channelGroup.add(channel);
    }

    //表示 channel 处于活动状态,提示 xxx 上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "上线了~");
    }

    //表示 channel 处于不活动状态,提示 xxx 离线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " " + sdf.format(new Date()) + "离线了~");
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
        MyDataInfo.MyMessage.DateType dataType = msg.getDataType();
        if (dataType == MyDataInfo.MyMessage.DateType.StudentType){
            MyDataInfo.Student student = msg.getStuent();
            System.out.println("学生Id = " + student.getId() + student.getName());
        }else if (dataType == MyDataInfo.MyMessage.DateType.WorkerType){
            MyDataInfo.Worker worker = msg.getWorker();
            System.out.println("工人:name = " + worker.getName() + worker.getAge());
        }else {
            System.out.println("输入的类型不正确");
        }
    }

}

https://www.xamrdz.com/database/63v1907921.html

相关文章: