Netty -NIO体验群聊系统
非Netty形式基于Nio的原理体验群聊系统,为了更好的理解Netty的通信,从NIO角度体验下Selector,SelectionKey,channel的关系。
idea打开当前类继承关系
Selector
真正执行时的类型----WindowsSelectorImpl
Selector.keys().size()
当每链接一个客户端,注册一个key,生成独立的socketChannel
selector.selectedKeys();注册到channel哪些发生了事件
key.interestOps(Selector.keys().ops)
serverSocketChannel
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
}
ServerSocketChannel监听客户端链接
Socketchannel负责读写操作
群聊需求:
1.能监听服务端上线下线
2.发送和接收消息,并实现转发
3.展示IP地址+接收消息
注意事项:
1.转发消息时需排除自己
2.channel注册完毕需要移除防止重复操作
3.简单代码实现获取本机IP
通用代码获取本机地址
InetAddress localHost = InetAddress.getLocalHost();
String address= localHost.getHostAddress().toString();
String name= localHost.getHostName().toString();
接口请求时可能由于Nginx反向代理,获取不到真实IP,单纯request信息获取不到IP,可参考如下代码
try
{
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0
|| "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0
|| "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0
|| "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
if (ip.equals("127.0.0.1") || ip.equals("0:0:0:0:0:0:0:1")) {
// 根据网卡取本机配置的IP
InetAddress inet = InetAddress.getLocalHost();
ip = inet.getHostAddress();
// 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
// = 15
if (ip.indexOf(",") > 0) {
ip = ip.substring(0, ip.indexOf(","));
}
}
}
}
if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
if (ip.indexOf(",") > 0) {
ip = ip.substring(0, ip.indexOf(","));
}
}
System.out.println("ip" + ip);
} catch(
Exception e)
{
e.printStackTrace();
}
服务器端代码
/**
* @author zhaokk
* @create 2020-02-12-13:58
*/
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private static final int PORT = 6667;
//初始化任务
public GroupChatServer() {
//得到选择器
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//注册到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
监听事件,并处理消息,提示本机上线,并移除注册channel,无连接时提示
//监听
public void listen() {
try {
while (true) {
int count = selector.select(2000);
if (count > 0) {
//处理事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//监听到Accept
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
//注册到selector
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
//提示上线
System.out.println(socketChannel.getRemoteAddress() + "上线");
}
//读事件
if (key.isReadable()) {
readData(key);
}
//防止重复处理
iterator.remove();
}
} else {
System.out.println("等待连接...");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
读取客户端消息离线后关闭通道
//读取客户端消息
private void readData(SelectionKey key) {
SocketChannel channel = null;
try {
//取到关联channel
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if (count > 0) {
//转成字符串输出
java.lang.String s = new java.lang.String(buffer.array());
//输出
InetAddress localHost = InetAddress.getLocalHost();
String m = localHost.getHostAddress().toString();
System.out.println("客户端"+"--------"+m+"消息" + s);
//转发消息
sendInfoToOther(s, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + "离线了");
//离线后 处理
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
转发消息,排除自己
//转发消息到通道 发送的消息 排除自己
private void sendInfoToOther(java.lang.String msg, SocketChannel socketChannel) {
//服务器转发消息
System.out.println("服务器转发消息 ....." + msg);
//遍历selector 注册的channel 排除自己
selector.keys().forEach(key -> {
Channel targetChannel = key.channel();
//排除自己 是一个socketchannel 但不是自己
if (targetChannel instanceof SocketChannel && targetChannel != socketChannel) {
SocketChannel channel = (SocketChannel) targetChannel;
ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
//将buffer数据写入channel
try {
channel.write(wrap);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
启动代码
public static void main(String[] args) {
//启动
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
客户端代码
/**
* @author zhaokk
* @create 2020-02-12-14:37
*/
public class GroupClient {
//链接服务器
//发送消息
//接收消息
private final String HOST = "127.0.0.1";
private final int port = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupClient() throws IOException {
selector = Selector.open();
//链接服务器
socketChannel= socketChannel.open(new InetSocketAddress(HOST, port));
socketChannel.configureBlocking(false);
//注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
InetAddress localHost = InetAddress.getLocalHost();
String s = localHost.getHostAddress().toString();
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username +"--------"+s+ "准备...");
}
发送消息,读取消息
//发送
public void sendInfo(String info) {
info = username + ":" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
//回复
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo() {
try {
int select = selector.select(1000);
if (select > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
//读到的数据转成字符串
String s = new String(buffer.array());
System.out.println(s.trim());
}
}
iterator.remove();
} else {
System.out.println("无可用通道...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
启动
public static void main(String[] args) throws IOException {
//启动
GroupClient groupClient = new GroupClient();
new Thread(() -> {
while (true) {
groupClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//发送数据到服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
groupClient.sendInfo(s);
}
}
当开启服务器端等待连接,开启客户端提示信息上线
客户端无连接提示无通道
输入消息kkisready
关闭客户端提示离线
Selectionkey.OP_ACCEPT
内置传输流程
通过当前的demo更好的理解Netty如何做到一对一一对多的聊天,通过控制channel中的selector实现,每次channel操作注册完毕需要移除
我向着我的目标前进,我遵循着我的路途,我越过踌躇者与落后者。我的前进将是他们的没落!
---尼采
- 点赞
- 收藏
- 关注作者
评论(0)