Netty之Channel解读

举报
yd_249383650 发表于 2023/06/28 15:56:26 2023/06/28
【摘要】 ​ 目录channel 的主要作用ChannelFutureCloseFuture为什么要异步关闭channel 的主要作用close() 可以用来关闭 channelcloseFuture() 用来处理 channel 的关闭sync 方法作用是同步等待 channel 关闭而 addListener 方法是异步等待 channel 关闭pipeline() 方法添加处理器write() ...

 

目录


channel 的主要作用

ChannelFuture

CloseFuture

为什么要异步关闭



channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

下面是一段客户端的代码

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .sync()
    .channel()
    .writeAndFlush(new Date() + ": hello world!");

现在把它拆开来看

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 1

channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");

1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象 

注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象

实验如下:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
  • 执行到 1 时,连接未建立,打印 [id: 0x2e1884dd]
  • 执行到 2 时,sync 方法是同步等待连接建立完成
  • 执行到 3 时,连接肯定建立了,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]

除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:

编辑

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
    System.out.println(future.channel()); // 2
});
  • 执行到 1 时,连接未建立,打印 [id: 0x749124ba]
  • ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印 [id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]

CloseFuture

关闭是由另外一个线程来进行处理的,跟上面的原理一样

 编辑

 编辑

 这里可以看出CLOSE之前就执行了关闭操作

正确关闭的第一种方式:同步关闭处理

      ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.sync();
        log.debug("处理关闭以后的操作"); 

编辑

正确关闭的第二种方式:异步关闭处理

   closeFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭以后的操作");
                group.shutdownGracefully();
            }
        });

上述的测试代码

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws Exception{
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                }).connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}",channel);
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String line = scanner.nextLine();
                if("q".equals(line)){

                    channel.close();//close异步操作
                    log.debug("处理关闭之后的操作");//不能在这里善后 错误的
                    break;
                }
            }
        },"input").start();

        //获取CloseFuture对象
        //1)同步关闭处理
        //2)异步关闭处理
    /*    ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.sync();
        log.debug("处理关闭以后的操作"); */

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("处理关闭以后的操作");
                group.shutdownGracefully();
            }
        });

    }
}

为什么要异步关闭

为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。

还有人会笼统地回答,因为 netty 异步方式用了多线程、多线程就效率高。其实这些认识都比较片面,多线程和异步所提升的效率并不是所认为的

思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96

编辑

经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下

编辑

因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍,(一个小时内接待病人的效率大大提高,四个医生不是同时工作,但都要干满8小时才下班)

 编辑


要点

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势,单线程不能同时处理多个任务,因此无法实现异步处理提高效率。异步处理需要利用多线程或多进程机制来实现,以便同时处理多个任务。多线程或多进程能够允许程序同时执行多个操作,从而提高效率。
  • 异步并没有缩短响应时间,反而有所增加,异步可以提高系统的吞吐量,即能够同时处理多个请
  • 合理进行任务拆分,也是利用异步的关键


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。