NIO 与异步 I/O(NIO.2 / AsynchronousChannel)
开篇语
哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛
今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。
我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!
1) Selector 多路复用模型(Reactor 的“人话版”)
传统阻塞 I/O(BIO)常见写法是:一个连接一个线程。连接一多,线程调度、栈内存、上下文切换直接把你机器磨成“风扇起飞”🌀。
Selector 的核心价值:
一个线程就能“同时盯着很多连接”,哪个连接“准备好了”(可读/可写/可接入),我就处理哪个。
1.1 基本角色
-
Channel:通道(ServerSocketChannel/SocketChannel) -
Selector:选择器(事件多路复用) -
SelectionKey:注册到 Selector 的凭证(包含 interestOps 和 readyOps) -
事件类型:
OP_ACCEPT:新连接可接入OP_READ:可读OP_WRITE:可写OP_CONNECT:连接建立(客户端用得多)
1.2 典型事件循环(Reactor Loop)
selector.select()阻塞等待就绪事件- 遍历
selectedKeys() - 根据 key 事件类型处理 accept/read/write
- 必须
iterator.remove()清掉已处理的 key(不然下一轮又处理一遍,痛苦循环😅)
2) ByteBuffer & “零拷贝”与使用技巧(性能的命门)
你写 NIO,90% 的坑都在 ByteBuffer。真的,我不骗你😂。
2.1 ByteBuffer 的四要素:position / limit / capacity / mark
最常见的状态流转:
-
写入 buffer(从 channel 读进来,或 put 数据)
-
flip():切换到读模式(limit=position,position=0) -
读出 buffer(get 或写到 channel)
-
compact()或clear():clear():全清空,准备重新写入(position=0,limit=capacity)compact():保留未读完的数据,把剩余数据移到开头,再继续写(非常适合“半包/粘包”的场景)
✅ 口诀(别笑,这真管用):
- 写完要读:flip
- 读完要再写:compact / clear
- 别把 clear 当“擦除数据”,它只是重置指针 😭
2.2 Direct ByteBuffer(堆外) vs Heap ByteBuffer(堆内)
ByteBuffer.allocate():堆内 buffer(GC 管)ByteBuffer.allocateDirect():堆外 buffer(减少一次拷贝,适合大量 I/O)
建议:
- 网络 I/O 高吞吐:可以用 direct buffer(但别频繁创建,最好池化)
- 普通业务:堆内更省心
2.3 “零拷贝”到底是啥(别被营销词带跑)
严格意义的零拷贝是:减少用户态/内核态之间的数据复制次数。
Java NIO 里你最常用到的“接近零拷贝”手段:
FileChannel.transferTo()/transferFrom():文件到 socket(或反向)走内核路径(很多平台上接近 sendfile)MappedByteBuffer(mmap):文件映射到内存,随机读写大文件时很香(但释放映射要小心)
一个典型应用:高性能静态文件下载
文件 -> socket,尽量用 transferTo,避免你把文件读到用户态数组再写出去。
3) NIO.2 异步 I/O:AsynchronousSocketChannel(回调 / CompletionStage)
NIO2(AIO 风格)的理念是:
你发起 read/write,立刻返回;等 I/O 完成后,系统回调你(CompletionHandler),或者你用 Future/Stage 接结果。
3.1 两种写法
- 回调:
channel.read(buf, attachment, CompletionHandler) - Future:
Future<Integer> f = channel.read(buf); f.get();(但 get 会阻塞,失去“异步味儿”)
更现代一些的姿势:把回调包装成 CompletableFuture / CompletionStage,便于链式处理(下面有示例)。
4) 高并发网络服务设计要点(别只盯着“能连上”)
想写一个能抗的 TCP 服务,除了 API,会踩这些点:
4.1 协议与消息边界(半包/粘包)
TCP 是字节流:一次 read() 可能拿到半条消息,也可能多条消息粘一起。
你必须定义 framing,例如:
- 定长:每条消息固定 N 字节(简单但不灵活)
- 长度前缀:
[4字节长度][payload](最常用) - 分隔符:
\n、\r\n(文本协议常用)
4.2 写操作的背压(Backpressure)
高并发下,OP_WRITE 不是“想写就写”,因为:
- socket send buffer 可能满了
- 你一次 write 写不完全部数据(partial write)
工程化做法:
- 每个连接维护一个 待发送队列(outbound queue)
- 写不完就注册
OP_WRITE,等可写再继续写 - 写完再取消
OP_WRITE(不然 selector 会一直提醒你“可写”,导致 CPU 空转100%🥲)
4.3 线程模型(Reactor 常见拆分)
-
单线程 Reactor:一个线程 accept + read + write(简单,适合中小规模)
-
主从 Reactor:
- boss 线程:只负责 accept
- worker 线程:各自 selector 处理 read/write(高并发更稳)
5) 实战:基于 Selector 的高并发 TCP Echo 服务(可直接跑)
下面这个例子实现了:
- 非阻塞 Server + Selector
- 每连接一个状态:
Conn,包含 readBuffer 和 writeQueue - 处理 partial write +
OP_WRITE开关 - 简单 echo(把收到的内容原样回写)
说明:这是教学版 reactor,已经比“玩具版”更接近工程形态了🙂
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayDeque;
import java.util.Iterator;
public class NioSelectorEchoServer {
static class Conn {
// 建议:网络I/O可用 direct buffer,但不要频繁创建
final ByteBuffer readBuf = ByteBuffer.allocateDirect(8 * 1024);
final ArrayDeque<ByteBuffer> writeQueue = new ArrayDeque<>();
}
public static void main(String[] args) throws Exception {
int port = (args.length > 0) ? Integer.parseInt(args[0]) : 9000;
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress("0.0.0.0", port));
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO Echo Server listening on " + port);
while (true) {
selector.select(); // 阻塞等待就绪事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove(); // 非常关键:处理完必须移除
try {
if (!key.isValid()) continue;
if (key.isAcceptable()) {
handleAccept(server, selector);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable()) {
handleWrite(key);
}
} catch (IOException e) {
// 出异常通常意味着连接断了,清理资源
closeKey(key);
}
}
}
}
private static void handleAccept(ServerSocketChannel server, Selector selector) throws IOException {
SocketChannel ch;
while ((ch = server.accept()) != null) { // 非阻塞:可能一次accept多个
ch.configureBlocking(false);
ch.socket().setTcpNoDelay(true);
Conn conn = new Conn();
SelectionKey key = ch.register(selector, SelectionKey.OP_READ);
key.attach(conn);
}
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
Conn conn = (Conn) key.attachment();
int n = ch.read(conn.readBuf);
if (n == -1) {
closeKey(key);
return;
}
if (n == 0) return;
// 切到读模式
conn.readBuf.flip();
// 这里演示 echo:把读到的内容复制成一个待写 buffer
// 注意:不能直接把 readBuf 放进 writeQueue,因为后续还要继续用 readBuf 读
ByteBuffer out = ByteBuffer.allocate(conn.readBuf.remaining());
out.put(conn.readBuf);
out.flip();
conn.writeQueue.add(out);
// 读完准备继续写入(这里用 clear,因为我们已把内容复制走了)
conn.readBuf.clear();
// 注册写事件(避免一直 OP_WRITE:只在确实有数据要写时打开)
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
private static void handleWrite(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
Conn conn = (Conn) key.attachment();
while (!conn.writeQueue.isEmpty()) {
ByteBuffer buf = conn.writeQueue.peek();
ch.write(buf); // 非阻塞:可能写不完
if (buf.hasRemaining()) {
// send buffer 满了,等下次可写再继续
return;
}
conn.writeQueue.poll(); // 写完出队
}
// 队列清空:关闭 OP_WRITE,避免 CPU 空转
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
private static void closeKey(SelectionKey key) {
try { key.channel().close(); } catch (IOException ignored) {}
try { key.cancel(); } catch (Exception ignored) {}
}
}
如何测试(本地)
- 起服务:
java NioSelectorEchoServer 9000 - 用
nc测:nc 127.0.0.1 9000,输入啥回啥 - 压测:可以用简单脚本开很多连接并发写入,观察 CPU 与吞吐
6) 进阶:NIO2 AsynchronousSocketChannel(回调 + CompletableFuture 包装示例)
如果你更喜欢异步风格,可以这样包装回调为 CompletableFuture,写起来更“链式”。
这是一个示意级片段(用于理解模式),工程里通常会加协议解析、连接状态管理、背压队列等。
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
public class AioFutures {
public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel ch, ByteBuffer buf) {
CompletableFuture<Integer> cf = new CompletableFuture<>();
ch.read(buf, null, new CompletionHandler<>() {
@Override public void completed(Integer result, Object att) { cf.complete(result); }
@Override public void failed(Throwable exc, Object att) { cf.completeExceptionally(exc); }
});
return cf;
}
public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel ch, ByteBuffer buf) {
CompletableFuture<Integer> cf = new CompletableFuture<>();
ch.write(buf, null, new CompletionHandler<>() {
@Override public void completed(Integer result, Object att) { cf.complete(result); }
@Override public void failed(Throwable exc, Object att) { cf.completeExceptionally(exc); }
});
return cf;
}
}
把 read -> flip -> write -> clear 串起来就能形成一个异步 echo loop(但要注意 partial write、以及读写复用 buffer 的并发安全)。
7) 常见陷阱(这些坑非常“日常”😵)
7.1 Buffer 忘记 flip / clear / compact
- 忘 flip:写完不切换到读,读不到数据(remaining=0)
- 乱 clear:半包还没处理完就清空,消息丢了
- 不会 compact:处理半包时最常见错误
建议:写协议解析时固定模板:
- read 到 buffer
- flip
- while(能解析出完整消息) 解析
- compact(保留剩余半包)
7.2 误用 OP_WRITE 导致 CPU 100%
很多人一上来就给所有连接都注册 OP_WRITE,然后 selector 会疯狂告诉你“可写可写可写”,CPU 原地起飞🛫。
正确做法:只有当写队列非空时才注册 OP_WRITE,写完立刻取消。
7.3 ByteBuffer 复用导致数据错乱
把同一个 ByteBuffer 同时用于:
- 读入(read)
- 写出(write)
- 或多个连接共享
很容易“内容被覆盖”。
每连接独立 buffer 或使用池化(但池化要严谨)。
7.4 粘包半包没处理
TCP 字节流没有消息边界。没有 framing,线上一定出事故(只是早晚😅)。
8) 延伸阅读与学习路线(配合你要学 Netty)
-
先吃透:Selector / ByteBuffer / partial read-write / framing / backpressure
-
再看 Netty:
- EventLoop(Reactor 线程模型)
- ByteBuf(比 ByteBuffer 更工程化:引用计数、池化)
- ChannelPipeline(协议处理链)
… …
文末
好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。
… …
学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!
wished for you successed !!!
⭐️若喜欢我,就请关注我叭。
⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。
版权声明:本文由作者原创,转载请注明出处,谢谢支持!
- 点赞
- 收藏
- 关注作者
评论(0)