02、Netty学习笔记—(NIO网络编程和IO模型)(下)

举报
长路 发表于 2022/11/28 08:17:55 2022/11/28
【摘要】 文章目录一、网络编程1.1、非阻塞VS阻塞1.1.1、阻塞(默认)1.1.2、非阻塞(设置参数)1.1.3、多路复用(selector)1.2、单线程selector实现(多路复用)1.2.1、Selector(课件)创建绑定 Channel 事件监听 Channel 事件select 何时不阻塞1.2.2、代码实现代码实现过程思路(7点)案例1:处理accept()、read()事件案例2:处理

1.3.2、代码实现

多线程NIO服务器server端:

import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;

import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;

/**
 * @ClassName MultThreadServer
 * @Author ChangLu
 * @Date 2021/12/24 21:03
 * @Description 多线程优化:①解决worker注册与select选择的阻塞问题。②多个worker(也就是多个selector)处理(非连接事件)。
 */
@Slf4j
public class MultiThreadNioServer {

    public static void main(String[] args) throws Exception{
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        ssc.register(boss, SelectionKey.OP_ACCEPT, null);
        ssc.bind(new InetSocketAddress(8080));
        //1、创建固定数量的worker(由于Linux的bug问题,所以这里手动指定worker,否则通过代码来动态获取线程数量)
        Worker[] workers = new Worker[4];
//        Worker worker = new Worker("worker-0");
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        int count = 0;
        while (true) {
            //2、boss线程处理连接请求SocketChannel
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{},sc {}",sc.getRemoteAddress(),sc);
                    //3、多个请求连接处理注册到不同的worker线程中去
                    //负载均衡依次平衡的注册到某个线程中去
                    workers[count++ % workers.length].register(sc);
                }
            }
        }

    }

    static class Worker implements Runnable{
        private Thread thread;//Worker线程
        private Selector selector;//一个Worker线程对应一个selector
        private String name;//线程名称
        private volatile boolean start = false;//还未初始化
        //用来临时存储SocketChannel用于进行注册
        private ConcurrentLinkedDeque<SocketChannel> queue = new ConcurrentLinkedDeque<>();

        public Worker(String name){
            this.name = name;
        }

        //初始化线程,
        public void register(SocketChannel sc) throws Exception {
            if (!start) {
                this.thread = new Thread(this,this.name);
                this.selector = Selector.open();
                this.thread.start();
                start = true;
            }
            //向队列添加SocketChannel,方便之后某个线程来进行同步方法执行注册操作
            queue.add(sc);
            this.selector.wakeup();//唤醒相关联selector的其他线程的select()阻塞
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    //***这部分注册代码是针对于手动调用wakeup()唤醒的过程!***
                    SocketChannel sc = queue.poll();
                    if (sc != null) {
                        log.debug("before register...{}",sc.getRemoteAddress());
                        sc.register(this.selector,SelectionKey.OP_READ,null);
                        log.debug("after register...{}",sc.getRemoteAddress());
                    }
                    //******************************************************
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            buffer.flip();
                            log.info("key is {}",key);
                            debugAll(buffer);
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }


}

客户端client:

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

/**
 * @ClassName NIOClient
 * @Author ChangLu
 * @Date 2021/12/26 12:25
 * @Description NIO客户端
 */
public class NioClient {

    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        final boolean result = sc.connect(new InetSocketAddress(8080));
        if (result) {
            System.out.println("连接成功");
        }else {
            sc.finishConnect();
        }
        //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        System.in.read();
    }

}

实现效果:本地调试电脑为4个CPU,这里就固定设置了四个woker

  1. Boss线程(也就是主线程)负责处理连接请求。
  2. woker线程(多个)通过负载均衡来将主线程得到的socketchannel依次注册到其线程的selector中。

下面我们来直接连接四个客户端,接着随机挑选两个客户端发送数据,通过查看log打印的线程名称我们就可以看到是否是多线程来处理连接后的请求!

image-20211226224020857

image-20211226224155285

image-20211226224326409

二、NIO vs BIO

2.1、stream与channel的区别

1、缓冲层面

  • stream不会自动缓冲数据,是比较高层的API,不会关心系统提供的一些缓冲功能(例如发送数据使用到的发送缓冲区sendbuffer,接收数据的receivebuffer)。
  • channel:例如socketchannel就能够利用系统提供的发送缓冲区,接收缓冲区,更为底层。(网卡直接读取的缓冲,你自己定义的缓冲还要复制到这个缓冲区)

2、阻塞

  • stream只支持阻塞。
  • channel同时支持阻塞、非阻塞API,网络channel可以配合selector实现多路复用,文件channel不能够配合selector。

3、二者均为全双工,即读写可以同时进行。

  • 全双工通信:是双方可以同时通信(读的同时可以写,写的同时可以读),半双工是必须交替通信,不论是BIO还是NIO都是全双工。


2.2、IO模型

2.2.1、网络IO模型

一定要从网络上的IO模型上去理解,不要被同步、异步、阻塞、非阻塞组合的名词所干扰!

image-20211227210112044

  • java的read()方法不能够真正完成读取数据的功能,从网络上读取数据的功能并不是java干的,而是需要由操作系统来干,这就牵扯到一次从用户程序空间-linux内核空间的切换,也就是java调用read方法实际上间接的是调用操作系统的方法,操作系统这边有一个read()方法能够真正的完成数据的读取。
  • 数据读取分为两个阶段:①等待数据(调用read等待拷贝到网卡缓冲区时间)。②复制数据(从网卡中读取到缓冲区里)。复制结束了就会从linux内核空间切换到用户程序空间。


2.2.2、五种网络IO模型

五种网络IO模型:参考《UNIX 网络编程 -卷1》,用C语言写的触及更加底层的API

分别是:阻塞IO非阻塞IO多路复用信号驱动异步IO

  • 阻塞IO:切换到内核空间这一整段时间用户程序是阻塞住的,在此期间什么活都不能干。
    • image-20211227210244438
  • 非阻塞IO:while(true){int result = read()} 能够不断的尝试获取读取到的数据,在等待数据过程中若是都能够直接得到结果,不过一般都是为0,一旦到复制数据阶段用户程序就会阻塞,这一段时间内会进行缓冲区拷贝!
    • image-20211227210457168
  • 多路复用:核心在于selector。相对于阻塞IO在做一件事时做不了另外一件事(例如在进行accept()等待时无法做read()事件)。一个是具体的事件阻塞,另一个是select阻塞,对于select可以响应多种事件,每一次响应可以做一批事件。
    • 宏观上来看是进行了两次阻塞:
      • image-20211227211601282
    • 若是事件复杂呢,此时就能够看出多路复用的好处:阻塞IO是只能等待具体的事件某个阶段完成才能够往下执行,而多路复用是所有的事件都能往后执行。
      • image-20211227211722624
    • 总结:多路复用是事件驱动,阻塞IO是代码驱动。
  • 信号驱动:不太常用。
  • 异步IO:同步是线程自己获取结果(自己从头到尾把一件事情干完);异步则是线程不去自己获取结果,而是由其他线程送来结果。
    • 同步:线程自己去获取结果(一个线程)。
    • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)。
      • image-20211227221850586
      • 针对于是哪个线程要得到结果取决于回调方法,如等你的数据都完成了,调用上一次定义的回调方法,调用方法时候把真正读取到的结果作为参数传过去,这就完成了两个线程之间的数据传递,这就是异步模式。

针对于IO模型来区分同步异步:同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在)、异步非阻塞

  • 阻塞IO:同步。
  • 非阻塞IO:同步。
  • 多路复用(可单线程、多线程):同步。
  • 异步IO:异步。


2.3、零拷贝(文件传输)

2.3.1、传统IO问题

传统的 IO 将一个文件通过 socket 写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次


2.3.2、NIO 优化

以下说的是filechannel,也就是文件传输零拷贝。

优化1

通过 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZATRSSv-1642430711320)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0025.png)]

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

优化2(linux 2.1)

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yHgDNJ1D-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0026.png)]

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L2g3PF1s-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0027.png)]

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

零拷贝:内核态和用户态没有拷贝操作。

  • 不用在java内存中间拷贝,拷贝操作都发生在操作系统和网络设备,socket的缓冲区,不需要复制到java内存中了。

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有:

  1. 更少的用户态与内核态的切换
  2. 不利用 cpu 计算,减少 cpu 缓存伪共享(这部分拷贝操作可以使用DMA来进行操作,并不会利用CPU来计算)
  3. 零拷贝适合小文件传输:尽量不要传输大文件,一般文件拷贝都是从缓冲区发送到网卡,那么过大文件的话就会有问题,可能会占用其他文件的读写。


2.4、AIO

2.4.1、AIO说明

AIO介绍

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果。

AIO的相关问题

对于AIO,两个阶段都能够腾出手来不让cpu去做,但是linux对于异步IO的支持并不好,真正对异步IO性能操作好的是windows,通过一套IOCP的API实现了真正的异步IO。

一般我们将java程序部署到linux上,由于并没有真正支持IO,所以并没有进行大量使用,并且编程的复杂度也较高。

  • netty5.0也对异步IO进行了实现,结果发现性能没有优势,并且引入了没有必要的复杂性,维护成本高,之后netty就将5.0废弃了。
  • 现在Linux全面支持了异步IO,新API叫做io_uring。AIO 的新归宿:io_uring

注意:AIO支持文件、网络;多路复用只支持网络



2.4.2、文件AIO

目的:读取文件内容到bytebuffer中,读取的操作是异步操作,非主线程来完成,而是让额外的线程执行!

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;

/**
 * @ClassName FileAIO
 * @Author ChangLu
 * @Date 2021/12/28 19:01
 * @Description 异步文件读取
 */
@Slf4j
public class FileAIO {

    public static void main(String[] args) {
        try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("data1.txt"), StandardOpenOption.READ)) {
            final ByteBuffer buffer = ByteBuffer.allocate(16);
            //1、首先执行
            log.info("read before...");
            //第三个参数是异步接口
            fileChannel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {

                //3、实际读取完成之后其他线程执行
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    log.info("read completed...");
                    buffer.flip();
                    debugAll(buffer);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
            //2、紧接着执行
            log.info("read end ...");
            //注意:这里使用一个读取字符阻塞API是为了防止主线程结束而导致守护线程也直接停止。默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。直到守护线程执行完后我们敲回车结束程序。
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

image-20211228191136374



2.4.3、网络AIO

补充了WriteHandler

server:

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;

/**
 * @ClassName AioServer
 * @Author ChangLu
 * @Date 2021/12/28 19:16
 * @Description 网络异步:编写处理连接、读、写handler
 */
@Slf4j
public class AioServer {

    public static void main(String[] args){
        try (AsynchronousServerSocketChannel aioChannel = AsynchronousServerSocketChannel.open()) {
            aioChannel.bind(new InetSocketAddress(8080));
            //关联一个连接事件handler
            aioChannel.accept(null,new AcceptHandler(aioChannel));
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //关闭channel
    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //读事件handler
    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            try {
                if (result == -1) {
                    closeChannel(sc);
                    return;
                }
                System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel(sc);
            exc.printStackTrace();
        }
    }

    //连接事件handler
    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) {
            try {
                System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理(直接交由sc直接写出去)
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    }

    //写事件handler
    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{

        private final AsynchronousSocketChannel ssc;

        public WriteHandler(AsynchronousSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            log.info("WriteHandle进行写事件...");
            //若是一次没有写完再次执行调用!
            if (attachment.hasRemaining()) {
                ssc.write(attachment,ByteBuffer.allocate(16),this);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {

        }
    }


}

client:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;

/**
 * @ClassName NioClient
 * @Author ChangLu
 * @Date 2021/12/28 19:29
 * @Description 客户端
 */
public class NioClient {
    public static void main(String[] args) throws Exception{
        final SocketChannel sc = SocketChannel.open();
        final boolean result = sc.connect(new InetSocketAddress("localhost", 8080));
        if (result) {
            System.out.println("客户端连接成功");
        }else{
            sc.finishConnect();
        }
        //接收数据
        final ByteBuffer buffer = ByteBuffer.allocate(16);
        sc.read(buffer);
        debugAll(buffer);
        //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
        System.in.read();
    }
}

注意一下处理事件的线程:对应执行回调函数的线程并不是自己事先开辟的,你可以看到每次通知的线程都可能不相同!下面是两个连接处理操作:

image-20211228194230208



参考资料

[1]. 黑马程序员Netty全套教程,全网最全Netty深入浅出教程,Java网络编程的王者

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。