高性能网络通信框架Netty一套就够(上)

举报
摸鱼打酱油 发表于 2022/04/01 22:32:38 2022/04/01
【摘要】 个人简介作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。@[toc] Netty Netty入门Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序 Netty著名项目由Netty开发的开源框架:dubboZookeeperRocketMQ N...

个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。

@[toc]

Netty

Netty入门

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

Netty著名项目

由Netty开发的开源框架:

  • dubbo
  • Zookeeper
  • RocketMQ

Netty的优势

  • 不需要自己构建协议,Netty自带了多种协议,例如HTTP协议
  • 解决了TCP传输问题,如粘包、半包
  • 解决了一个epoll空轮询的JDK bug。(作者遇到过),即selector的select方法默认是阻塞的,但是并没有阻塞会一直空轮询。
  • Netty对JDK的NIO API进行增强,如下:
    • ThreadLocal==>FastThreadLocal
    • ByteBuffer==>ByteBuf(重要),支持动态扩容,不像原厂的JDK的ByteBuffer超过缓存就报错

Netty Maven

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.65.Final</version>
        </dependency>

依赖说明:暂时不推荐使用Netty5,使用Netty4即可

第一个Netty应用

服务器端:

  private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

  public static void main(String[] args) {

    // Netty的服务器端启动器,装配Netty组件
    new ServerBootstrap()
        // NioEventLoopGroup底层就是线程池+selector
        .group(new NioEventLoopGroup())
        // 通道
        .channel(NioServerSocketChannel.class)
        //“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
        //但是每一个SocketChannel只能够让这个初始化器执行一次
        .childHandler(
            new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                  log.info("initChannel start......");
                  //往处理器流水线pipeline添加处理器
                  //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
                  nioSocketChannel.pipeline().addLast(new StringDecoder());
                  //添加接收数据需要的处理器适配器
                  nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                      //重写通道的‘’读‘’方法,msg就是接收到的数据
                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                          log.warn(msg.toString()); //打印数据
                          super.channelRead(ctx, msg);
                      }
                  });
                  log.info("initChannel end......");
              }
            })
        .bind(8082);
  }

客户端:

  private static final Logger log = LoggerFactory.getLogger(NettyClient.class);

  public static void main(String[] args) throws InterruptedException {

      //创建Netty客户端的启动器,装配Netty组件
    new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        //一旦执行这个应用立刻初始化,这个和childHandler有所不同
        //childHandler是需要socket连接上在初始化,这个不需要。。。。。
        .handler(
            new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel channel) throws Exception {
                  log.info("initChannel start......");
                  //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                  //往通道流水线添加一个字符串编码器
                  channel.pipeline().addLast(new StringEncoder());
                  log.info("initChannel end......");
              }
            })
        // connect方法是“”异步“”的
        .connect("localhost", 8082)
        //坑点:由于connect方法是异步的,所以要同步。。。。。
        //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
        //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步
        .sync()
        // 获取通道。然后发送数据
        .channel()
        .writeAndFlush("hello你好");
  }

Netty组件

查看CPU最大核心数

int hx = NettyRuntime.availableProcessors(); //cpu核心数
EventLoop

事件循环对象EventLoop

EventLoop本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理一个或多个Channel上源源不断的io事件

事件循环组EventLoopGroup

EventLoopGroup是一组EventLoop,而每一个EventLoop都维护着一个selector,Channel 一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop。

      int count=3;
      EventLoopGroup ev=new NioEventLoopGroup(count);
      System.out.println(ev.next().hashCode());//1
      System.out.println(ev.next().hashCode());//2
      System.out.println(ev.next().hashCode());//3
      System.out.println(ev.next().hashCode());//4

通过上面的代码可以看出1和4是同一个对象,因为他们的hashCode相同。得出EventLoopGroup是一个线程池,里面装载着>1个的EventLoop,
EventLoop底层维护了一个线程和selector,而count可以指定EventLoopGroup的线程池大小。

EventLoop普通任务与定时任务

      EventLoopGroup ev=new NioEventLoopGroup(3);
      //普通任务
      ev.next().submit(()->{

          System.out.println("111");

      });

      System.out.println("222");

      //定时任务
      ev.next().scheduleAtFixedRate(()->{

          System.out.println("333");

      },0,1,TimeUnit.SECONDS);

