Java NIO由浅入深(作者原创)

举报
摸鱼打酱油 发表于 2022/04/01 22:41:00 2022/04/01
1.8k+ 0 0
【摘要】 个人简介作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。@[toc] NIO三大组件 Channel and BufferJava NIO的核心:通道(Channel)和缓冲区(Buffer),通道是用来传输数据的,缓冲区是存储数据的。常见的Channel有以下四种,其中FileChannel主要用...

个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。
@[toc]

NIO三大组件

Channel and Buffer

Java NIO的核心:通道(Channel)和缓冲区(Buffer),通道是用来传输数据的,缓冲区是存储数据的。

常见的Channel有以下四种,其中FileChannel主要用于文件传输,其余三种用于网络通信。

  • FileChannel
  • SocketChannel
  • DatagramChannel
  • ServerSocketChannel

Buffer有几种,使用最多的是ByteBuffer

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

8大基本数据类型除了boolean没有Buffer,其余的7种基本类型都有

Selector

未使用Selector之前,有如下几种方案

1.多线程技术

实现逻辑 :每一个连接进来都开一个线程去处理Socket。

缺点:

  • 如果同时有100000个(大量)连接进来,系统大概率是挡不住的,而且线程会占用内存,会导致内存不足。
  • 线程需要进行上下文切换,成本高

2.采用线程池技术

实现逻辑 :创建一个固定大小(系统能够承载的线程数)的线程池对象,去处理连接的请求,假如线程池大小为
100个线程数,这时候同时并发连接1000个Socket,此时只有100个Socket会得到处理,其余的会阻塞。这样很好的防止了系统线程数
过多导致线程占用内存大,不容易导致系统由于内存占用的问题而崩溃。

相对于第一种多线程技术处理客户端Socket,第二种方案使用线程池去处理连接会更好,但是还是不够好

缺点:

  • 阻塞模式下,线程仅能处理一个连接,若socket连接一直未断开,则该线程无法处理其他socket。

3.使用Selector选择器
img1.png

selector的作用就是配合一个线程来管理多个channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,当一个channel中没有执行任务时,可以去执行其他channel中的任务

注意:fileChannel因为是阻塞式的,所以无法使用selector

使用场景:适合连接数多,但流量较少的场景

流程: 假如当前Selector绑定的Channels没有任何一个Channel触发了感兴趣的事件,
则selector的select()方法会阻塞线程,直到channel触发了事件。这些事件发生后,select方法就会返回这些事件交给thread来处理。

IO and NIO 区别

区别:

  • IO是面向的,NIO是面向缓冲区()的
  • Java IO的各种流是阻塞的,而Java NIO是非阻塞的
  • Java NIO的选择器允许一个单独的线程来监视多个输入通道

普通io读取文件

