高性能网络通信框架Netty一套就够(下集)

举报
摸鱼打酱油 发表于 2022/04/01 22:34:34 2022/04/01
【摘要】 Netty进阶 粘包和半包/拆包问题粘包问题演示服务器端: private static final Logger log= LoggerFactory.getLogger(NettyServer.class); public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoop...

Netty进阶

粘包和半包/拆包问题

粘包问题演示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);

      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      //不进行加解密不然展示不出粘包效果
//                      ch.pipeline().addLast(new StringDecoder());

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                              log.info("客户端已成功连接服务器");
                              super.channelActive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });


                  }
              }).bind(8080);
  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) {

    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel ch) throws Exception {

                //不进行加解密不然展示不出粘包效果
//                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new LoggingHandler());



              }
            }).connect("localhost", 8080);

    channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {

        Channel channel = future.channel();

        ByteBuf byteBuf = channel.alloc().buffer(16);
        for (int i=0;i<10;i++){
          byteBuf.retain();
          byteBuf.writeBytes(("hello").getBytes("utf-8"));
          channel.writeAndFlush(byteBuf);
          byteBuf.clear();
        }

        channel.close();

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            eventLoopGroup.shutdownGracefully();
          }
        });
      }
    });

  }

服务器端输出结果:

16:00:37.869 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa191631f, L:/127.0.0.1:8080 - R:/127.0.0.1:53693] READ: 50B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 |hellohellohelloh|
|00000010| 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 |ellohellohellohe|
|00000020| 6c 6c 6f 68 65 6c 6c 6f 68 65 6c 6c 6f 68 65 6c |llohellohellohel|
|00000030| 6c 6f                                           |lo              |
+--------+-------------------------------------------------+----------------+

可以看出原来我们是在客户端分10次发送,而服务器端却一下把10次的数据都粘在一起了,这就是粘包问题。

半包问题展示

服务器端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);

    new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        // 半包问题:例如,发送方发送100字节数据,而接收方最多只能接收30字节数据,这就是半包问题
            //option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小(滑动窗口)
        .option(ChannelOption.SO_RCVBUF,10)
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel ch) throws Exception {

                // 不进行加解密不然展示不出粘包效果
                //                      ch.pipeline().addLast(new StringDecoder());

                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline()
                    .addLast(
                        new ChannelInboundHandlerAdapter() {

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客户端已成功连接服务器");
                            super.channelActive(ctx);
                          }

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg)
                              throws Exception {

                            log.info("msg={}", msg);
                            super.channelRead(ctx, msg);
                          }
                        });
              }
            })
        .bind(8080);
  }

只需使用这个方法即可

  • option(ChannelOption.SO_RCVBUF,10)

option(ChannelOption.SO_RCVBUF,10),调整接收缓冲区大小。由于接收缓存区的大小<发送方发送的数据大小,所以产生了半包问题。

现象分析

粘包:

  • 产生现象
    • 第一次发送abc,第二次发送def,接收到的是一整个abcdef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太大,Netty的接收缓冲区默认是1024字节
    • 网络层
      • TCP滑动窗口:假如发送方发送100字节数据,而滑动窗口缓冲区可容纳>100字节数据,这时候就会出现粘包问题。
      • Nagle 算法:会造成粘包

半包/拆包:

  • 产生现象
    • 发送abcdef数据,接收方第一次收到ab,第二次收到cd,第三次收到ef
  • 原因
    • Netty层
      • 接收方的接收缓冲区太小,发送方的数据过大,导致接收方无法一次接收下所有数据,就会半包/拆包
    • 网络层
      • 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
    • 数据链路层
      • MSS 限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包

发送这些问题的本质:因为 TCP 是流式协议,消息无边界

粘包和半包/拆包解决方案

短连接

短连接:即每发送一条数据就重新连接再次发送,反复此操作。

短连接的缺点是显而易见的,每次发送一条数据都要重新连接,这样会大大的浪费时间,因为连接是需要时间的。

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。
这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。
但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

采用短连接解决粘包代码

服务端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(6);

    new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel ch) throws Exception {

                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline()
                    .addLast(
                        new ChannelInboundHandlerAdapter() {

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客户端已成功连接服务器");
                            super.channelActive(ctx);
                          }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg="+msg);
                              super.channelRead(ctx, msg);
                            }
                        });
              }
            })
        .bind(8080);
  }