关闭EventLoopGroup

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      eventLoopGroup.shutdownGracefully(); //优雅的关闭EventLoopGroup

分工

      // Netty的服务器端启动器,装配Netty组件
      new ServerBootstrap()
               //******NioEventLoopGroup的分工合作,第一个NioEventLoopGroup处理accept事件
              //第二个NioEventLoopGroup处理读写事件
              .group(new NioEventLoopGroup(),new NioEventLoopGroup())
              // 通道
              .channel(NioServerSocketChannel.class)
              //“每一个”SocketChannel客户端连接上服务器端“都会”执行这个初始化器ChannelInitializer
              //但是每一个SocketChannel只能够让这个初始化器执行一次
              .childHandler(
                      new ChannelInitializer<NioSocketChannel>() {
                          @Override
                          protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                              log.info("initChannel start......");
                              //往处理器流水线pipeline添加处理器
                              //因为'客户端'发送数据会进行'字符串的编码'再发送到服务器端,所以这里要'创建一个字符串解码器'StringDecoder
                              nioSocketChannel.pipeline().addLast(new StringDecoder());
                              //添加接收数据需要的处理器适配器
                              nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                  //重写通道的‘’读‘’方法,msg就是接收到的数据
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                      log.warn(msg.toString()); //打印数据
                                      super.channelRead(ctx, msg);
                                  }
                              });
                              log.info("initChannel end......");
                          }
                      })
              .bind(8082);
Channel

Channel常用方法:

  • close()
    • 可以用来关闭Channel
  • closeFuture()
    • 用来处理 Channel 的关闭
  • pipeline()
    • 添加处理器
  • write()
    • 写入数据,只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush()
    • 立即发送数据,相当于同时调用write和flush方法,好处是不用等缓存满了才能发出数据的问题

ChannelFuture

获取ChannelFuture

      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(new NioEventLoopGroup())
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                              //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

发送数据的两种方式

  • sync同步channelFuture再发送数据
  • channelFuture添加监听器

这两种方法本质上都是为了让channelFuture成功创建也就是connect方法完成调用之后才发送数据

      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(new NioEventLoopGroup())
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                              //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

      //"方法一":
      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步

//      channelFuture.sync();
//      Channel channel = channelFuture.channel();
//      channel.writeAndFlush("你好");

      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
      //使用这种监听器方法就不需要sync进行同步了
      channelFuture.addListener(new ChannelFutureListener() {
          //当connect成功连接之后就会进入这个方法
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("operationComplete");
          }
      });

关闭通道channel

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      //创建Netty客户端的启动器,装配Netty组件
      ChannelFuture channelFuture = new Bootstrap()
              .group(eventLoopGroup)
              .channel(NioSocketChannel.class)
              //一旦执行这个应用立刻初始化,这个和childHandler有所不同
              //childHandler是需要socket连接上在初始化,这个不需要。。。。。
              .handler(
                      new ChannelInitializer<Channel>() {
                          @Override
                          protected void initChannel(Channel channel) throws Exception {
                             //日志
                              channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            //由于发送的数据需要进行编码再发送,所以需要一个字符串编码器
                              //往通道流水线添加一个字符串编码器
                              channel.pipeline().addLast(new StringEncoder());
                          }
                      })
              // connect方法是“”异步“”的
              .connect("localhost", 8082);

      //"方法一":
      //由于connect方法是异步的,如果没有进行同步,可能会造成发送数据在连接服务器之前。
      //一般来说connect连接服务器大概需要>1s,而writeAndFlush是立刻发送数据,所以这里一定要使用sync方法进行同步

//      channelFuture.sync();
//      Channel channel = channelFuture.channel();
//      channel.writeAndFlush("你好");

      //方法二:使用监听器,监听channelFuture是否完成连接。因为channelFuture只有connect完成之后才会创建
      //使用这种监听器方法就不需要sync进行同步了
      channelFuture.addListener(new ChannelFutureListener() {
          //当connect成功连接之后就会进入这个方法
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("operationComplete");
              //只有close之后才会调用下面的关闭监听器
              channel.close(); //关闭channel,这个关闭方法也是**异步**的,所以也需要进行监听

              ChannelFuture closeFuture = channel.closeFuture();

              //关闭通道监听器
              closeFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      log.info("已经关闭channel");
                      //关闭group
                      eventLoopGroup.shutdownGracefully();
                  }
              });

          }
      });
