NIO 与异步 I/O(NIO.2 / AsynchronousChannel)

举报
喵手 发表于 2026/01/15 18:02:32 2026/01/15
【摘要】 开篇语哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,...

开篇语

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛

  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。

  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!

1) Selector 多路复用模型(Reactor 的“人话版”)

传统阻塞 I/O(BIO)常见写法是:一个连接一个线程。连接一多,线程调度、栈内存、上下文切换直接把你机器磨成“风扇起飞”🌀。

Selector 的核心价值:

一个线程就能“同时盯着很多连接”,哪个连接“准备好了”(可读/可写/可接入),我就处理哪个。

1.1 基本角色

  • Channel:通道(ServerSocketChannel / SocketChannel

  • Selector:选择器(事件多路复用)

  • SelectionKey:注册到 Selector 的凭证(包含 interestOps 和 readyOps)

  • 事件类型:

    • OP_ACCEPT:新连接可接入
    • OP_READ:可读
    • OP_WRITE:可写
    • OP_CONNECT:连接建立(客户端用得多)

1.2 典型事件循环(Reactor Loop)

  1. selector.select() 阻塞等待就绪事件
  2. 遍历 selectedKeys()
  3. 根据 key 事件类型处理 accept/read/write
  4. 必须 iterator.remove() 清掉已处理的 key(不然下一轮又处理一遍,痛苦循环😅)

2) ByteBuffer & “零拷贝”与使用技巧(性能的命门)

你写 NIO,90% 的坑都在 ByteBuffer。真的,我不骗你😂。

2.1 ByteBuffer 的四要素:position / limit / capacity / mark

最常见的状态流转:

  • 写入 buffer(从 channel 读进来,或 put 数据)

  • flip():切换到读模式(limit=position,position=0)

  • 读出 buffer(get 或写到 channel)

  • compact()clear()

    • clear():全清空,准备重新写入(position=0,limit=capacity)
    • compact():保留未读完的数据,把剩余数据移到开头,再继续写(非常适合“半包/粘包”的场景)

✅ 口诀(别笑,这真管用):

  • 写完要读:flip
  • 读完要再写:compact / clear
  • 别把 clear 当“擦除数据”,它只是重置指针 😭

2.2 Direct ByteBuffer(堆外) vs Heap ByteBuffer(堆内)

  • ByteBuffer.allocate():堆内 buffer(GC 管)
  • ByteBuffer.allocateDirect():堆外 buffer(减少一次拷贝,适合大量 I/O)

建议:

  • 网络 I/O 高吞吐:可以用 direct buffer(但别频繁创建,最好池化)
  • 普通业务:堆内更省心

2.3 “零拷贝”到底是啥(别被营销词带跑)

严格意义的零拷贝是:减少用户态/内核态之间的数据复制次数。

Java NIO 里你最常用到的“接近零拷贝”手段:

  • FileChannel.transferTo() / transferFrom():文件到 socket(或反向)走内核路径(很多平台上接近 sendfile)
  • MappedByteBuffer(mmap):文件映射到内存,随机读写大文件时很香(但释放映射要小心)

一个典型应用:高性能静态文件下载
文件 -> socket,尽量用 transferTo,避免你把文件读到用户态数组再写出去。

3) NIO.2 异步 I/O:AsynchronousSocketChannel(回调 / CompletionStage)

NIO2(AIO 风格)的理念是:

你发起 read/write,立刻返回;等 I/O 完成后,系统回调你(CompletionHandler),或者你用 Future/Stage 接结果。

3.1 两种写法

  • 回调:channel.read(buf, attachment, CompletionHandler)
  • Future:Future<Integer> f = channel.read(buf); f.get();(但 get 会阻塞,失去“异步味儿”)

更现代一些的姿势:把回调包装成 CompletableFuture / CompletionStage,便于链式处理(下面有示例)。

4) 高并发网络服务设计要点(别只盯着“能连上”)

想写一个能抗的 TCP 服务,除了 API,会踩这些点:

4.1 协议与消息边界(半包/粘包)

TCP 是字节流:一次 read() 可能拿到半条消息,也可能多条消息粘一起。
你必须定义 framing,例如:

  • 定长:每条消息固定 N 字节(简单但不灵活)
  • 长度前缀[4字节长度][payload](最常用)
  • 分隔符\n\r\n(文本协议常用)

4.2 写操作的背压(Backpressure)

高并发下,OP_WRITE 不是“想写就写”,因为:

  • socket send buffer 可能满了
  • 你一次 write 写不完全部数据(partial write)

工程化做法:

  • 每个连接维护一个 待发送队列(outbound queue)
  • 写不完就注册 OP_WRITE,等可写再继续写
  • 写完再取消 OP_WRITE(不然 selector 会一直提醒你“可写”,导致 CPU 空转100%🥲)

4.3 线程模型(Reactor 常见拆分)

  • 单线程 Reactor:一个线程 accept + read + write(简单,适合中小规模)

  • 主从 Reactor

    • boss 线程:只负责 accept
    • worker 线程:各自 selector 处理 read/write(高并发更稳)

5) 实战:基于 Selector 的高并发 TCP Echo 服务(可直接跑)

下面这个例子实现了:

  • 非阻塞 Server + Selector
  • 每连接一个状态:Conn,包含 readBuffer 和 writeQueue
  • 处理 partial write + OP_WRITE 开关
  • 简单 echo(把收到的内容原样回写)

说明:这是教学版 reactor,已经比“玩具版”更接近工程形态了🙂

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayDeque;
import java.util.Iterator;

public class NioSelectorEchoServer {

    static class Conn {
        // 建议:网络I/O可用 direct buffer,但不要频繁创建
        final ByteBuffer readBuf = ByteBuffer.allocateDirect(8 * 1024);
        final ArrayDeque<ByteBuffer> writeQueue = new ArrayDeque<>();
    }