@Test
    public void test01(){

        try {
            FileInputStream fileInputStream = new FileInputStream("data.txt");

            long start = System.currentTimeMillis();

            byte bytes[]=new byte[1024];

            int n=-1;
            while ((n=fileInputStream.read(bytes,0,1024))!=-1){

                String s = new String(bytes,0,n,"utf-8");

                System.out.println(s);
            }
            long end = System.currentTimeMillis();
            System.out.println("普通io共耗时:"+(end-start)+"ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

缓冲流IO读取文件


@Test
    public void test02(){

        try {
            BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("data.txt"));

            long start = System.currentTimeMillis();

            byte bytes[]=new byte[1024];

            int n=-1;

            while ((n=bufferedInputStream.read(bytes,0,1024))!=-1){

                String s = new String(bytes,0,n,"utf-8");

                System.out.println(s);
            }

            long end = System.currentTimeMillis();
            System.out.println("缓冲流io共耗时:"+(end-start)+"ms");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

Nio-FileChannel读取文件


//方式1
@Test
    public void test3(){

        try {
            //获取channel,FileInputStream生成的channel只有读的权利
            FileChannel channel = new FileInputStream("data.txt").getChannel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //开辟一块缓冲区

            long start = System.currentTimeMillis();
            while (true){

                //写入操作
                int read = channel.read(byteBuffer); //如果read=-1,说明缓存“块”没有数据了

                if(read==-1){
                    break;
                }else {

                    byteBuffer.flip();//读写切换,切换为读的操作,实质上就是把limit=position,position=0

                    String de = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                    System.out.println(de);

                    byteBuffer.clear(); //切换为写
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("heap nio共耗时:"+(end-start)+"ms");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

//方式2
@Test
    public void test4(){

        ByteBuffer byteBuffer = ByteBuffer.allocate(10);

        byteBuffer.put("helloWorld".getBytes());

        debugAll(byteBuffer);

        byteBuffer.flip(); //读模式

        while (byteBuffer.hasRemaining()){

            System.out.println((char)byteBuffer.get());
        }
        byteBuffer.flip();

        System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());

    }

ByteBuffer

创建ByteBuffer缓冲区:

  • ByteBuffer.allocate(int capacity)
  • ByteBuffer.allocateDirect(int capacity)
  • ByteBuffer.wrap(byte[] array,int offset, int length)

ByteBuffer常用方法:

  • get()
  • get(int index)
  • put(byte b)
  • put(byte[] src)
  • limit(int newLimit)
  • mark()
  • reset()
  • clear()
  • flip()
  • compact()

字符串与ByteBuffer的相互转换

字符串转换成ByteBuffer

ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello world\nabc\n\baaa");

ByteBuffer转换成String

String str = StandardCharsets.UTF_8.decode(byteBuffer).toString();

整个Demo

 @Test
    public void test5(){
        //字符串转换成ByteBuffer
        ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello world\nabc\n\baaa");
        //通过StandardCharsets的encode方法获得ByteBuffer,此时获得的ByteBuffer为读模式,无需通过flip切换模式
//        byteBuffer.flip(); //这句话不能加,encode转换成ByteBuffer默认是读模式
        while (byteBuffer.hasRemaining()){

            System.out.printf("%c",(char)byteBuffer.get());
        }

        byteBuffer.flip();
        //ByteBuffer转换成String
        String str = StandardCharsets.UTF_8.decode(byteBuffer).toString();
        System.out.println("\n--------------");
        System.out.println(str);
    }

解决粘包和拆包问题

@Test
    public void test6(){

        String msg = "hello,world\nI'm abc\nHo";

        ByteBuffer byteBuffer = ByteBuffer.allocate(32);

        byteBuffer.put(msg.getBytes());

        byteBuffer=splitGetBuffer(byteBuffer);

        byteBuffer.put("w are you?\n".getBytes()); //多段发送数据

        byteBuffer=splitGetBuffer(byteBuffer);

        byteBuffer.put("aa  bccdd?\n".getBytes()); //多段发送数据

        byteBuffer=splitGetBuffer(byteBuffer);

    }

    private ByteBuffer splitGetBuffer(ByteBuffer byteBuffer) {

        byteBuffer.flip();
        StringBuilder stringBuilder = new StringBuilder();
        int index=-1;
        for (int i = 0; i < byteBuffer.limit(); i++) {

            if(byteBuffer.get(i)!='\n'){ //get(i)不会让position+1

                stringBuilder.append((char) byteBuffer.get(i));
            }else{
                index=i; //记录最后一个分隔符下标
                String data = stringBuilder.toString();
                ByteBuffer dataBuf = ByteBuffer.allocate(data.length());
                dataBuf.put(data.getBytes());
                dataBuf.flip();
                debugAll(dataBuf);
                dataBuf.clear();
                stringBuilder=new StringBuilder();
            }
        }

        ++index;
        ByteBuffer temp = ByteBuffer.allocate(byteBuffer.capacity());
        for (;index<byteBuffer.limit();++index){

            temp.put(byteBuffer.get(index));
        }

        return temp;
    }

文件编程

FileChannel

因为FileChannel只能工作在阻塞环境下,而Selector是非阻塞的,所以FileChannel无法注册到Selector里面去。

获取FileChannel

FileChannel不能直接打开,一定要用FileInputStream或者FileOutputStream或者RandomAccessFile来获取FileChannel对象,
使用getChannel方法即可。

注意以下几点:

  • 通过FileInputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
FileChannel读取

通过 FileInputStream 获取channel,通过read方法将数据写入到ByteBuffer中,read方法的返回值表示读到了多少字节,若读到了文件末尾则返回-1

int read = channel.read(buffer);
FileChannel写入

因为channel也是有大小的,所以 write方法并不能保证一次将 buffer中的内容全部写入channel。必须需要按照以下规则进行写入

// 通过hasRemaining()方法查看缓冲区中是否还有数据未写入到通道中
while(buffer.hasRemaining()) {
	channel.write(buffer);
}
强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

两个Channel传输数据

transferTo方法的使用

        //方法一:
        FileInputStream fileInputStream = new FileInputStream("data.txt"); //读的通道
        FileChannel from = fileInputStream.getChannel();

        FileOutputStream fileInputStream1 = new FileOutputStream("to.txt"); //写的通道
        FileChannel to = fileInputStream1.getChannel();
        long l = from.transferTo(0, from.size(), to);
        //方法二:
        RandomAccessFile r1 = new RandomAccessFile("data.txt", "rw"); //都开启rw权限
        FileChannel from1 = r1.getChannel();
        RandomAccessFile r2 = new RandomAccessFile("to.txt", "rw");
        FileChannel to2 = r2.getChannel();

        from1.transferTo(0,r1.length(),to2);

transferTo方法介绍

使用transferTo方法可以快速、高效地将一个channel中的数据传输到另一个channel中,但一次只能传输2G的内容,
transferTo方法的底层使用了零拷贝技术

Path与Paths

  • Path用来表示文件路径
  • Paths是工具类,用来获取Path实例
 Path path = Paths.get("data.txt");

 Path path1 = Paths.get("D:\\java code\\netty-study\\data.txt");

Files

判断文件是否存在
    Path path = Paths.get("data.txt");
    boolean exists = Files.exists(path);
创建一级目录
  • createDirectory(path)

如果文件夹已存在,则会报错。FileAlreadyExistsException,
此方法只能创建一级目录,如果用此方法创建多级目录则会报错NoSuchFileException。

    Path path = Paths.get("D:\\img");
    Path directory = Files.createDirectory(path);
创建多级目录
  • createDirectories(path)
    Path path = Paths.get("D:\\img\\a\\b");
    Path directories = Files.createDirectories(path);
拷贝文件
    //这种方式如果目标文件‘to’存在则会报错FileAlreadyExistsException
    Path from = Paths.get("data.txt");
    Path to = Paths.get("D:\\img\\target.txt"); //文件名也要写
    Files.copy(from,to);
    //只需要加StandardCopyOption.REPLACE_EXISTING就不会报错,因为它会直接替换掉目标文件
    Path from = Paths.get("data.txt");
    Path path = Paths.get("D:\\img\\target.txt"); //文件名也要写
    Files.copy(from,path, StandardCopyOption.REPLACE_EXISTING);
移动文件
    Path source = Paths.get("data.txt");
    Path target = Paths.get("D:\\img\\target.txt");
    Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性
删除文件
    Path target = Paths.get("D:\\img\\target.txt");
    Files.delete(target); //删除文件
遍历文件夹
  • walkFileTree(Path, FileVisitor)方法
    • Path:文件起始路径
    • FileVisitor:文件访问器,使用访问者模式,这个接口有如下方法
      • preVisitDirectory:访问目录前的操作
      • visitFile:访问文件的操作
      • visitFileFailed:访问文件失败时的操作
      • postVisitDirectory:访问目录后的操作
    Path target = Paths.get("D:\\cTest");

    Files.walkFileTree(target,new SimpleFileVisitor<Path>(){

      @Override
      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
        System.out.println("1:"+dir);
        return super.preVisitDirectory(dir, attrs);
      }

      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        System.out.println("2:"+file);
        return super.visitFile(file, attrs);
      }
    });

网络编程

NIO通信(阻塞模式)

这里有一段简易的通信代码:

服务器端:

      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开serverSocketChannel

      serverSocketChannel.bind(new InetSocketAddress(8080));

      while (true){

      System.out.println("waiting.....");
          SocketChannel socketChannel = serverSocketChannel.accept(); //阻塞
      System.out.println("connect success");
          ByteBuffer byteBuffer = ByteBuffer.allocate(100);
          socketChannel.read(byteBuffer); //阻塞,等待消息发送过来即可封装到缓存里去
          byteBuffer.flip();
      System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());
      }

客户端:

      SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress( 8080));
      ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("this is nio");
      socketChannel.write(byteBuffer);

实际上,这个和以前的IO+Socket进行通信是一样的,都是属于阻塞状态。

NIO通信(非阻塞模式)

  • configureBlocking(false)

可以通过ServerSocketChannel的configureBlocking(false)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null,
可以通过SocketChannel的configureBlocking(false)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1

服务器端:

      ByteBuffer byteBuffer = ByteBuffer.allocate(100);
      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开通道

      serverSocketChannel.bind(new InetSocketAddress(8082));
      //由于accept方法是阻塞的,我们只需要一行代码就能让它变成非阻塞的
      //开启非阻塞的之后accept方法如果没有连接到客户端就会从阻塞变成返回'null'
      serverSocketChannel.configureBlocking(false);//开启非阻塞
      while (true){
//          System.out.println("waiting...");
          SocketChannel socketChannel = serverSocketChannel.accept(); //阻塞方法

//          System.out.println(socketChannel);

              if(socketChannel!=null){
                  System.out.println("等待读取");
                  socketChannel.configureBlocking(false); //设置SocketChannel为非阻塞
                  int read = socketChannel.read(byteBuffer);//阻塞方法
                  System.out.println("读取到"+read+"字节");
                  if(read>0){

                      byteBuffer.flip();
                      System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());
                  }
              }
      }

客户端:

      SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8082));
      ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("hello");
      socketChannel.write(byteBuffer);