Future&Promise

Future都是用线程池去返回得到的,所以JDK Future需要依赖线程池,Netty Future需要依赖于EventLoopGroup

JDK Futhure和Netty Future、Netty Promise区别:

Netty的Future继承与JDK的Future,Netty Promise又对Netty Future进行扩展

  • JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果,例如JDK Future的get是阻塞的获取结果
  • Netty Future既阻塞的获取结果,也可以非阻塞的获取结果,阻塞就是get,非阻塞就是getNow。
  • Netty Promise有Netty Future所有的功能且增加了几个方法,setSuccess、setFailure,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。

JDK Future

      ExecutorService executorService = Executors.newFixedThreadPool(2); //创建一个固定大小的线程池
      //Callable有返回值。
      Future<String> future = executorService.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
              Thread.sleep(1000);
              return "hello";
          }
      });

      String res = future.get(); //get方法会阻塞,直到线程池的submit执行完毕,返回了future对象才会解除阻塞
      System.out.println(res);
      executorService.shutdown(); //关闭线程池

Netty Future

      EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
      Future<String> future = eventLoopGroup.next().submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
              Thread.sleep(1000);
              return "Netty Future";
          }
      });

//      String s1 = future.get(); //阻塞方法,这个方法和jdk的future一样
//      System.out.println(s1);

      String s2 = future.getNow(); //非阻塞方法,如果future没有立刻返回值则不会等待,直接返回null
      System.out.println(s2);

Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      EventLoop executors = eventLoopGroup.next();
      DefaultPromise<Integer> promise = new DefaultPromise<>(executors);
      new Thread(()->{

          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          promise.setSuccess(100);

      }).start();
      Integer res = promise.get();
      System.out.println(res);
Handler&Pipeline

服务端:

              new ServerBootstrap()
              .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<SocketChannel>() {

                  //pipeline结构
                  //head->handle1->handle2->handle3->handle4->handle5->handle6->tail
                  //且为‘双向链表’,触发Inbound事件则会从head->tail一直走Inbound方法。
                  //触发Outbound事件则会从tail->head一直走Outbound方法。只有触发了对应事件才会走对应的方法。。。。。。
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {

                      socketChannel.pipeline().addLast(new StringDecoder());

                      //Inbound处理器
                      //为处理器取名字
                      socketChannel.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle1");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      socketChannel.pipeline().addLast("handle2",new ChannelInboundHandlerAdapter(){
                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              log.warn(msg.toString());
                              log.warn(Thread.currentThread().getName()+"==>"+"handle2");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      socketChannel.pipeline().addLast("handle3",new ChannelInboundHandlerAdapter(){
                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                              //***不能用这种方法,client会收不到
//                              ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello world");


                              //***用这种,记住*****一定要指定字符类型UTF-8***
                              ByteBuf byteBuf = ctx.alloc().buffer().writeBytes("hello".getBytes("utf-8"));
                              //发送数据,触发OutBound事件
                              socketChannel.writeAndFlush(byteBuf);

                              log.warn(Thread.currentThread().getName()+"==>"+"handle3");
                              super.channelRead(ctx, msg); //向下传递
                          }
                      });

                      //Outbound处理器
                      socketChannel.pipeline().addLast("handle4",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle4");
                              super.write(ctx, msg, promise);
                          }
                      });

                      socketChannel.pipeline().addLast("handle5",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle5");
                              super.write(ctx, msg, promise);
                          }
                      });

                      socketChannel.pipeline().addLast("handle6",new ChannelOutboundHandlerAdapter(){

                          @Override
                          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                              log.warn(Thread.currentThread().getName()+"==>"+"handle6");
                              super.write(ctx, msg, promise);
                          }
                      });
                  }
              }).bind(8080);

客户端:

      NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      ChannelFuture channelFuture = new Bootstrap()
              .group(eventLoopGroup)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {

                      ch.pipeline().addLast(new StringEncoder());
                      ch.pipeline().addLast(new LoggingHandler());

                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                          @Override
                          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                              System.out.println("--------------"+msg.toString());
                              super.channelRead(ctx, msg);
                          }
                      });

                  }
              }).connect("localhost", 8080);

      channelFuture.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {

              Channel channel = future.channel();
              channel.writeAndFlush("client-----");
//              channel.close();
//              ChannelFuture closeFuture = channel.closeFuture();
//              closeFuture.addListener(new ChannelFutureListener() {
//                  @Override
//                  public void operationComplete(ChannelFuture future) throws Exception {
//                      eventLoopGroup.shutdownGracefully();
//                  }
//              });
          }
      });

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler.

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler:

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler处理器
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向tail方向调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向head方向调用handler,直到handler不是处理Outbound操作为止