    public static void main(String[] args) throws Exception {
        int port = (args.length > 0) ? Integer.parseInt(args[0]) : 9000;

        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("0.0.0.0", port));
        server.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("NIO Echo Server listening on " + port);

        while (true) {
            selector.select(); // 阻塞等待就绪事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // 非常关键:处理完必须移除

                try {
                    if (!key.isValid()) continue;

                    if (key.isAcceptable()) {
                        handleAccept(server, selector);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable()) {
                        handleWrite(key);
                    }
                } catch (IOException e) {
                    // 出异常通常意味着连接断了,清理资源
                    closeKey(key);
                }
            }
        }
    }

    private static void handleAccept(ServerSocketChannel server, Selector selector) throws IOException {
        SocketChannel ch;
        while ((ch = server.accept()) != null) { // 非阻塞:可能一次accept多个
            ch.configureBlocking(false);
            ch.socket().setTcpNoDelay(true);
            Conn conn = new Conn();
            SelectionKey key = ch.register(selector, SelectionKey.OP_READ);
            key.attach(conn);
        }
    }

    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        Conn conn = (Conn) key.attachment();

        int n = ch.read(conn.readBuf);
        if (n == -1) {
            closeKey(key);
            return;
        }
        if (n == 0) return;

        // 切到读模式
        conn.readBuf.flip();

        // 这里演示 echo:把读到的内容复制成一个待写 buffer
        // 注意:不能直接把 readBuf 放进 writeQueue,因为后续还要继续用 readBuf 读
        ByteBuffer out = ByteBuffer.allocate(conn.readBuf.remaining());
        out.put(conn.readBuf);
        out.flip();

        conn.writeQueue.add(out);

        // 读完准备继续写入(这里用 clear,因为我们已把内容复制走了)
        conn.readBuf.clear();

        // 注册写事件(避免一直 OP_WRITE:只在确实有数据要写时打开)
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        Conn conn = (Conn) key.attachment();

        while (!conn.writeQueue.isEmpty()) {
            ByteBuffer buf = conn.writeQueue.peek();

            ch.write(buf); // 非阻塞:可能写不完
            if (buf.hasRemaining()) {
                // send buffer 满了,等下次可写再继续
                return;
            }
            conn.writeQueue.poll(); // 写完出队
        }

        // 队列清空:关闭 OP_WRITE,避免 CPU 空转
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }

    private static void closeKey(SelectionKey key) {
        try { key.channel().close(); } catch (IOException ignored) {}
        try { key.cancel(); } catch (Exception ignored) {}
    }
}

如何测试(本地)

  • 起服务:java NioSelectorEchoServer 9000
  • nc 测:nc 127.0.0.1 9000,输入啥回啥
  • 压测:可以用简单脚本开很多连接并发写入,观察 CPU 与吞吐

6) 进阶:NIO2 AsynchronousSocketChannel(回调 + CompletableFuture 包装示例)

如果你更喜欢异步风格,可以这样包装回调为 CompletableFuture,写起来更“链式”。

这是一个示意级片段(用于理解模式),工程里通常会加协议解析、连接状态管理、背压队列等。

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;

public class AioFutures {

    public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel ch, ByteBuffer buf) {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        ch.read(buf, null, new CompletionHandler<>() {
            @Override public void completed(Integer result, Object att) { cf.complete(result); }
            @Override public void failed(Throwable exc, Object att) { cf.completeExceptionally(exc); }
        });
        return cf;
    }

    public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel ch, ByteBuffer buf) {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        ch.write(buf, null, new CompletionHandler<>() {
            @Override public void completed(Integer result, Object att) { cf.complete(result); }
            @Override public void failed(Throwable exc, Object att) { cf.completeExceptionally(exc); }
        });
        return cf;
    }
}

read -> flip -> write -> clear 串起来就能形成一个异步 echo loop(但要注意 partial write、以及读写复用 buffer 的并发安全)。

7) 常见陷阱(这些坑非常“日常”😵)

7.1 Buffer 忘记 flip / clear / compact

  • 忘 flip:写完不切换到读,读不到数据(remaining=0)
  • 乱 clear:半包还没处理完就清空,消息丢了
  • 不会 compact:处理半包时最常见错误

建议:写协议解析时固定模板:

  1. read 到 buffer
  2. flip
  3. while(能解析出完整消息) 解析
  4. compact(保留剩余半包)

7.2 误用 OP_WRITE 导致 CPU 100%

很多人一上来就给所有连接都注册 OP_WRITE,然后 selector 会疯狂告诉你“可写可写可写”,CPU 原地起飞🛫。
正确做法:只有当写队列非空时才注册 OP_WRITE,写完立刻取消。

7.3 ByteBuffer 复用导致数据错乱

把同一个 ByteBuffer 同时用于:

  • 读入(read)
  • 写出(write)
  • 或多个连接共享
    很容易“内容被覆盖”。
    每连接独立 buffer 或使用池化(但池化要严谨)。

7.4 粘包半包没处理

TCP 字节流没有消息边界。没有 framing,线上一定出事故(只是早晚😅)。

8) 延伸阅读与学习路线(配合你要学 Netty)

  • 先吃透:Selector / ByteBuffer / partial read-write / framing / backpressure

  • 再看 Netty:

    • EventLoop(Reactor 线程模型)
    • ByteBuf(比 ByteBuffer 更工程化:引用计数、池化)
    • ChannelPipeline(协议处理链)

… …

文末

好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。

… …

学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!

wished for you successed !!!


⭐️若喜欢我,就请关注我叭。

⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。


版权声明:本文由作者原创,转载请注明出处,谢谢支持!

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。