客户端:

  private static final Logger log= LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) {

    //采用短连接解决“”粘包“”问题,无法解决半包问题
    for (int i = 0; i < 10; i++) {
      sendMessage("hello");
    }


  }


  public static void sendMessage(String msg){

     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     ChannelFuture channelFuture = new Bootstrap()
            .group(eentLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel ch) throws Exception {

                //不进行加解密不然展示不出粘包效果
//                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new LoggingHandler());

                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                  @Override
                  public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ByteBuf buffer = ctx.alloc().buffer(16);
                    buffer.writeBytes(msg.getBytes("utf-8"));
                    ch.writeAndFlush(buffer);
                    ch.close();
                    ChannelFuture closeFuture = ch.closeFuture();
                    closeFuture.addListener(new ChannelFutureListener() {
                      @Override
                      public void operationComplete(ChannelFuture future) throws Exception {
                        eventLoopGroup.shutdownGracefully();
                      }
                    });
                  }
                });

              }
            }).connect("localhost", 8080);


  }
定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度。
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码

行解码器(推荐)

对于其他解码器,我还是更喜欢行解码器。行解码器主要是靠分隔符\n来判断行进行解码,不过需要进行限制长度,以免服务器一直搜索\n造成卡死。

改造前的粘包代码

服务端:

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);

客户端:

     NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              ByteBuf buffer = ctx.alloc().buffer(16);

                              for(int i=0;i<10;i++){
                                  buffer.retain();
                                  buffer.writeBytes("hello world".getBytes("utf-8"));
                                  ctx.channel().writeAndFlush(buffer);
                              }

                              ch.close();//关闭Channel
                              ChannelFuture closeFuture = ch.closeFuture();
                              closeFuture.addListener(new ChannelFutureListener() {
                                  @Override
                                  public void operationComplete(ChannelFuture future) throws Exception {
                                      nioEventLoopGroup.shutdownGracefully();
                                  }
                              });
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

结果:

13:37:15.286 [nioEventLoopGroup-3-3] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x36ce6c5f, L:/127.0.0.1:8080 - R:/127.0.0.1:64550] READ: 110B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f |hello worldhello|
|00000010| 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c | worldhello worl|
|00000020| 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c |dhello worldhell|
|00000030| 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 |o worldhello wor|
|00000040| 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64 68 65 6c |ldhello worldhel|
|00000050| 6c 6f 20 77 6f 72 6c 64 68 65 6c 6c 6f 20 77 6f |lo worldhello wo|
|00000060| 72 6c 64 68 65 6c 6c 6f 20 77 6f 72 6c 64       |rldhello world  |
+--------+-------------------------------------------------+----------------+

接收方使用行解码器改造后

服务端:

  private static final Logger log= LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup(6);
      new ServerBootstrap()
              .group(boss,worker)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {

                      ch.pipeline().addLast(new LineBasedFrameDecoder(1024));//配置行解码器
                      
                      ch.pipeline().addLast(new LoggingHandler());
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.info("msg={}",msg);
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).bind(8080);

客户端:

 //把消息加工成可以被行解码器识别的消息
    private static String getMsg(String oldMsg){

        oldMsg+='\n';
        return oldMsg;
    }

  public static void main(String[] args) {

      NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
      try{
      new Bootstrap()
              .group(nioEventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelActive(ChannelHandlerContext ctx) throws Exception {

                              ByteBuf buffer = ctx.alloc().buffer(16);

                              for(int i=0;i<10;i++){
                                  buffer.retain();
                                  String msg = getMsg("hello world");
                                  buffer.writeBytes(msg.getBytes("utf-8"));
                                  ctx.channel().writeAndFlush(buffer);
                                  //清理缓存,防止数据堆叠
                                  buffer.clear();
                              }

                              ch.close();//关闭Channel
                              ChannelFuture closeFuture = ch.closeFuture();
                              closeFuture.addListener(new ChannelFutureListener() {
                                  @Override
                                  public void operationComplete(ChannelFuture future) throws Exception {
                                      nioEventLoopGroup.shutdownGracefully();
                                  }
                              });
                          }
                      });
                  }
              }).connect("localhost",8080);
      }catch (Exception e){
          e.printStackTrace();
      }

  }
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。