当前位置: 首页>数据库>正文

Netty集群Kafka消息转发 netty 群发消息

前言介绍:

    我们的NettyServer收到数据后,需要群发给当前链接到服务端的所有小伙伴。

    技术点:

    1、ChannelGroup 【io.netty.channel.group.DefaultChannelGroup】

欢迎加入:itstack | Netty The Sniper 5360692

环境需求:

    1、jdk1.7以上【jdk1.7以下只能部分支持netty】

    2、Netty-all-5.0【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

    3、telnet 测试【可以现在你的win7机器上测试这个命令,用于链接到服务端的测试命令】【本案例中已经很不好满足测试需求了】

    4、最好下载个网络调试助手,它能帮助你测试服务端、客户端

代码部分:

======================

TestNettyServerBaseDemo 

    src 

        com.itstack 

            ChildChannelHandler.java 

            MyChannelHandlerPool.java
            MyServerHanlder.java
NettyServer.java

======================

ChildChannelHandler.java

1. package.itstack;
2.  
3. import.netty.buffer.ByteBuf;
4. import.netty.buffer.Unpooled;
5. import.netty.channel.ChannelInitializer;
6. import.netty.channel.socket.SocketChannel;
7. import.netty.handler.codec.DelimiterBasedFrameDecoder;
8. import.netty.handler.codec.Delimiters;
9. import.netty.handler.codec.FixedLengthFrameDecoder;
10. import.netty.handler.codec.LineBasedFrameDecoder;
11. import.netty.handler.codec.string.StringDecoder;
12. import.netty.handler.codec.string.StringEncoder;
13.  
14. publicclassChildChannelHandlerextendsChannelInitializer<SocketChannel>{
15.  
16. @Override
17. protectedvoid(SocketChannel)throwsException{
18.  
19. System.out.println("报告");
20. System.out.println("信息:有一客户端链接到本服务端");
21. System.out.println("IP:"+.localAddress().getHostName());
22. System.out.println("Port:"+.localAddress().getPort());
23. System.out.println("报告完毕");
24.  
25. // 解码器
26. // 基于换行符号
27. .pipeline().addLast(newLineBasedFrameDecoder(1024));
28. 
29. // 基于指定字符串【换行符,这样功能等同于LineBasedFrameDecoder】
30. //		e.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, false, Delimiters.lineDelimiter()));
31. // 基于最大长度
32. //		e.pipeline().addLast(new FixedLengthFrameDecoder(4));
33. // 解码转String
34. .pipeline().addLast(newStringDecoder());
35.  
36. // 编码器 String
37. .pipeline().addLast(newStringEncoder());
38. 
39. // 在管道中添加我们自己的接收数据实现方法
40. .pipeline().addLast(newMyServerHanlder());
41.  
42. }
43.  
44. }