Selector

Selector是基于事件驱动的

多路复用

单线程可以配合Selector完成对多个Channel读写事件的监控,这称之为多路复用。

注意:

  • 多路复用只能用于网络IO上,文件IO由于只能处于阻塞环境下才能进行,所以无法多路复用
  • 如果不用Selector的非阻塞模式,线程大部分时间都在做无用功,而Selector能够保证以下几点
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
4种事件类型

进入SelectionKey这个类可以看到:

public static final int OP_READ = 1 << 0; //read事件
public static final int OP_WRITE = 1 << 2; //write事件
public static final int OP_CONNECT = 1 << 3; //connect事件
public static final int OP_ACCEPT = 1 << 4; //accept事件
核心方法select
  • select()
    select方法会一直阻塞直到绑定事件发生
accept事件

服务器端:

    Selector selector = Selector.open(); // 创建选择器
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    serverSocketChannel.bind(new InetSocketAddress(8081));

    serverSocketChannel.configureBlocking(false); // 通道必须是非阻塞的

    serverSocketChannel.register(
        selector, SelectionKey.OP_ACCEPT); // 把channel注册到selector,并选择accept事件

    for (; ; ) {

      selector.select(); // 选择事件,此时会阻塞,当事件发生时会自动解除阻塞

      System.out.println("begin");

      // 遍历事件发生的集合,获取对应事件
      selector
          .selectedKeys()
          .forEach(
              selectionKey -> {
                if (selectionKey.isAcceptable()) {
                  try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("已连接");
                    // 处理完之后记得在发生事件的集合中移除该事件
                    selector.selectedKeys().remove(selectionKey);

                  } catch (IOException e) {
                    e.printStackTrace();
                  }
                }
              });
    }
