Netty --Selector选择器

举报
赵KK日常技术记录 发表于 2023/06/24 13:36:39 2023/06/24
【摘要】 Selector:检测多个通道上是否有事件的发生。Netty各组件对应关系每一个链接对应一个线程NIO 非阻塞IO java1.4channel buffer Selector线程Thread|Selector 根据不同的事件在各个channel切换|Channel(read/write) 多个连接 Event决定切换到那个channel↑↓Buffer buffer底层有...

Selector:检测多个通道上是否有事件的发生。

Netty各组件对应关系

每一个链接对应一个线程

NIO 非阻塞IO java1.4

channel buffer Selector

线程Thread

|

Selector 根据不同的事件在各个channel切换

|

Channel(read/write) 多个连接 Event决定切换到那个channel

↑↓

Buffer buffer底层有一个数组 数据的读/写 双向的flip切换

Client(socket)

当channel有事件发生返回selectorkey

遍历所有的key 反向得到channel

public abstract class Selector implements Closeable {
}

实现方法

public abstract SelectableChannel channel();

//返回可选择的channel 确定与哪个channel链接

This method performs a blocking <a href="#selop">selection
public abstract int select(long timeout)
 throws IOException;

即使没有监听到事件 也会返回

This method performs a non-blocking <a href="#selop">selection

非阻塞的

public abstract int selectNow() throws IOException;
public abstract Set<SelectionKey> selectedKeys();
1.当客户端链接时 通过seversocketchannel得到对应的socketchannel
2.将得到的socketchannel注册到selector上,一个selctor注册多个selectorchannel
selelctor的父类
public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
父类SelectableChannel
public final SelectionKey register(Selector sel, int ops//关注的事件)
    throws ClosedChannelException
{
    return register(sel, ops, null);
}
3.注册手返回一个Selelctionkey,与selectors集合关联
4.Selector进行监听selector()方法,返回当前管理的有事件发生的通道的个数
5.进一步得到Selectionkey(有事件发生的)
6.通过Selectionkey反向得到channel()

ops可传入的值

请在此添加图片描述

NioServer

package com.kk.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @author zhaokkstart
 * @create 2020-02-10 14:28
 */
public class NioServer {


    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        Selector selector = Selector.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        serverSocketChannel.configureBlocking(false);
        //注册到selector  OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //等待客户端连接
        while (true) {
            //没有事件发生
            if (selector.select(1000) == 0) {
                System.out.println("服务器等待中....");
                continue;
            }
            //有事件发生的
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //获取到相关的Selectionkey集合
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                //根据key对应的通道发生的事件作相应的处理
                if (key.isAcceptable()) {
                //生成一个channel
                    try {
                        SocketChannel accept = serverSocketChannel.accept();
                        accept.configureBlocking(false);
                        //注册到selector,关注事件read的channel的buffer
                        accept.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        //发送数据
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if(key.isReadable()){
                    //读事件    通过key获取到对应的channel
                    SocketChannel channel = (SocketChannel) key.channel();
                    //获取到该channel关联的buffer
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    //channel读取
                    try {
                        channel.read(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("客户端发送数据"+new String(buffer.array()));
                    //手动删除当前事件key  防止重复操作
                    iterator.remove();
                }
            };
        }
    }
}

ByteBuffer的warp方法

public static ByteBuffer wrap(byte[] array) {
    return wrap(array, 0, array.length);
}

NioClient

package com.kk.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author zhaokkstart
 * @create 2020-02-10 11:21
 */
public class NioClient {

    public static void main(String[] args) throws IOException {
        //获取通道
        SocketChannel socketChannel = SocketChannel.open();
        //非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端ip+端口
        InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
        if (!socketChannel.connect(socketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("连接中.....");
            }
        }
        //连接成功  发送数据
        String str = "Hello  KK";
        //不需要指定大小
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        //将buffer写入channel
        socketChannel.write(buffer);
        //读取一个
        System.in.read();
    }
}

启动Server,Client,得到字符串,这时的程序基本与Netty的书写理念一致,弄清了channel与selector以及对应关系。

请在此添加图片描述

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。