Netty -NIO体验群聊系统

举报
赵KK日常技术记录 发表于 2023/06/24 13:16:15 2023/06/24
【摘要】 非Netty形式基于Nio的原理体验群聊系统,为了更好的理解Netty的通信,从NIO角度体验下Selector,SelectionKey,channel的关系。​idea打开当前类继承关系Selector真正执行时的类型----WindowsSelectorImplSelector.keys().size()当每链接一个客户端,注册一个key,生成独立的socketChannelselec...

非Netty形式基于Nio的原理体验群聊系统,为了更好的理解Netty的通信,从NIO角度体验下Selector,SelectionKey,channel的关系。

idea打开当前类继承关系

请在此添加图片描述

Selector

请在此添加图片描述

真正执行时的类型----WindowsSelectorImpl

Selector.keys().size()

当每链接一个客户端,注册一个key,生成独立的socketChannel

selector.selectedKeys();注册到channel哪些发生了事件

key.interestOps(Selector.keys().ops)
serverSocketChannel
public abstract class ServerSocketChannel
    extends AbstractSelectableChannel
    implements NetworkChannel
{
}

ServerSocketChannel监听客户端链接

Socketchannel负责读写操作

群聊需求:

1.能监听服务端上线下线

2.发送和接收消息,并实现转发

3.展示IP地址+接收消息

注意事项:

1.转发消息时需排除自己

2.channel注册完毕需要移除防止重复操作

3.简单代码实现获取本机IP

通用代码获取本机地址

InetAddress localHost = InetAddress.getLocalHost();
String address= localHost.getHostAddress().toString();
String name= localHost.getHostName().toString();

接口请求时可能由于Nginx反向代理,获取不到真实IP,单纯request信息获取不到IP,可参考如下代码

  try
    {
        String ip = request.getHeader("x-forwarded-for");
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
            if (ip.equals("127.0.0.1") || ip.equals("0:0:0:0:0:0:0:1")) {
              // 根据网卡取本机配置的IP
                InetAddress inet = InetAddress.getLocalHost();
                ip = inet.getHostAddress();

               // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
                if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
                  // = 15
                    if (ip.indexOf(",") > 0) {
                        ip = ip.substring(0, ip.indexOf(","));
                    }
                }
            }
        }
        if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
            if (ip.indexOf(",") > 0) {
                ip = ip.substring(0, ip.indexOf(","));
            }
        }
        System.out.println("ip" + ip);
    } catch(
    Exception e)

    {
        e.printStackTrace();
    }

服务器端代码

/**
 * @author zhaokk
 * @create 2020-02-12-13:58
 */
public class GroupChatServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private static final int PORT = 6667;
    //初始化任务
    public GroupChatServer() {
        //得到选择器
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞
            serverSocketChannel.configureBlocking(false);
            //注册到selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

监听事件,并处理消息,提示本机上线,并移除注册channel,无连接时提示

 //监听
    public void listen() {
        try {
            while (true) {
                int count = selector.select(2000);
                if (count > 0) {
                    //处理事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        //监听到Accept
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            //注册到selector
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            //提示上线
                            System.out.println(socketChannel.getRemoteAddress() + "上线");
                        }
                        //读事件
                        if (key.isReadable()) {
                            readData(key);

                        }
                        //防止重复处理
                        iterator.remove();
                    }

                } else {

                    System.out.println("等待连接...");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

读取客户端消息离线后关闭通道

 //读取客户端消息
    private void readData(SelectionKey key) {
        SocketChannel channel = null;
        try {
            //取到关联channel
            channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            if (count > 0) {
                //转成字符串输出
                java.lang.String s = new java.lang.String(buffer.array());
                //输出
                InetAddress localHost = InetAddress.getLocalHost();
                String m = localHost.getHostAddress().toString();
                System.out.println("客户端"+"--------"+m+"消息" + s);
                //转发消息
                sendInfoToOther(s, channel);
            }
        } catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + "离线了");
                //离线后 处理
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

转发消息,排除自己

//转发消息到通道    发送的消息   排除自己
    private void sendInfoToOther(java.lang.String msg, SocketChannel socketChannel) {

        //服务器转发消息
        System.out.println("服务器转发消息 ....." + msg);
        //遍历selector 注册的channel   排除自己
        selector.keys().forEach(key -> {
            Channel targetChannel = key.channel();
            //排除自己    是一个socketchannel   但不是自己
            if (targetChannel instanceof SocketChannel && targetChannel != socketChannel) {
                SocketChannel channel = (SocketChannel) targetChannel;
                ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
                //将buffer数据写入channel
                try {
                    channel.write(wrap);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

启动代码

 public static void main(String[] args) {

        //启动
        GroupChatServer groupChatServer = new GroupChatServer();

        groupChatServer.listen();

    }

客户端代码

/**
 * @author zhaokk
 * @create 2020-02-12-14:37
 */
public class GroupClient {

    //链接服务器
    //发送消息
    //接收消息

    private final String HOST = "127.0.0.1";

    private final int port = 6667;

    private Selector selector;

    private SocketChannel socketChannel;

    private String username;

    public GroupClient() throws IOException {

        selector = Selector.open();
        //链接服务器
        socketChannel= socketChannel.open(new InetSocketAddress(HOST, port));
        socketChannel.configureBlocking(false);
        //注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        InetAddress localHost = InetAddress.getLocalHost();
        String s = localHost.getHostAddress().toString();
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username +"--------"+s+ "准备...");

    }

发送消息,读取消息

//发送

    public void sendInfo(String info) {

        info = username + ":" + info;

        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
            //回复
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    public void readInfo() {
        try {
            int select = selector.select(1000);
            if (select > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer);
                        //读到的数据转成字符串
                        String s = new String(buffer.array());
                        System.out.println(s.trim());
                    }
                }
                iterator.remove();
            } else {
                System.out.println("无可用通道...");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

启动

public static void main(String[] args) throws IOException {
        //启动
        GroupClient groupClient = new GroupClient();
        new Thread(() -> {
            while (true) {
                groupClient.readInfo();
                try {
                    Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        //发送数据到服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            groupClient.sendInfo(s);
        }
    }

请在此添加图片描述

当开启服务器端等待连接,开启客户端提示信息上线

请在此添加图片描述

客户端无连接提示无通道

输入消息kkisready

请在此添加图片描述

关闭客户端提示离线

请在此添加图片描述

Selectionkey.OP_ACCEPT

请在此添加图片描述

内置传输流程

请在此添加图片描述

通过当前的demo更好的理解Netty如何做到一对一一对多的聊天,通过控制channel中的selector实现,每次channel操作注册完毕需要移除

我向着我的目标前进,我遵循着我的路途,我越过踌躇者与落后者。我的前进将是他们的没落!

                       ---尼采
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。