read事件

原生NIO是真tmd难用,恶心
当accept事件处理之后立刻设置read事件,但不处理read事件,因为用户可能只是连接,但是没有写数据,所以要基于事件触发
别忘了accept事件处理之后要设置为非阻塞模式configureBlocking(false)

    Selector selector = Selector.open(); // 创建选择器
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    serverSocketChannel.bind(new InetSocketAddress(8081));

    serverSocketChannel.configureBlocking(false); // 通道必须是非阻塞的

    serverSocketChannel.register(
        selector, SelectionKey.OP_ACCEPT); // 把channel注册到selector,并选择accept事件

    try {
      while (true) {

        int count = selector.select(); // 选择事件,此时会阻塞,当事件发生时会自动解除阻塞

        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();

        while (iterator.hasNext()){
          SelectionKey selectionKey = iterator.next();
          if (selectionKey.isAcceptable()) { // 处理accept事件
            try {
              ServerSocketChannel serverSocket = (ServerSocketChannel) selectionKey.channel();
              System.out.println("已连接");

              SocketChannel socketChannel = serverSocket.accept();
              socketChannel.configureBlocking(false);
              socketChannel.register(selector, SelectionKey.OP_READ); // 读事件
              iterator.remove();

            } catch (IOException e) {

            }
          } else if (selectionKey.isReadable()) { // 处理read事件
            // 获取socketChannel,实际上这个channel就是上面注册进selector的对象
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(100);
            try{
              int read = socketChannel.read(byteBuffer);
              System.out.println("read:"+read);
            }catch (Exception e){
//              e.printStackTrace();
             continue;  //一定要这样写。。。。。。。防止多次read报错
            }
            byteBuffer.flip();
            debugAll(byteBuffer);
            byteBuffer.clear();
            iterator.remove();

          }

        }

      }
    } catch (Exception e) {
      e.printStackTrace();
    }
selector注意点
  • 事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发
  • 事件处理之后一定要把selector.selectedKeys这个集合中当前处理完成的事件remove

零拷贝

零拷贝指的是数据无需拷贝到JVM内存中,同时具有以下三个优点:

  • 更少的用户态与内核态的切换
  • 不利用cpu计算,减少cpu缓存伪共享
  • 零拷贝适合小文件传输

NIO优化

使用DirectByteBuffer

  • ByteBuffer.allocate(10)底层对应 HeapByteBuffer,使用的还是Java堆内存
  • ByteBuffer.allocateDirect(10)底层对应DirectByteBuffer,使用的是操作系统内存,不过需要手动释放内存

优点:

  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
linux2.4优化
  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  • 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU
  • 整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。