结构图:

p

ByteBuf

创建ByteBuf

      //创建ByteBuf
      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      log(byteBuf);

      StringBuffer stringBuffer = new StringBuffer();

    for (int i = 0; i < 50; i++) {
      stringBuffer.append('1');
    }
    byteBuf.writeBytes(stringBuffer.toString().getBytes("utf-8"));
    log(byteBuf);

运行结果:

read index:0 write index:0 capacity:10

read index:0 write index:50 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000010| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000020| 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 |1111111111111111|
|00000030| 31 31                                           |11              |
+--------+-------------------------------------------------+----------------+

Process finished with exit code 0

根据打印的capacity可知ByteBuf是会自动扩容的,而NIO的ByteBuffer是不能超出容量的。

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_INITIAL_CAPACITY = 256; //默认初始化容量
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; //最大容量
    static final int DEFAULT_MAX_COMPONENTS = 16;

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3种创建池化的ByteBuf方式

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf

      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf

      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf

查看当前ByteBuf对象类型

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf

      ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.heapBuffer(10);//指定创建‘’堆内存‘’的ByteBuf

      ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.directBuffer(10);//指定创建‘’直接内存‘’的ByteBuf

      System.out.println(byteBuf1.getClass());
      System.out.println(byteBuf2.getClass());
      System.out.println(byteBuf3.getClass());

输出结果:

class io.netty.buffer.PooledUnsafeDirectByteBuf
class io.netty.buffer.PooledUnsafeHeapByteBuf
class io.netty.buffer.PooledUnsafeDirectByteBuf

池化和非池化

  • Netty4.1之前默认是非池化
  • Netty4.1之后默认是池化,但是Android平台默认是非池化

池化优点:

  • 本质上池化的意义就是可重用ByteBuf
    • 没有池化的话每次需要使用ByteBuf都要重新申请内存。即使是堆内存,释放内存也会增大GC的压力
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发下,池化更节约内存,减少内存溢出的可能。

IDEA IDE如何设置为非池化

只需要在IDEA IDE的VM options里面设置下面一段代码即可:

-Dio.netty.allocator.type={unpooled|pooled}

ByteBuf组成

  • 最大容量与当前容量
    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针写指针两个指针控制。进行读写操作时,无需进行模式的切换
    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

20210423143030

ByteBuf写入

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
      
      byteBuf1.writeBytes("hello".getBytes("utf-8"));

write和set方法的区别:

ByteBuf中set开头的一系列方法,也可以写入数据,但不会改变写指针位置

ByteBuf的扩容机制

当ByteBuf中的当前容量无法容纳写入的数据时,会自动进行扩容

触发扩容:

      ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer(10); //默认创建的是‘’直接内存‘’的ByteBuf
      log(byteBuf1);
      byteBuf1.writeBytes("helloaaaaaaaa".getBytes("utf-8"));
      log(byteBuf1);

结果:

read index:0 write index:0 capacity:10

read index:0 write index:13 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 61 61 61 61 61 61 61 61          |helloaaaaaaaa   |
+--------+-------------------------------------------------+----------------+

扩容机制如下:

有两种情况:

  • 写入后的数据小于512字节
    • 这种情况会选择使用16的整数倍进行扩容,比如写入后的数据是14字节,则16*1为最小整数倍,则会扩容到16字节
  • 写入后的数据大于512字节
    • 这种情况会以2的n次方扩容,例如写入后的数据是600字节,此时大于512字节,那么容纳它的容量为2的10次方,因为2的9次方是512容纳不了,所以会扩容到1024字节
    • 如果扩容后的大小大于maxCapacity,则会抛出java.lang.IndexOutOfBoundsException异常

ByteBuf读取

读取后会移动读指针

      ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);

      byteBuf.writeBytes("hello".getBytes("utf-8"));

      byte b[]=new byte[5];
      byteBuf.readBytes(b);

     System.out.println(Arrays.toString(b));

ByteBuf以get开头的方法,这些方法不会改变读指针的位置

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。