MyChannelHandlerPool.java

    1. package.itstack;
    2.  
    3. import.netty.channel.group.ChannelGroup;
    4. import.netty.channel.group.DefaultChannelGroup;
    5. import.netty.util.concurrent.GlobalEventExecutor;
    6.  
    7. /**
    8.  * 
    9.  * 这里讲ChannelGroup单独放到一个类里,并有多个客户端使用
    10.  * 同时ChannelGroup是static的
    11.  * 说明:这不是唯一的处理方式
    12.  *
    13.  */
    14. publicclassMyChannelHandlerPool{
    15.  
    16. publicstaticChannelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    17. 
    18. }
     
    MyServerHanlder.java 
    
    1. package.itstack;
    2.  
    3. import.util.Date;
    4.  
    5. import.netty.buffer.ByteBuf;
    6. import.netty.buffer.Unpooled;
    7. import.netty.channel.ChannelHandlerAdapter;
    8. import.netty.channel.ChannelHandlerContext;
    9. import.netty.handler.codec.bytes.ByteArrayDecoder;
    10.  
    11. publicclassMyServerHanlderextendsChannelHandlerAdapter{
    12.  
    13. /*
    14. 	 * channelAction 
    15. 	 * 
    16. 	 * channel 通道
    17. 	 * action  活跃的
    18. 	 * 
    19. 	 * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
    20. 	 * 
    21. 	 */
    22. publicvoid(ChannelHandlerContext)throwsException{
    23. 
    24. System.out.println(ctx.channel().localAddress().toString()+" channelActive");
    25. 
    26. //添加到channelGroup 通道组
    27. MyChannelHandlerPool.channelGroup.add(ctx.channel());
    28. 
    29. //通知您已经链接上客户端
    30. String="您已经开启与服务端链接"+" "+ctx.channel().id()+newDate()+" "+ctx.channel().localAddress();
    31. .writeAndFlush(str);
    32. 
    33. }
    34. 
    35. /*
    36. 	 * channelInactive
    37. 	 * 
    38. 	 * channel 	通道
    39. 	 * Inactive 不活跃的
    40. 	 * 
    41. 	 * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
    42. 	 * 
    43. 	 */
    44. publicvoid(ChannelHandlerContext)throwsException{
    45. 
    46. // 从channelGroup中移除,当有客户端退出后,移除channel。
    47. MyChannelHandlerPool.channelGroup.remove(ctx.channel());
    48. 
    49. System.out.println(ctx.channel().localAddress().toString()+" channelInactive");
    50. 
    51. }
    52. 
    53. /*
    54. 	 * channelRead
    55. 	 * 
    56. 	 * channel 通道
    57. 	 * Read    读
    58. 	 * 
    59. 	 * 简而言之就是从通道中读取数据,也就是服务端接收客户端发来的数据
    60. 	 * 但是这个数据在不进行解码时它是ByteBuf类型的后面例子我们在介绍
    61. 	 * 
    62. 	 */
    63. publicvoid(ChannelHandlerContext,Object)
    64. throwsException{
    65. 
    66. //注意此处已经不需要手工解码了
    67. System.out.println(ctx.channel().id()+""+newDate()+" "+msg);
    68. 
    69. //通知您已经链接上客户端[给客户端穿回去的数据加个换行]
    70. String="服务端收到:"+ctx.channel().id()+newDate()+" "+msg+"\r\n";
    71. 
    72. //收到信息后,群发给所有小伙伴
    73. MyChannelHandlerPool.channelGroup.writeAndFlush(str);
    74. }
    75. 
    76. /*
    77. 	 * channelReadComplete
    78. 	 * 
    79. 	 * channel  通道
    80. 	 * Read     读取
    81. 	 * Complete 完成
    82. 	 * 
    83. 	 * 在通道读取完成后会在这个方法里通知,对应可以做刷新操作
    84. 	 * ctx.flush()
    85. 	 * 
    86. 	 */
    87. publicvoid(ChannelHandlerContext)throwsException{
    88. .flush();
    89. }
    90. 
    91. /*
    92. 	 * exceptionCaught
    93. 	 * 
    94. 	 * exception	异常
    95. 	 * Caught		抓住
    96. 	 * 
    97. 	 * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
    98. 	 * 
    99. 	 */
    100. publicvoid(ChannelHandlerContext,Throwable)
    101. throwsException{
    102. .close();
    103. System.out.println("异常信息:\r\n"+cause.getMessage());
    104. }
    105. 
    106. 
    107. 
    108. }
    NettyServer.java
     
    1. package.itstack;
    2.  
    3. import.netty.bootstrap.ServerBootstrap;
    4. import.netty.channel.ChannelFuture;
    5. import.netty.channel.ChannelOption;
    6. import.netty.channel.EventLoopGroup;
    7. import.netty.channel.nio.NioEventLoopGroup;
    8. import.netty.channel.socket.nio.NioServerSocketChannel;
    9.  
    10. publicclassNettyServer{
    11.  
    12. publicstaticvoid(String[]){
    13. 
    14. try{
    15. System.out.println("服务端开启等待客户端链接");
    16. newNettyServer().bing(7397);
    17. }catch(Exception){
    18. .printStackTrace();
    19. }
    20. 
    21. }
    22. 
    23. publicvoid(int)throwsException{
    24. 
    25. EventLoopGroup=newNioEventLoopGroup();
    26. EventLoopGroup=newNioEventLoopGroup();
    27. 
    28. try{
    29. 
    30. ServerBootstrap=newServerBootstrap();
    31. .group(bossGroup,);
    32. .channel(NioServerSocketChannel.class);
    33. .option(ChannelOption.SO_BACKLOG,1024);
    34. .childHandler(newChildChannelHandler());
    35. 
    36. // 绑定端口
    37. ChannelFuture=.bind(port).sync();
    38. 
    39. // 等待服务端监听端口关闭
    40. .channel().closeFuture().sync();
    41. 
    42. }finally{
    43. 
    44. // 优雅的退出
    45. .shutdownGracefully();
    46. .shutdownGracefully();
    47. }
    48. 
    49. }
    50. 
    51. }
    1、启动NettyServer 
    2、控制台输出:
    ----------------------------------------------
    服务端开启等待客户端链接
    ----------------------------------------------
    3、开启2个以上客户端模拟软件
     
    
    	4、
    
    
    	服务端端控制台输出:
    
    
    	----------------------------------------------
    
    
    	报告
    
    信息:有一客户端链接到本服务端
    
    IP:user-PC
    
    Port:7397
    
    报告完毕
    
    user-PC/192.168.30.223:7397 channelActive
    
    defa23d9Tue Dec 30 16:54:51 CST 2014 群号:5360692
    
    defa23d9Tue Dec 30 16:54:51 CST 2014 群号:5360692
    
    defa23d9Tue Dec 30 16:54:52 CST 2014 群号:5360692
    
    defa23d9Tue Dec 30 16:54:53 CST 2014 群号:5360692
    
    报告
    
    信息:有一客户端链接到本服务端
    
    IP:localhost.localdomain

    Port:7397

    报告完毕
    
    localhost.localdomain/127.0.0.1:7397 channelActive
    
    5f735249Tue Dec 30 16:55:02 CST 2014 1
    
    5f735249Tue Dec 30 16:55:03 CST 2014 1
    
    5f735249Tue Dec 30 16:55:04 CST 2014 1
    
    5f735249Tue Dec 30 16:55:04 CST 2014 1
    
    5f735249Tue Dec 30 16:55:04 CST 2014 1
    
    5f735249Tue Dec 30 16:55:05 CST 2014 1
    
    5f735249Tue Dec 30 16:55:05 CST 2014 1
    
    5f735249Tue Dec 30 16:55:05 CST 2014 1
    
    5f735249Tue Dec 30 16:55:05 CST 2014 1
    defa23d9Tue Dec 30 16:55:10 CST 2014

    其中一个客户端控制输出:

    ---------------------------------------------
    
    1. 您已经开启与服务端链接Dec3016:54:492014-PC/192.168.30.223:7397
    2. 服务端收到:defa23d9Tue Dec3016:54:512014群号:5360692
    3.  
    4. 服务端收到:defa23d9Tue Dec3016:54:512014群号:5360692
    5.  
    6. 服务端收到:defa23d9Tue Dec3016:54:522014群号:5360692
    7.  
    8. 服务端收到:defa23d9Tue Dec3016:54:532014群号:5360692
    9.  
    10. 服务端收到:5f735249TueDec3016:55:0220141
    11.  
    12. 服务端收到:5f735249TueDec3016:55:0320141
    13.  
    14. 服务端收到:5f735249TueDec3016:55:0420141
    15.  
    16. 服务端收到:5f735249TueDec3016:55:0420141
    17.  
    18. 服务端收到:5f735249TueDec3016:55:0420141
    19.  
    20. 服务端收到:5f735249TueDec3016:55:0520141
    21.  
    22. 服务端收到:5f735249TueDec3016:55:0520141
    23.  
    24. 服务端收到:5f735249TueDec3016:55:0520141
    25.  
    26. 服务端收到:5f735249TueDec3016:55:0520141
    27.  
    28. 服务端收到:defa23d9Tue Dec3016:55:102014 
    29. ---------------------------------------------

    https://www.xamrdz.com/database/6m51961110.html

    相关文章: