02、Netty学习笔记—(NIO网络编程和IO模型)(下)
1.3.2、代码实现
多线程NIO服务器server
端:
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
* @ClassName MultThreadServer
* @Author ChangLu
* @Date 2021/12/24 21:03
* @Description 多线程优化:①解决worker注册与select选择的阻塞问题。②多个worker(也就是多个selector)处理(非连接事件)。
*/
@Slf4j
public class MultiThreadNioServer {
public static void main(String[] args) throws Exception{
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT, null);
ssc.bind(new InetSocketAddress(8080));
//1、创建固定数量的worker(由于Linux的bug问题,所以这里手动指定worker,否则通过代码来动态获取线程数量)
Worker[] workers = new Worker[4];
// Worker worker = new Worker("worker-0");
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
int count = 0;
while (true) {
//2、boss线程处理连接请求SocketChannel
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{},sc {}",sc.getRemoteAddress(),sc);
//3、多个请求连接处理注册到不同的worker线程中去
//负载均衡依次平衡的注册到某个线程中去
workers[count++ % workers.length].register(sc);
}
}
}
}
static class Worker implements Runnable{
private Thread thread;//Worker线程
private Selector selector;//一个Worker线程对应一个selector
private String name;//线程名称
private volatile boolean start = false;//还未初始化
//用来临时存储SocketChannel用于进行注册
private ConcurrentLinkedDeque<SocketChannel> queue = new ConcurrentLinkedDeque<>();
public Worker(String name){
this.name = name;
}
//初始化线程,
public void register(SocketChannel sc) throws Exception {
if (!start) {
this.thread = new Thread(this,this.name);
this.selector = Selector.open();
this.thread.start();
start = true;
}
//向队列添加SocketChannel,方便之后某个线程来进行同步方法执行注册操作
queue.add(sc);
this.selector.wakeup();//唤醒相关联selector的其他线程的select()阻塞
}
@Override
public void run() {
while (true) {
try {
selector.select();
//***这部分注册代码是针对于手动调用wakeup()唤醒的过程!***
SocketChannel sc = queue.poll();
if (sc != null) {
log.debug("before register...{}",sc.getRemoteAddress());
sc.register(this.selector,SelectionKey.OP_READ,null);
log.debug("after register...{}",sc.getRemoteAddress());
}
//******************************************************
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
log.info("key is {}",key);
debugAll(buffer);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
客户端client:
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
* @ClassName NIOClient
* @Author ChangLu
* @Date 2021/12/26 12:25
* @Description NIO客户端
*/
public class NioClient {
public static void main(String[] args) throws Exception{
final SocketChannel sc = SocketChannel.open();
final boolean result = sc.connect(new InetSocketAddress(8080));
if (result) {
System.out.println("连接成功");
}else {
sc.finishConnect();
}
//示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
System.in.read();
}
}
实现效果:本地调试电脑为4个CPU,这里就固定设置了四个woker
- Boss线程(也就是主线程)负责处理连接请求。
- woker线程(多个)通过负载均衡来将主线程得到的socketchannel依次注册到其线程的selector中。
下面我们来直接连接四个客户端,接着随机挑选两个客户端发送数据,通过查看log打印的线程名称我们就可以看到是否是多线程来处理连接后的请求!
二、NIO vs BIO
2.1、stream与channel的区别
1、缓冲层面
- stream不会自动缓冲数据,是比较高层的API,不会关心系统提供的一些缓冲功能(例如发送数据使用到的发送缓冲区sendbuffer,接收数据的receivebuffer)。
- channel:例如socketchannel就能够利用系统提供的发送缓冲区,接收缓冲区,更为底层。(网卡直接读取的缓冲,你自己定义的缓冲还要复制到这个缓冲区)
2、阻塞
- stream只支持阻塞。
- channel同时支持阻塞、非阻塞API,网络channel可以配合selector实现多路复用,文件channel不能够配合selector。
3、二者均为全双工,即读写可以同时进行。
- 全双工通信:是双方可以同时通信(读的同时可以写,写的同时可以读),半双工是必须交替通信,不论是BIO还是NIO都是全双工。
2.2、IO模型
2.2.1、网络IO模型
一定要从网络上的IO模型上去理解,不要被同步、异步、阻塞、非阻塞组合的名词所干扰!
- java的read()方法不能够真正完成读取数据的功能,从网络上读取数据的功能并不是java干的,而是需要由操作系统来干,这就牵扯到一次从用户程序空间-linux内核空间的切换,也就是java调用read方法实际上间接的是调用操作系统的方法,操作系统这边有一个read()方法能够真正的完成数据的读取。
- 数据读取分为两个阶段:①等待数据(调用read等待拷贝到网卡缓冲区时间)。②复制数据(从网卡中读取到缓冲区里)。复制结束了就会从linux内核空间切换到用户程序空间。
2.2.2、五种网络IO模型
五种网络IO模型:参考《UNIX 网络编程 -卷1》,用C语言写的触及更加底层的API
分别是:阻塞IO
、非阻塞IO
、多路复用
、信号驱动
、异步IO
- 阻塞IO:切换到内核空间这一整段时间用户程序是阻塞住的,在此期间什么活都不能干。
- 非阻塞IO:while(true){int result = read()} 能够不断的尝试获取读取到的数据,在等待数据过程中若是都能够直接得到结果,不过一般都是为0,一旦到复制数据阶段用户程序就会阻塞,这一段时间内会进行缓冲区拷贝!
- 多路复用:核心在于selector。相对于阻塞IO在做一件事时做不了另外一件事(例如在进行accept()等待时无法做read()事件)。一个是具体的事件阻塞,另一个是select阻塞,对于select可以响应多种事件,每一次响应可以做一批事件。
- 宏观上来看是进行了两次阻塞:
- 若是事件复杂呢,此时就能够看出多路复用的好处:阻塞IO是只能等待具体的事件某个阶段完成才能够往下执行,而多路复用是所有的事件都能往后执行。
- 总结:多路复用是事件驱动,阻塞IO是代码驱动。
- 宏观上来看是进行了两次阻塞:
- 信号驱动:不太常用。
- 异步IO:同步是线程自己获取结果(自己从头到尾把一件事情干完);异步则是线程不去自己获取结果,而是由其他线程送来结果。
- 同步:线程自己去获取结果(一个线程)。
- 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)。
- 针对于是哪个线程要得到结果取决于回调方法,如等你的数据都完成了,调用上一次定义的回调方法,调用方法时候把真正读取到的结果作为参数传过去,这就完成了两个线程之间的数据传递,这就是异步模式。
针对于IO模型来区分同步异步:同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在)、异步非阻塞
- 阻塞IO:同步。
- 非阻塞IO:同步。
- 多路复用(可单线程、多线程):同步。
- 异步IO:异步。
2.3、零拷贝(文件传输)
2.3.1、传统IO问题
传统的 IO 将一个文件通过 socket 写出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
内部工作流程是这样的:
-
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
-
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
-
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
-
接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
2.3.2、NIO 优化
以下说的是filechannel,也就是文件传输零拷贝。
优化1
通过 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZATRSSv-1642430711320)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0025.png)]
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
优化2(linux 2.1)
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yHgDNJ1D-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0026.png)]
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux 2.4)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L2g3PF1s-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0027.png)]
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
零拷贝
:内核态和用户态没有拷贝操作。
- 不用在java内存中间拷贝,拷贝操作都发生在操作系统和网络设备,socket的缓冲区,不需要复制到java内存中了。
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有:
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享(这部分拷贝操作可以使用DMA来进行操作,并不会利用CPU来计算)
- 零拷贝适合小文件传输:尽量不要传输大文件,一般文件拷贝都是从缓冲区发送到网卡,那么过大文件的话就会有问题,可能会占用其他文件的读写。
2.4、AIO
2.4.1、AIO说明
AIO介绍
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果。
AIO的相关问题
对于AIO,两个阶段都能够腾出手来不让cpu去做,但是linux对于异步IO的支持并不好,真正对异步IO性能操作好的是windows,通过一套IOCP的API实现了真正的异步IO。
一般我们将java程序部署到linux上,由于并没有真正支持IO,所以并没有进行大量使用,并且编程的复杂度也较高。
- netty5.0也对异步IO进行了实现,结果发现性能没有优势,并且引入了没有必要的复杂性,维护成本高,之后netty就将5.0废弃了。
- 现在Linux全面支持了异步IO,新API叫做io_uring。AIO 的新归宿:io_uring
注意:AIO支持文件、网络;多路复用只支持网络
2.4.2、文件AIO
目的:读取文件内容到bytebuffer中,读取的操作是异步操作,非主线程来完成,而是让额外的线程执行!
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
* @ClassName FileAIO
* @Author ChangLu
* @Date 2021/12/28 19:01
* @Description 异步文件读取
*/
@Slf4j
public class FileAIO {
public static void main(String[] args) {
try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("data1.txt"), StandardOpenOption.READ)) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
//1、首先执行
log.info("read before...");
//第三个参数是异步接口
fileChannel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
//3、实际读取完成之后其他线程执行
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.info("read completed...");
buffer.flip();
debugAll(buffer);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
//2、紧接着执行
log.info("read end ...");
//注意:这里使用一个读取字符阻塞API是为了防止主线程结束而导致守护线程也直接停止。默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。直到守护线程执行完后我们敲回车结束程序。
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.4.3、网络AIO
补充了WriteHandler
server:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
/**
* @ClassName AioServer
* @Author ChangLu
* @Date 2021/12/28 19:16
* @Description 网络异步:编写处理连接、读、写handler
*/
@Slf4j
public class AioServer {
public static void main(String[] args){
try (AsynchronousServerSocketChannel aioChannel = AsynchronousServerSocketChannel.open()) {
aioChannel.bind(new InetSocketAddress(8080));
//关联一个连接事件handler
aioChannel.accept(null,new AcceptHandler(aioChannel));
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
//关闭channel
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//读事件handler
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
//连接事件handler
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理(直接交由sc直接写出去)
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
//写事件handler
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{
private final AsynchronousSocketChannel ssc;
public WriteHandler(AsynchronousSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.info("WriteHandle进行写事件...");
//若是一次没有写完再次执行调用!
if (attachment.hasRemaining()) {
ssc.write(attachment,ByteBuffer.allocate(16),this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
}
client:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll;
/**
* @ClassName NioClient
* @Author ChangLu
* @Date 2021/12/28 19:29
* @Description 客户端
*/
public class NioClient {
public static void main(String[] args) throws Exception{
final SocketChannel sc = SocketChannel.open();
final boolean result = sc.connect(new InetSocketAddress("localhost", 8080));
if (result) {
System.out.println("客户端连接成功");
}else{
sc.finishConnect();
}
//接收数据
final ByteBuffer buffer = ByteBuffer.allocate(16);
sc.read(buffer);
debugAll(buffer);
//示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!"))
System.in.read();
}
}
注意一下处理事件的线程:对应执行回调函数的线程并不是自己事先开辟的,你可以看到每次通知的线程都可能不相同!下面是两个连接处理操作:
参考资料
- 点赞
- 收藏
- 关注作者
评论(0)